D1945: Buf ring cleanups (D1945.diff)
Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,24 +41,113 @@
#include <sys/mutex.h>
#endif
+struct br_entry_ {
+ volatile void *bre_ptr;
+};
+
+
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
+ /* cache line aligned to avoid cache line invalidate traffic
+ * between consumer and producer (false sharing)
+ */
volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE);
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;
#ifdef DEBUG_BUFRING
struct mtx *br_lock;
-#endif
- void *br_ring[0] __aligned(CACHE_LINE_SIZE);
+#endif
+ /* cache line aligned to avoid false sharing with other data structures
+ * located just beyond the end of the ring
+ */
+ struct br_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE);
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Many architectures other than x86 permit speculative re-ordering
+ * of loads. Unfortunately, atomic_load_acq_32() is comparatively
+ * expensive so we'd rather elide it if possible.
+ */
+#if defined(__i386__) || defined(__amd64__)
+#define ORDERED_LOAD_32(x) (*(x))
+#else
+#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
+#endif
+
+/*
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split the update into 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update into an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Regarding memory ordering - there are five variables in question:
+ * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
+ * It's easiest to examine correctness by considering the consequences of
+ * reading a stale value or having an update become visible prior to
+ * preceding writes.
+ *
+ * - prod_head: this is only read by the enqueue routine; if the latter were to
+ * initially read a stale value for it, the cmpxchg (atomic_cmpset_acq_32)
+ * would fail. However, the implied memory barrier in cmpxchg would cause the
+ * subsequent read of prod_head to read the up-to-date value permitting the
+ * cmpxchg to succeed the second time.
+ *
+ * - prod_tail: This value is used by dequeue to determine the effective
+ * producer index. On architectures with weaker memory ordering than x86 it
+ * needs special handling. In enqueue it needs to be updated with
+ * atomic_store_rel_32() (i.e. a write memory barrier before update) to
+ * guarantee that the new ring value is committed to memory before it is
+ * made available by prod_tail. In dequeue, to guarantee that it is read
+ * before br_ring[cons_head], it needs to be read with atomic_load_acq_32().
+ *
+ * - cons_head: this value is used only by dequeue, it is either updated
+ * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
+ *
+ * - cons_tail: This is used to communicate the latest consumer index between
+ * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
+ * to fail erroneously. To avoid a load being re-ordered after a store (and
+ * thus permitting enqueue to store a new value before the old one has been
+ * consumed) it is updated with an atomic_store_rel_32() in dequeue.
+ *
+ * - ring[idx] : Updates to this value need to reach memory before the subsequent
+ * update to prod_tail does. Reads need to happen before subsequent updates to
+ * cons_tail.
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single-consumer ring buffer,
+ * the producer cannot produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ *
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ *
+ * - The critical_enter() / critical_exit() are not required for
+ * correctness. They prevent other enqueues and dequeues from stalling when
+ * a producer is preempted after updating 'prod_head' but before updating
+ * 'prod_tail'.
+ *
+ * - The "while (br->br_prod_tail != prod_head)"
+ * check ensures in-order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
*
*/
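To make the three-step protocol described above concrete, here is a minimal, self-contained sketch in portable C11 atomics rather than FreeBSD's atomic(9) primitives. The toy_ring type and all names are illustrative assumptions, and the sketch omits the critical sections, drop accounting, and DEBUG_BUFRING checks of the real code:

#include <errno.h>
#include <stdatomic.h>
#include <stdint.h>

struct toy_ring {
	_Atomic uint32_t prod_head;	/* step 1: slot acquisition */
	_Atomic uint32_t prod_tail;	/* step 3: publication to consumers */
	_Atomic uint32_t cons_head;
	_Atomic uint32_t cons_tail;
	uint32_t mask;			/* size - 1; size must be a power of 2 */
	void *ring[];			/* mask + 1 entries follow the header */
};

static int
toy_enqueue(struct toy_ring *r, void *buf)
{
	uint32_t head, next;

	do {
		head = atomic_load_explicit(&r->prod_head,
		    memory_order_relaxed);
		next = (head + 1) & r->mask;
		/* full if the next slot still holds an unconsumed entry */
		if (next == atomic_load_explicit(&r->cons_tail,
		    memory_order_acquire))
			return (ENOBUFS);
		/* step 1: atomically claim slot 'head' */
	} while (!atomic_compare_exchange_weak_explicit(&r->prod_head,
	    &head, next, memory_order_acquire, memory_order_relaxed));

	/* step 2: fill the claimed entry */
	r->ring[head] = buf;

	/* wait for earlier enqueuers so prod_tail advances in order */
	while (atomic_load_explicit(&r->prod_tail,
	    memory_order_relaxed) != head)
		;
	/* step 3: the release store publishes ring[head] before the tail */
	atomic_store_explicit(&r->prod_tail, next, memory_order_release);
	return (0);
}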
static __inline int
@@ -69,41 +158,48 @@
int i;
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
- if(br->br_ring[i] == buf)
+ if(br->br_ring[i].bre_ptr == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
do {
+
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
- br->br_drops++;
- critical_exit();
- return (ENOBUFS);
- }
- continue;
+ /* ensure that we only return ENOBUFS
+ * if the latest values match what we read
+ */
+ if (prod_head != atomic_load_acq_32(&br->br_prod_head) ||
+ cons_tail != atomic_load_acq_32(&br->br_cons_tail))
+ continue;
+
+ br->br_drops++;
+ critical_exit();
+ return (ENOBUFS);
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
- if (br->br_ring[prod_head] != NULL)
+ if (br->br_ring[prod_head].bre_ptr != NULL)
panic("dangling value in enqueue");
#endif
- br->br_ring[prod_head] = buf;
+ br->br_ring[prod_head].bre_ptr = buf;
/*
* If there are other enqueues in progress
- * that preceeded us, we need to wait for them
- * to complete
- */
+ * that preceded us, we need to wait for them
+ * to complete.
+ * Re-ordering of reads would not affect correctness.
+ */
while (br->br_prod_tail != prod_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ /* ensure that the ring update reaches memory before the new
+ * value of prod_tail
+ */
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
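The release/acquire pairing that step 3 relies on can be shown in isolation. In this hedged, self-contained C11 fragment (all names are illustrative), a consumer that observes the new tail value is guaranteed to also observe the entry written before it:

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

static void *slot;			/* plays the role of br_ring[i] */
static _Atomic uint32_t tail;		/* plays the role of br_prod_tail */

/* producer: the entry must be globally visible before the new tail */
static void
publish(void *buf)
{
	slot = buf;
	atomic_store_explicit(&tail, 1, memory_order_release);
}

/* consumer: the tail must be read before the entry it guards */
static void *
consume(void)
{
	if (atomic_load_explicit(&tail, memory_order_acquire) == 0)
		return (NULL);		/* nothing published yet */
	return (slot);
}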
@@ -116,35 +212,47 @@
buf_ring_dequeue_mc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
- void *buf;
+ volatile void *buf;
critical_enter();
do {
+ /*
+ * prod_tail must be read before br_ring[cons_head],
+ * and the atomic_cmpset_acq_32 on br_cons_head should
+ * enforce that
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
+
+ /* ensure that the read completes before either of the
+ * subsequent stores
+ */
+ buf = br->br_ring[cons_head].bre_ptr;
+ /* guarantee that the load completes before we update cons_tail */
+ br->br_ring[cons_head].bre_ptr = NULL;
- buf = br->br_ring[cons_head];
-#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
-#endif
/*
* If there are other dequeues in progress
- * that preceeded us, we need to wait for them
- * to complete
- */
+ * that preceded us, we need to wait for them
+ * to complete - no memory barrier needed as
+ * re-ordering shouldn't affect correctness or
+ * progress
+ */
while (br->br_cons_tail != cons_head)
cpu_spinwait();
-
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ /*
+ * ensure that the ring entry is read before
+ * marking the entry as free by updating cons_tail
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
- return (buf);
+ return ((void *)buf);
}
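Continuing the hypothetical toy_ring sketch from above, the multi-consumer dequeue mirrors the enqueue's ordering: an acquire load of prod_tail (and an acquire cmpxchg on cons_head) before the entry is read, and a release store of cons_tail only after the entry has been read and cleared:

static void *
toy_dequeue_mc(struct toy_ring *r)
{
	uint32_t head, next;
	void *buf;

	do {
		head = atomic_load_explicit(&r->cons_head,
		    memory_order_relaxed);
		/* acquire pairs with the producer's release of prod_tail */
		if (head == atomic_load_explicit(&r->prod_tail,
		    memory_order_acquire))
			return (NULL);		/* ring empty */
		next = (head + 1) & r->mask;
	} while (!atomic_compare_exchange_weak_explicit(&r->cons_head,
	    &head, next, memory_order_acquire, memory_order_relaxed));

	buf = r->ring[head];
	r->ring[head] = NULL;		/* clear the slot, as the diff does */

	/* wait for earlier dequeuers so cons_tail advances in order */
	while (atomic_load_explicit(&r->cons_tail,
	    memory_order_relaxed) != head)
		;
	/* release: the slot is free for reuse only once cons_tail passes it */
	atomic_store_explicit(&r->cons_tail, next, memory_order_release);
	return (buf);
}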
/*
@@ -158,41 +266,47 @@
uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
uint32_t cons_next_next;
-#endif
uint32_t prod_tail;
- void *buf;
-
+#endif
+ volatile void *buf;
+
+ /*
+ * prod_tail tells whether or not br_ring[cons_head] is valid
+ * thus we must guarantee that prod_tail is read first
+ */
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
+ return (NULL);
+
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
+ /*
+ * If prod_tail is stale we will prefetch the wrong value - but this is safe
+ * as cache coherence (should) ensure that when the value is loaded for
+ * actual use it is fetched from main memory
+ */
+ prod_tail = br->br_prod_tail;
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
- prefetch(br->br_ring[cons_next]);
+ prefetch(br->br_ring[cons_next].bre_ptr);
if (cons_next_next != prod_tail)
- prefetch(br->br_ring[cons_next_next]);
+ prefetch(br->br_ring[cons_next_next].bre_ptr);
}
#endif
br->br_cons_head = cons_next;
- buf = br->br_ring[cons_head];
-
+ buf = br->br_ring[cons_head].bre_ptr;
+ /* guarantee that the load completes before we update cons_tail */
+ br->br_ring[cons_head].bre_ptr = NULL;
#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
- return (buf);
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
+
+ return ((void *)buf);
}
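As an aside, the ORDERED_LOAD_32() special case above is unnecessary under C11 atomics: on TSO hardware such as x86 an acquire load already compiles to a plain MOV, while weakly ordered architectures get the barrier or ordered load they need. A hedged sketch of the equivalent:

#include <stdatomic.h>
#include <stdint.h>

/*
 * C11 equivalent of ORDERED_LOAD_32(): free on x86 (a plain load),
 * an ordered load or load-plus-barrier on weakly ordered machines.
 */
static inline uint32_t
ordered_load_32(const _Atomic uint32_t *p)
{
	return (atomic_load_explicit(p, memory_order_acquire));
}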
/*
@@ -205,7 +319,7 @@
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
-
+
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;
@@ -213,10 +327,17 @@
if (cons_head == prod_tail)
return;
br->br_cons_head = cons_next;
-#ifdef DEBUG_BUFRING
- br->br_ring[cons_head] = NULL;
-#endif
- br->br_cons_tail = cons_next;
+
+ /*
+ * Storing NULL here serves two purposes:
+ * 1) it ensures that the load of ring[cons_head] has completed
+ * (only the most perverse architecture or compiler would
+ * consider re-ordering a = *x; *x = b)
+ * 2) it allows us to enforce global ordering of the cons_tail
+ * update with an atomic_store_rel_32
+ */
+ br->br_ring[cons_head].bre_ptr = NULL;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -234,13 +355,14 @@
* back (since jhb says the store is probably cheaper),
* if we have to do a multi-queue version we will need
* the compare and an atomic.
+ *
*/
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
KASSERT(br->br_cons_head != br->br_prod_tail,
("Buf-Ring has none in putback")) ;
- br->br_ring[br->br_cons_head] = new;
+ br->br_ring[br->br_cons_head].bre_ptr = new;
}
/*
@@ -251,33 +373,40 @@
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
-
+ uint32_t cons_head;
#ifdef DEBUG_BUFRING
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
+ cons_head = br->br_cons_head;
/*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
+ * for correctness prod_tail must be read before ring[cons_head]
*/
- if (br->br_cons_head == br->br_prod_tail)
+
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
return (NULL);
-
- return (br->br_ring[br->br_cons_head]);
+
+ /* ensure that the ring load completes before
+ * exposing it to any destructive updates
+ */
+ return ((void *)br->br_ring[cons_head].bre_ptr);
}
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale but the caller understands that this is
+ * only a point-in-time snapshot
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale but the caller understands that this is
+ * only a point-in-time snapshot
+ */
return (br->br_cons_head == br->br_prod_tail);
}
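A typical single-consumer drain loop built from buf_ring_peek() and buf_ring_advance_sc() might look like the sketch below; hw_transmit(), the softc, and its tx_mtx are hypothetical, and the consumer lock is held as the DEBUG_BUFRING checks expect. The entry is fully used before buf_ring_advance_sc() frees its slot to producers:

struct mbuf *m;

mtx_lock(&sc->tx_mtx);
while ((m = buf_ring_peek(br)) != NULL) {
	if (hw_transmit(sc, m) != 0)
		break;			/* hardware busy: leave entry in ring */
	buf_ring_advance_sc(br);	/* free the slot we just peeked */
}
mtx_unlock(&sc->tx_mtx);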
@@ -285,6 +414,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale but the caller
+ * understands that this is only a point-in-time snapshot
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);
}
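Putting it together, typical multi-producer/single-consumer use from a driver might look like this sketch. buf_ring_alloc() and buf_ring_free() live in sys/kern/subr_bufring.c; their exact use here, the softc fields, and hw_transmit() are assumptions for illustration:

/* attach: ring sized to a power of 2, consumer lock for DEBUG_BUFRING */
sc->br = buf_ring_alloc(1024, M_DEVBUF, M_WAITOK, &sc->tx_mtx);

/* producer side (e.g. if_transmit), safe from multiple threads */
if (buf_ring_enqueue(sc->br, m) != 0) {
	m_freem(m);			/* ring full: drop the packet */
	return (ENOBUFS);
}

/* consumer side, a single thread under the consumer lock */
mtx_lock(&sc->tx_mtx);
while ((m = buf_ring_dequeue_sc(sc->br)) != NULL)
	hw_transmit(sc, m);		/* hypothetical hardware hand-off */
mtx_unlock(&sc->tx_mtx);

/* detach */
buf_ring_free(sc->br, M_DEVBUF);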