D1945.id3919.diff

Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -58,8 +58,41 @@
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Multi-producer safe lock-free ring buffer enqueue
*
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split the update into 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update into an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single-consumer ring buffer,
+ * the producer cannot produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ * - The critical_enter() / critical_exit() are a performance optimization.
+ * They are not required for correctness. They prevent updates from
+ * stalling by having a producer be preempted after updating 'prod_head'
+ * but before updating 'prod_tail'.
+ * - The "while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)"
+ * check assures in-order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
+ * - Some effort is made to avoid extra memory barriers by using one
+ * atomic_<>_acq_32 operation to guarantee that multiple subsequently
+ * read values are up to date.
*/
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
@@ -74,22 +107,28 @@
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
+ cons_tail = atomic_load_acq_32(&br->br_cons_tail);
do {
+ /* br_cons_tail is up to date by virtue of the previous read
+ * memory barrier or the subsequent one in atomic_cmpset_acq_32.
+ */
+ cons_tail = br->br_cons_tail;
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
- cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
+ /* This case should be rare, so the overhead of the redundant
+ * memory barriers is acceptable.
+ */
+ if (prod_head == atomic_load_acq_32(&br->br_prod_head) &&
+ cons_tail == atomic_load_acq_32(&br->br_cons_tail)) {
br->br_drops++;
critical_exit();
return (ENOBUFS);
}
continue;
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
@@ -101,9 +140,9 @@
* that preceded us, we need to wait for them
* to complete
*/
- while (br->br_prod_tail != prod_head)
+ while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
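
As a reading aid, here is a minimal userspace sketch of the same three-step protocol using C11 atomics in place of FreeBSD's atomic(9) primitives. All names (SLOTS, ring, sketch_enqueue) are illustrative, and the full-ring recheck performed by the real code is simplified to an immediate ENOBUFS; this is a model of the technique, not the kernel implementation.

#include <errno.h>
#include <stdatomic.h>

#define SLOTS 1024			/* must be a power of 2 */
#define MASK  (SLOTS - 1)

static void *ring[SLOTS];
static atomic_uint prod_head, prod_tail, cons_head, cons_tail;

static int
sketch_enqueue(void *buf)
{
	unsigned int head, next;

	do {
		head = atomic_load_explicit(&prod_head, memory_order_acquire);
		next = (head + 1) & MASK;	/* power-of-2 wrap, no modulo */
		if (next == atomic_load_explicit(&cons_tail,
		    memory_order_acquire))
			return (ENOBUFS);	/* ring full (simplified check) */
		/* step 1: atomically acquire index 'head' */
	} while (!atomic_compare_exchange_weak_explicit(&prod_head, &head,
	    next, memory_order_acquire, memory_order_relaxed));

	/* step 2: we own slot 'head' exclusively until prod_tail passes it */
	ring[head] = buf;

	/* step 3: wait for earlier producers, then publish in order */
	while (atomic_load_explicit(&prod_tail, memory_order_acquire) != head)
		;				/* cpu_spinwait() in the kernel */
	atomic_store_explicit(&prod_tail, next, memory_order_release);
	return (0);
}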
@@ -119,15 +158,18 @@
void *buf;
critical_enter();
+ cons_head = atomic_load_acq_32(&br->br_cons_head);
do {
+ /* Subsequent reads have their memory ordering guaranteed by
+ * the atomic_cmpset_acq_32 below.
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
@@ -138,10 +180,10 @@
* that preceded us, we need to wait for them
* to complete
*/
- while (br->br_cons_tail != cons_head)
+ while (atomic_load_acq_32(&br->br_cons_tail) != cons_head)
cpu_spinwait();
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
return (buf);
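
The multi-consumer dequeue mirrors the sketch above: consumers race with a CAS on cons_head, then complete in order through cons_tail. The acquire load of prod_tail here pairs with the producer's release store, which is what makes the ring entry written in step 2 visible before the index that publishes it. Again an illustrative C11 model, continuing the declarations above:

static void *
sketch_dequeue_mc(void)
{
	unsigned int head, next;
	void *buf;

	do {
		head = atomic_load_explicit(&cons_head, memory_order_acquire);
		/* acquire pairs with the producer's release of prod_tail */
		if (head == atomic_load_explicit(&prod_tail,
		    memory_order_acquire))
			return (NULL);		/* ring empty */
		next = (head + 1) & MASK;
	} while (!atomic_compare_exchange_weak_explicit(&cons_head, &head,
	    next, memory_order_acquire, memory_order_relaxed));

	buf = ring[head];			/* slot is ours to read */

	/* complete in order so cons_tail only ever moves forward */
	while (atomic_load_explicit(&cons_tail, memory_order_acquire) != head)
		;				/* cpu_spinwait() in the kernel */
	atomic_store_explicit(&cons_tail, next, memory_order_release);
	return (buf);
}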
@@ -162,18 +204,14 @@
uint32_t prod_tail;
void *buf;
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
+ if (cons_head == prod_tail)
+ return (NULL);
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
@@ -191,7 +229,10 @@
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
+ /* Use atomic_store_rel_32 to ensure that the producer has
+ * an up-to-date view.
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
return (buf);
}
@@ -207,7 +248,7 @@
uint32_t prod_tail;
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == prod_tail)
@@ -216,7 +257,7 @@
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
- br->br_cons_tail = cons_next;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -256,13 +297,8 @@
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
- /*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
- */
- if (br->br_cons_head == br->br_prod_tail)
+
+ if (br->br_cons_head == atomic_load_acq_32(&br->br_prod_tail))
return (NULL);
return (br->br_ring[br->br_cons_head]);
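
Together, buf_ring_peek and buf_ring_advance_sc support a look-before-you-consume pattern on the single-consumer path: peek at the head entry, attempt the hand-off, and advance only on success, so a failed hand-off leaves the buffer in the ring. A hedged sketch of that pattern, where hw_transmit() and tx_drain() are hypothetical driver functions, not part of this API:

static void
tx_drain(struct buf_ring *br)
{
	void *buf;

	while ((buf = buf_ring_peek(br)) != NULL) {
		/* hw_transmit() is a hypothetical driver hand-off */
		if (hw_transmit(buf) != 0)
			break;			/* leave buf in the ring; retry later */
		buf_ring_advance_sc(br);	/* consume only after success */
	}
}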
@@ -271,13 +307,18 @@
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale, but the caller understands that this is
+ * only a point-in-time snapshot.
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale, but the caller understands that this is
+ * only a point-in-time snapshot.
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +326,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale, but the caller
+ * understands that this is only a point-in-time snapshot.
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);
