Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F102903316
D1945.id3919.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
7 KB
Referenced Files
None
Subscribers
None
D1945.id3919.diff
View Options
Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -58,8 +58,41 @@
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Multi-producer safe lock-free ring buffer enqueue
*
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split the update into 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update into an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_int(...).
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single-consumer ring buffer,
+ * the producer can not produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ * - The critical_enter() / critical_exit() are a performance optimization.
+ * They are not required for correctness. They prevent updates from
+ * stalling by having a producer be preempted after updating 'prod_head'
+ * but before updating 'prod_tail'.
+ * - The "while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)"
+ *   check ensures in-order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
+ * - Some effort is made to avoid extra memory barriers by using one
+ *   atomic_<>_acq_32 operation to guarantee the visibility of multiple values
*/
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
@@ -74,22 +107,28 @@
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
+ cons_tail = atomic_load_acq_32(&br->br_cons_tail);
do {
+	 	/* br_cons_tail is up to date by virtue of the previous read memory
+		 * barrier or the subsequent one in atomic_cmpset_acq_32
+		 */
+ cons_tail = br->br_cons_tail;
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
- cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
+			/* This case should be rare -- so the overhead of the
+			 * redundant memory barrier is acceptable.
+			 */
+ if (prod_head == atomic_load_acq_32(&br->br_prod_head) &&
+ cons_tail == atomic_load_acq_32(&br->br_cons_tail)) {
br->br_drops++;
critical_exit();
return (ENOBUFS);
}
continue;
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
@@ -101,9 +140,9 @@
* that preceeded us, we need to wait for them
* to complete
*/
- while (br->br_prod_tail != prod_head)
+ while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
@@ -119,15 +158,18 @@
void *buf;
critical_enter();
+ cons_head = atomic_load_acq_32(&br->br_cons_head);
do {
+ /* subsequent reads should have memory ordering guaranteed by
+ * the atomic_cmpset_acq_32 below
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
@@ -138,10 +180,10 @@
* that preceeded us, we need to wait for them
* to complete
*/
- while (br->br_cons_tail != cons_head)
+ while (atomic_load_acq_32(&br->br_cons_tail) != cons_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
return (buf);
@@ -162,18 +204,14 @@
uint32_t prod_tail;
void *buf;
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
+ if (cons_head == prod_tail)
+ return (NULL);
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
@@ -191,7 +229,10 @@
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
+	/* do atomic_store_rel_32 to ensure that the producer has an
+	 * up-to-date view
+	 */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
return (buf);
}
@@ -207,7 +248,7 @@
uint32_t prod_tail;
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == prod_tail)
@@ -216,7 +257,7 @@
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
- br->br_cons_tail = cons_next;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -256,13 +297,8 @@
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
- /*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
- */
- if (br->br_cons_head == br->br_prod_tail)
+
+ if (br->br_cons_head == atomic_load_acq_32(&br->br_prod_tail))
return (NULL);
return (br->br_ring[br->br_cons_head]);
@@ -271,13 +307,18 @@
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +326,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale but the consumer
+ * understands that this is only a point in time snapshot
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Nov 19, 1:17 PM (21 h, 28 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
14717152
Default Alt Text
D1945.id3919.diff (7 KB)
Attached To
Mode
D1945: Buf ring cleanups
Attached
Detach File
Event Timeline
Log In to Comment