Page MenuHomeFreeBSD

D1945.diff
No OneTemporary

D1945.diff

Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,24 +41,113 @@
#include <sys/mutex.h>
#endif
+struct br_entry_ {
+ volatile void *bre_ptr;
+};
+
+
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
+ /* cache line aligned to avoid cache line invalidate traffic
+ * between consumer and producer (false sharing)
+ */
volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE);
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;
#ifdef DEBUG_BUFRING
struct mtx *br_lock;
-#endif
- void *br_ring[0] __aligned(CACHE_LINE_SIZE);
+#endif
+ /* cache line aligned to avoid false sharing with other data structures
+ * located just beyond the end of the ring
+ */
+ struct br_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE);
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Many architectures other than x86 permit speculative re-ordering
+ * of loads. Unfortunately, atomic_load_acq_32() is comparatively
+ * expensive so we'd rather elide it if possible.
+ */
+#if defined(__i386__) || defined(__amd64__)
+#define ORDERED_LOAD_32(x) (*x)
+#else
+#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
+#endif
+
+/*
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split update in to 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update in to an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with a atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Regarding memory ordering - there are five variables in question:
+ * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
+ * It's easiest examine correctness by considering the consequence of
+ * reading a stale value or having an update become visible prior to
+ * preceding writes.
+ *
+ * - prod_head: this is only read by the enqueue routine, if the latter were to
+ * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
+ * would fail. However, the implied memory barrier in cmpxchg would cause the
+ * subsequent read of prod_head to read the up-to-date value permitting the
+ * cmpxchg to succeed the second time.
+ *
+ * - prod_tail: This value is used by dequeue to determine the effective
+ * producer index. On architectures with weaker memory ordering than x86 it
+ * needs special handling. In enqueue it needs to be updated with
+ * atomic_store_rel_32() (i.e. a write memory barrier before update) to
+ * guarantee that the new ring value is committed to memory before it is
+ * made available by prod_tail. In dequeue to guarantee that it is read before
+ * br_ring[cons_head] it needs to be read with atomic_load_acq_32().
+ *
+ * - cons_head: this value is used only by dequeue, it is either updated
+ * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
+ *
+ * - cons_tail: This is used to communicate the latest consumer index between
+ * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
+ * to fail erroneously. To avoid a load being re-ordered after a store (and
+ * thus permitting enqueue to store a new value before the old one has been
+ * consumed) it is updated with an atomic_store_rel_32() in deqeueue.
+ *
+ * - ring[idx] : Updates to this value need to reach memory before the subsequent
+ * update to prod_tail does. Reads need to happen before subsequent updates to
+ * cons_tail.
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single consumer ring buffer,
+ * the producer can not produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ *
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ *
+ * - The critical_enter() / critical_exit() are not required for
+ * correctness. They prevent updates from stalling by having a producer be
+ * preempted after updating 'prod_head' but before updating 'prod_tail'.
+ *
+ * - The "while (br->br_prod_tail != prod_head)"
+ * check assures in order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
*
*/
static __inline int
@@ -69,41 +158,48 @@
int i;
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
- if(br->br_ring[i] == buf)
+ if(br->br_ring[i].bre_ptr == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
do {
+
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
- br->br_drops++;
- critical_exit();
- return (ENOBUFS);
- }
- continue;
+ /* ensure that we only return ENOBUFS
+ * if the latest value matches what we read
+ */
+ if (prod_head != atomic_load_acq_32(&br->br_prod_head) ||
+ cons_tail != atomic_load_acq_32(&br->br_cons_tail))
+ continue;
+
+ br->br_drops++;
+ critical_exit();
+ return (ENOBUFS);
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
- if (br->br_ring[prod_head] != NULL)
+ if (br->br_ring[prod_head].bre_ptr != NULL)
panic("dangling value in enqueue");
#endif
- br->br_ring[prod_head] = buf;
+ br->br_ring[prod_head].bre_ptr = buf;
/*
* If there are other enqueues in progress
- * that preceeded us, we need to wait for them
- * to complete
- */
+ * that preceded us, we need to wait for them
+ * to complete
+ * re-ordering of reads would not effect correctness
+ */
while (br->br_prod_tail != prod_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ /* ensure that the ring update reaches memory before the new
+ * value of prod_tail
+ */
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
@@ -116,35 +212,47 @@
buf_ring_dequeue_mc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
- void *buf;
+ volatile void *buf;
critical_enter();
do {
+ /*
+ * prod_tail must be read before br_ring[cons_head] is
+ * and the atomic_cmpset_acq_32 on br_cons_head should
+ * enforce that
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
+
+ /* ensure that the read completes before either of the
+ * subsequent stores
+ */
+ buf = br->br_ring[cons_head].bre_ptr;
+ /* guarantee that the load completes before we update cons_tail */
+ br->br_ring[cons_head].bre_ptr = NULL;
- buf = br->br_ring[cons_head];
-#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
-#endif
/*
* If there are other dequeues in progress
- * that preceeded us, we need to wait for them
- * to complete
- */
+ * that preceded us, we need to wait for them
+ * to complete - no memory barrier needed as
+ * re-ordering shouldn't effect correctness or
+ * progress
+ */
while (br->br_cons_tail != cons_head)
cpu_spinwait();
-
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ /*
+ * assure that the ring entry is read before
+ * marking the entry as free by updating cons_tail
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
- return (buf);
+ return ((void *)buf);
}
/*
@@ -158,41 +266,47 @@
uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
uint32_t cons_next_next;
-#endif
uint32_t prod_tail;
- void *buf;
-
+#endif
+ volatile void *buf;
+
+ /*
+ * prod_tail tells whether or not br_ring[cons_head] is valid
+ * thus we must guarantee that it is read first
+ */
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
+ return (NULL);
+
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
+ /*
+ * If prod_tail is stale we will prefetch the wrong value - but this is safe
+ * as cache coherence (should) ensure that the when the value is loaded for
+ * actual use it is fetched from main memory
+ */
+ prod_tail = br->br_prod_tail;
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
- prefetch(br->br_ring[cons_next]);
+ prefetch(br->br_ring[cons_next].bre_ptr);
if (cons_next_next != prod_tail)
- prefetch(br->br_ring[cons_next_next]);
+ prefetch(br->br_ring[cons_next_next].bre_ptr);
}
#endif
br->br_cons_head = cons_next;
- buf = br->br_ring[cons_head];
-
+ buf = br->br_ring[cons_head].bre_ptr;
+ /* guarantee that the load completes before we update cons_tail */
+ br->br_ring[cons_head].bre_ptr = NULL;
#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
- return (buf);
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
+
+ return ((void *)buf);
}
/*
@@ -205,7 +319,7 @@
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
-
+
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;
@@ -213,10 +327,17 @@
if (cons_head == prod_tail)
return;
br->br_cons_head = cons_next;
-#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
-#endif
- br->br_cons_tail = cons_next;
+
+ /*
+ * Storing NULL here serves two purposes:
+ * 1) it assures that the load of ring[cons_head] has completed
+ * (only the most perverted architecture or compiler would
+ * consider re-ordering a = *x; *x = b)
+ * 2) it allows us to enforce global ordering of the cons_tail
+ * update with an atomic_store_rel_32
+ */
+ br->br_ring[cons_head].bre_ptr = NULL;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -234,13 +355,14 @@
* back (since jhb says the store is probably cheaper),
* if we have to do a multi-queue version we will need
* the compare and an atomic.
+ *
*/
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
KASSERT(br->br_cons_head != br->br_prod_tail,
("Buf-Ring has none in putback")) ;
- br->br_ring[br->br_cons_head] = new;
+ br->br_ring[br->br_cons_head].bre_ptr = new;
}
/*
@@ -251,33 +373,40 @@
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
-
+ uint32_t cons_head;
#ifdef DEBUG_BUFRING
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
+ cons_head = br->br_cons_head;
/*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
+ * for correctness prod_tail must be read before ring[cons_head]
*/
- if (br->br_cons_head == br->br_prod_tail)
+
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
return (NULL);
-
- return (br->br_ring[br->br_cons_head]);
+
+ /* ensure that the ring load completes before
+ * exposing it to any destructive updates
+ */
+ return ((void *)br->br_ring[cons_head].bre_ptr);
}
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +414,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale but the consumer
+ * understands that this is only a point in time snapshot
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 19, 11:21 AM (21 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
14715556
Default Alt Text
D1945.diff (13 KB)

Event Timeline