D46411: socket: Implement SO_SPLICE
D46411.id142351.diff
diff --git a/lib/libsys/getsockopt.2 b/lib/libsys/getsockopt.2
--- a/lib/libsys/getsockopt.2
+++ b/lib/libsys/getsockopt.2
@@ -25,7 +25,7 @@
.\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
.\" SUCH DAMAGE.
.\"
-.Dd February 8, 2021
+.Dd July 8, 2024
.Dt GETSOCKOPT 2
.Os
.Sh NAME
@@ -191,6 +191,7 @@
.It Dv SO_MAX_PACING_RATE Ta "set the maximum transmit rate in bytes per second for the socket"
.It Dv SO_NO_OFFLOAD Ta "disables protocol offloads"
.It Dv SO_NO_DDP Ta "disables direct data placement offload"
+.It Dv SO_SPLICE Ta "splice two sockets together"
.El
.Pp
.Dv SO_DEBUG
@@ -551,6 +552,56 @@
reassembled TCP data streams to be received via zero-copy in
user-supplied buffers using
.Xr aio_read 2 .
+.Pp
+.Dv SO_SPLICE ,
+when passed to
+.Fn setsockopt ,
+splices two sockets together using the following
+.Fa optval :
+.Bd -literal
+struct splice {
+ int sp_fd;
+ off_t sp_max;
+ struct timeval sp_idle;
+};
+.Ed
+.Pp
+Data received on
+.Fa s
+will automatically be transmitted via the socket specified in
+.Fa sp_fd
+without any intervention by userspace.
+Splicing is a one-way operation; a given pair of sockets may be
+spliced in one or both directions.
+Currently only connected
+.Xr tcp 4
+sockets may be spliced together.
+If
+.Fa sp_max
+is greater than zero, the socket pair will automatically be unspliced
+once that number of bytes has been transmitted.
+If
+.Fa sp_idle
+is non-zero, the socket pair will automatically be unspliced once the
+specified amount of time has elapsed since the initial call to
+.Fn setsockopt .
+If
+.Fa sp_fd
+is -1, the socket will be unspliced immediately.
+.Pp
+When passed to
+.Fn getsockopt ,
+the
+.Dv SO_SPLICE
+option returns a 64-bit integer containing the number of bytes transmitted by
+the most recent splice.
+That is, while the socket is spliced, the value returned will be the number
+of bytes spliced so far.
+When unsplicing, this value is saved and is returned until the socket is closed
+or spliced again.
+For example, if a splice transmits 100 bytes and is then unspliced, a subsequent
+.Fn getsockopt
+call will return 100 until the socket is spliced again.
.Sh RETURN VALUES
.Rv -std
.Sh ERRORS
@@ -618,5 +669,14 @@
.Fn setsockopt
system calls appeared in
.Bx 4.2 .
+The
+.Dv SO_SPLICE
+option originated in
+.Ox 4.9
+and first appeared in
+.Fx 15.0 .
+The
+.Fx
+implementation aims to be source-compatible.
.Sh BUGS
Several of the socket options should be handled at lower levels of the system.
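
A minimal usage sketch of the interface described in the manual page hunk above (not part of the patch itself): it assumes two already-connected TCP sockets, elides error handling, and the helper names are illustrative only.

#include <sys/socket.h>
#include <stdint.h>

/*
 * Sketch only: splice data arriving on "from" out through "to",
 * unsplicing automatically after 1 MB or 30 seconds of idle time.
 */
static int
splice_sockets(int from, int to)
{
	struct splice sp = {
		.sp_fd = to,			/* drain socket */
		.sp_max = 1024 * 1024,		/* byte limit; 0 means no limit */
		.sp_idle = { .tv_sec = 30 },	/* idle timeout */
	};

	return (setsockopt(from, SOL_SOCKET, SO_SPLICE, &sp, sizeof(sp)));
}

/* Sketch only: fetch the byte count for the current or most recent splice. */
static int64_t
splice_bytes(int from)
{
	int64_t n;
	socklen_t len = sizeof(n);

	if (getsockopt(from, SOL_SOCKET, SO_SPLICE, &n, &len) != 0)
		return (-1);
	return (n);
}

/* Sketch only: dissolve the splice by passing sp_fd == -1. */
static int
unsplice(int from)
{
	struct splice sp = { .sp_fd = -1 };

	return (setsockopt(from, SOL_SOCKET, SO_SPLICE, &sp, sizeof(sp)));
}
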
diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c
--- a/sys/kern/uipc_sockbuf.c
+++ b/sys/kern/uipc_sockbuf.c
@@ -508,6 +508,32 @@
SOCK_BUF_UNLOCK_ASSERT(so, which);
}
+static void
+splice_push(struct socket *so)
+{
+ struct so_splice *sp;
+
+ SOCK_RECVBUF_LOCK_ASSERT(so);
+
+ sp = so->so_splice;
+ mtx_lock(&sp->mtx);
+ SOCK_RECVBUF_UNLOCK(so);
+ so_splice_dispatch(sp);
+}
+
+static void
+splice_pull(struct socket *so)
+{
+ struct so_splice *sp;
+
+ SOCK_SENDBUF_LOCK_ASSERT(so);
+
+ sp = so->so_splice_back;
+ mtx_lock(&sp->mtx);
+ SOCK_SENDBUF_UNLOCK(so);
+ so_splice_dispatch(sp);
+}
+
/*
* Do we need to notify the other side when I/O is possible?
*/
@@ -522,7 +548,9 @@
sorwakeup_locked(struct socket *so)
{
SOCK_RECVBUF_LOCK_ASSERT(so);
- if (sb_notify(&so->so_rcv))
+ if (so->so_rcv.sb_flags & SB_SPLICED)
+ splice_push(so);
+ else if (sb_notify(&so->so_rcv))
sowakeup(so, SO_RCV);
else
SOCK_RECVBUF_UNLOCK(so);
@@ -532,7 +560,9 @@
sowwakeup_locked(struct socket *so)
{
SOCK_SENDBUF_LOCK_ASSERT(so);
- if (sb_notify(&so->so_snd))
+ if (so->so_snd.sb_flags & SB_SPLICED)
+ splice_pull(so);
+ else if (sb_notify(&so->so_snd))
sowakeup(so, SO_SND);
else
SOCK_SENDBUF_UNLOCK(so);
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -122,6 +122,7 @@
#include <sys/hhook.h>
#include <sys/kernel.h>
#include <sys/khelp.h>
+#include <sys/kthread.h>
#include <sys/ktls.h>
#include <sys/event.h>
#include <sys/eventhandler.h>
@@ -134,6 +135,7 @@
#include <sys/resourcevar.h>
#include <net/route.h>
#include <sys/signalvar.h>
+#include <sys/smp.h>
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
@@ -159,8 +161,17 @@
#include <compat/freebsd32/freebsd32.h>
#endif
+static int soreceive_generic_locked(struct socket *so,
+ struct sockaddr **psa, struct uio *uio, struct mbuf **mp,
+ struct mbuf **controlp, int *flagsp);
static int soreceive_rcvoob(struct socket *so, struct uio *uio,
int flags);
+static int soreceive_stream_locked(struct socket *so, struct sockbuf *sb,
+ struct sockaddr **psa, struct uio *uio, struct mbuf **mp,
+ struct mbuf **controlp, int flags);
+static int sosend_generic_locked(struct socket *so, struct sockaddr *addr,
+ struct uio *uio, struct mbuf *top, struct mbuf *control,
+ int flags, struct thread *td);
static void so_rdknl_lock(void *);
static void so_rdknl_unlock(void *);
static void so_rdknl_assert_lock(void *, int);
@@ -278,6 +289,393 @@
maxsockets = uma_zone_set_max(socket_zone, maxsockets);
}
+static int splice_init_state;
+static struct sx splice_init_lock;
+SX_SYSINIT(splice_init_lock, &splice_init_lock, "splice_init");
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, splice, CTLFLAG_RW, 0,
+ "Settings relating to the SO_SPLICE socket option");
+
+static bool splice_receive_stream = true;
+SYSCTL_BOOL(_kern_ipc_splice, OID_AUTO, receive_stream, CTLFLAG_RWTUN,
+ &splice_receive_stream, 0,
+ "Use soreceive_stream() for stream splices");
+
+static uma_zone_t splice_zone;
+static struct proc *splice_proc;
+struct splice_wq {
+ struct mtx mtx;
+ STAILQ_HEAD(, so_splice) head;
+ bool running;
+} __aligned(CACHE_LINE_SIZE);
+static struct splice_wq *splice_wq;
+struct splice_domain_info {
+ int count;
+ int cpu[MAXCPU];
+};
+static int splice_bind_threads = 1;
+static uint16_t splice_cpuid_lookup[MAXCPU];
+static struct splice_domain_info splice_domains[MAXMEMDOM];
+static uint32_t splice_index = 0;
+
+static void so_splice_timeout(void *arg, int pending);
+static void so_splice_xfer(struct so_splice *s);
+static int so_unsplice(struct socket *so, bool timeout);
+
+static void
+splice_work_thread(void *ctx)
+{
+ struct splice_wq *wq = ctx;
+ struct so_splice *s, *s_temp;
+ STAILQ_HEAD(, so_splice) local_head;
+ int cpu;
+
+ cpu = wq - splice_wq;
+ if (bootverbose)
+ printf("starting so_splice worker thread for CPU %d\n", cpu);
+
+ for (;;) {
+ mtx_lock(&wq->mtx);
+ while (STAILQ_EMPTY(&wq->head)) {
+ wq->running = false;
+ mtx_sleep(wq, &wq->mtx, 0, "-", 0);
+ wq->running = true;
+ }
+ STAILQ_INIT(&local_head);
+ STAILQ_CONCAT(&local_head, &wq->head);
+ STAILQ_INIT(&wq->head);
+ mtx_unlock(&wq->mtx);
+ STAILQ_FOREACH_SAFE(s, &local_head, next, s_temp) {
+ mtx_lock(&s->mtx);
+ CURVNET_SET(s->src->so_vnet);
+ so_splice_xfer(s);
+ CURVNET_RESTORE();
+ }
+ }
+}
+
+static void
+so_splice_dispatch_async(struct so_splice *sp)
+{
+ struct splice_wq *wq;
+ bool running;
+
+ wq = &splice_wq[sp->wq_index];
+ mtx_lock(&wq->mtx);
+ STAILQ_INSERT_TAIL(&wq->head, sp, next);
+ running = wq->running;
+ mtx_unlock(&wq->mtx);
+ if (!running)
+ wakeup(wq);
+}
+
+void
+so_splice_dispatch(struct so_splice *sp)
+{
+ mtx_assert(&sp->mtx, MA_OWNED);
+
+ if (sp->state != SPLICE_IDLE) {
+ mtx_unlock(&sp->mtx);
+ } else {
+ sp->state = SPLICE_QUEUED;
+ mtx_unlock(&sp->mtx);
+ so_splice_dispatch_async(sp);
+ }
+}
+
+static int
+splice_zinit(void *mem, int size __unused, int flags __unused)
+{
+ struct so_splice *s;
+
+ s = (struct so_splice *)mem;
+ mtx_init(&s->mtx, "so_splice", NULL, MTX_DEF);
+ return (0);
+}
+
+static void
+splice_zfini(void *mem, int size)
+{
+ struct so_splice *s;
+
+ s = (struct so_splice *)mem;
+ mtx_destroy(&s->mtx);
+}
+
+static int
+splice_init(void)
+{
+ struct thread *td;
+ struct pcpu *pc;
+ int count, domain, error, i, nthr, state;
+
+ state = atomic_load_acq_int(&splice_init_state);
+ if (__predict_true(state > 0))
+ return (0);
+ if (state < 0)
+ return (ENXIO);
+ sx_xlock(&splice_init_lock);
+ if (splice_init_state != 0) {
+ sx_xunlock(&splice_init_lock);
+ return (0);
+ }
+
+ splice_zone = uma_zcreate("splice", sizeof(struct so_splice), NULL,
+ NULL, splice_zinit, splice_zfini, UMA_ALIGN_CACHE, 0);
+
+ splice_wq = mallocarray(mp_maxid + 1, sizeof(*splice_wq), M_TEMP,
+ M_WAITOK | M_ZERO);
+
+ /*
+ * Initialize the workqueues to run the splice work. We create a
+ * work queue for each CPU.
+ */
+ nthr = 0;
+ CPU_FOREACH(i) {
+ STAILQ_INIT(&splice_wq[i].head);
+ mtx_init(&splice_wq[i].mtx, "splice work queue", NULL, MTX_DEF);
+ if (splice_bind_threads > 1) {
+ pc = pcpu_find(i);
+ domain = pc->pc_domain;
+ count = splice_domains[domain].count;
+ splice_domains[domain].cpu[count] = i;
+ splice_domains[domain].count++;
+ }
+ splice_cpuid_lookup[nthr++] = i;
+ }
+
+ /*
+ * If we somehow have an empty domain, fall back to choosing
+ * among all splice threads.
+ */
+ if (splice_bind_threads > 1) {
+ for (i = 0; i < vm_ndomains; i++) {
+ if (splice_domains[i].count == 0) {
+ splice_bind_threads = 1;
+ break;
+ }
+ }
+ }
+
+ /* Start kthreads for each workqueue. */
+ error = 0;
+ CPU_FOREACH(i) {
+ error = kproc_kthread_add(splice_work_thread, &splice_wq[i],
+ &splice_proc, &td, 0, 0, "so_splice", "thr_%d", i);
+ if (error) {
+ printf("Can't add so_splice thread %d error %d\n",
+ i, error);
+ break;
+ }
+ }
+
+ splice_init_state = error != 0 ? -1 : 1;
+ sx_xunlock(&splice_init_lock);
+
+ return (error);
+}
+
+/*
+ * Lock a pair of sockets' I/O locks for splicing. Avoid blocking while holding
+ * one lock in order to avoid potential deadlocks in case there is some other
+ * code path which acquires more than one I/O lock at a time.
+ */
+static void
+splice_lock_pair(struct socket *so_src, struct socket *so_dst)
+{
+ int error;
+
+ for (;;) {
+ error = SOCK_IO_SEND_LOCK(so_dst, SBL_WAIT | SBL_NOINTR);
+ KASSERT(error == 0,
+ ("%s: failed to lock send I/O lock: %d", __func__, error));
+ error = SOCK_IO_RECV_LOCK(so_src, 0);
+ KASSERT(error == 0 || error == EWOULDBLOCK,
+ ("%s: failed to lock recv I/O lock: %d", __func__, error));
+ if (error == 0)
+ break;
+ SOCK_IO_SEND_UNLOCK(so_dst);
+
+ error = SOCK_IO_RECV_LOCK(so_src, SBL_WAIT | SBL_NOINTR);
+ KASSERT(error == 0,
+ ("%s: failed to lock recv I/O lock: %d", __func__, error));
+ error = SOCK_IO_SEND_LOCK(so_dst, 0);
+ KASSERT(error == 0 || error == EWOULDBLOCK,
+ ("%s: failed to lock send I/O lock: %d", __func__, error));
+ if (error == 0)
+ break;
+ SOCK_IO_RECV_UNLOCK(so_src);
+ }
+}
+
+static void
+splice_unlock_pair(struct socket *so_src, struct socket *so_dst)
+{
+ SOCK_IO_RECV_UNLOCK(so_src);
+ SOCK_IO_SEND_UNLOCK(so_dst);
+}
+
+/*
+ * Move data from the source to the sink. Assumes that both of the relevant
+ * socket I/O locks are held.
+ */
+static int
+so_splice_xfer_data(struct socket *so_src, struct socket *so_dst, off_t max,
+ ssize_t *lenp)
+{
+ struct uio uio;
+ struct mbuf *m;
+ struct sockbuf *sb_src, *sb_dst;
+ ssize_t len;
+ long space;
+ int error, flags;
+
+ SOCK_IO_RECV_ASSERT_LOCKED(so_src);
+ SOCK_IO_SEND_ASSERT_LOCKED(so_dst);
+
+ error = 0;
+ m = NULL;
+ memset(&uio, 0, sizeof(uio));
+
+ sb_src = &so_src->so_rcv;
+ sb_dst = &so_dst->so_snd;
+
+ space = sbspace(sb_dst);
+ if (space < 0)
+ space = 0;
+ len = MIN(max, MIN(space, sbavail(sb_src)));
+ if (len == 0) {
+ SOCK_RECVBUF_LOCK(so_src);
+ if ((sb_src->sb_state & SBS_CANTRCVMORE) != 0)
+ error = EPIPE;
+ SOCK_RECVBUF_UNLOCK(so_src);
+ } else {
+ flags = MSG_DONTWAIT;
+ uio.uio_resid = len;
+ if (splice_receive_stream && sb_src->sb_tls_info == NULL) {
+ error = soreceive_stream_locked(so_src, sb_src, NULL,
+ &uio, &m, NULL, flags);
+ } else {
+ error = soreceive_generic_locked(so_src, NULL,
+ &uio, &m, NULL, &flags);
+ }
+ if (error != 0 && m != NULL) {
+ m_freem(m);
+ m = NULL;
+ }
+ }
+ if (m != NULL) {
+ len -= uio.uio_resid;
+ error = sosend_generic_locked(so_dst, NULL, NULL, m, NULL,
+ MSG_DONTWAIT, curthread);
+ } else if (error == 0) {
+ len = 0;
+ SOCK_SENDBUF_LOCK(so_dst);
+ if ((sb_dst->sb_state & SBS_CANTSENDMORE) != 0)
+ error = EPIPE;
+ SOCK_SENDBUF_UNLOCK(so_dst);
+ }
+ if (error == 0)
+ *lenp = len;
+ return (error);
+}
+
+/*
+ * Transfer data from the source to the sink.
+ *
+ * If "direct" is true, the transfer is done in the context of whichever thread
+ * is operating on one of the socket buffers. We do not know which locks are
+ * held, so we can only trylock the socket buffers; if this fails, we fall back
+ * to the worker thread, which invokes this routine with "direct" set to false.
+ */
+static void
+so_splice_xfer(struct so_splice *sp)
+{
+ struct socket *so_src, *so_dst;
+ off_t max;
+ ssize_t len;
+ int error;
+
+ mtx_assert(&sp->mtx, MA_OWNED);
+ KASSERT(sp->state == SPLICE_QUEUED || sp->state == SPLICE_CLOSING,
+ ("so_splice_xfer: invalid state %d", sp->state));
+ KASSERT(sp->max != 0, ("so_splice_xfer: max == 0"));
+
+ if (sp->state == SPLICE_CLOSING) {
+ /* Userspace asked us to close the splice. */
+ goto closing;
+ }
+
+ sp->state = SPLICE_RUNNING;
+ so_src = sp->src;
+ so_dst = sp->dst;
+ max = sp->max > 0 ? sp->max - so_src->so_splice_sent : OFF_MAX;
+ if (max < 0)
+ max = 0;
+
+ /*
+ * Lock the sockets in order to block userspace from doing anything
+ * sneaky. If an error occurs or one of the sockets can no longer
+ * transfer data, we will automatically unsplice.
+ */
+ mtx_unlock(&sp->mtx);
+ splice_lock_pair(so_src, so_dst);
+
+ error = so_splice_xfer_data(so_src, so_dst, max, &len);
+
+ mtx_lock(&sp->mtx);
+
+ /*
+ * Update our stats while still holding the socket locks. This
+ * synchronizes with getsockopt(SO_SPLICE), see the comment there.
+ */
+ if (error == 0) {
+ KASSERT(len >= 0, ("%s: len %zd < 0", __func__, len));
+ so_src->so_splice_sent += len;
+ }
+ splice_unlock_pair(so_src, so_dst);
+
+ switch (sp->state) {
+ case SPLICE_CLOSING:
+closing:
+ sp->state = SPLICE_CLOSED;
+ wakeup(sp);
+ mtx_unlock(&sp->mtx);
+ break;
+ case SPLICE_RUNNING:
+ if (error != 0 ||
+ (sp->max > 0 && so_src->so_splice_sent >= sp->max)) {
+ sp->state = SPLICE_EXCEPTION;
+ soref(so_src);
+ mtx_unlock(&sp->mtx);
+ (void)so_unsplice(so_src, false);
+ sorele(so_src);
+ } else {
+ /*
+ * Locklessly check for additional bytes in the source's
+ * receive buffer and queue more work if possible. We
+ * may end up queuing needless work, but that's ok, and
+ * if we race with a thread inserting more data into the
+ * buffer and observe sbavail() == 0, the splice mutex
+ * ensures that splice_push() will queue more work for
+ * us.
+ */
+ if (sbavail(&so_src->so_rcv) > 0 &&
+ sbspace(&so_dst->so_snd) > 0) {
+ sp->state = SPLICE_QUEUED;
+ mtx_unlock(&sp->mtx);
+ so_splice_dispatch_async(sp);
+ } else {
+ sp->state = SPLICE_IDLE;
+ mtx_unlock(&sp->mtx);
+ }
+ }
+ break;
+ default:
+ __assert_unreachable();
+ }
+}
+
static void
socket_init(void *tag)
{
@@ -1213,6 +1611,219 @@
return (0);
}
+static struct so_splice *
+so_splice_alloc(off_t max)
+{
+ struct so_splice *sp;
+
+ sp = uma_zalloc(splice_zone, M_WAITOK);
+ sp->src = NULL;
+ sp->dst = NULL;
+ sp->max = max > 0 ? max : -1;
+ do {
+ sp->wq_index = atomic_fetchadd_32(&splice_index, 1) %
+ (mp_maxid + 1);
+ } while (CPU_ABSENT(sp->wq_index));
+ sp->state = SPLICE_IDLE;
+ TIMEOUT_TASK_INIT(taskqueue_thread, &sp->timeout, 0, so_splice_timeout,
+ sp);
+ return (sp);
+}
+
+static void
+so_splice_free(struct so_splice *sp)
+{
+ KASSERT(sp->state == SPLICE_CLOSED,
+ ("so_splice_free: sp %p not closed", sp));
+ uma_zfree(splice_zone, sp);
+}
+
+static void
+so_splice_timeout(void *arg, int pending __unused)
+{
+ struct so_splice *sp;
+
+ sp = arg;
+ (void)so_unsplice(sp->src, true);
+}
+
+/*
+ * Splice the output from so to the input of so2.
+ */
+static int
+so_splice(struct socket *so, struct socket *so2, struct splice *splice)
+{
+ struct so_splice *sp;
+ int error;
+
+ if (splice->sp_max < 0)
+ return (EINVAL);
+ /* Handle only TCP for now; TODO: other streaming protos */
+ if (so->so_proto->pr_protocol != IPPROTO_TCP ||
+ so2->so_proto->pr_protocol != IPPROTO_TCP)
+ return (EPROTONOSUPPORT);
+ if (so->so_vnet != so2->so_vnet)
+ return (EINVAL);
+
+ /* so_splice_xfer() assumes that we're using these implementations. */
+ KASSERT(so->so_proto->pr_sosend == sosend_generic,
+ ("so_splice: sosend not sosend_generic"));
+ KASSERT(so2->so_proto->pr_soreceive == soreceive_generic ||
+ so2->so_proto->pr_soreceive == soreceive_stream,
+ ("so_splice: soreceive not soreceive_generic/stream"));
+
+ sp = so_splice_alloc(splice->sp_max);
+ so->so_splice_sent = 0;
+ sp->src = so;
+ sp->dst = so2;
+
+ error = 0;
+ SOCK_LOCK(so);
+ if (SOLISTENING(so))
+ error = EINVAL;
+ else if ((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0)
+ error = ENOTCONN;
+ else if (so->so_splice != NULL)
+ error = EBUSY;
+ if (error != 0) {
+ SOCK_UNLOCK(so);
+ uma_zfree(splice_zone, sp);
+ return (error);
+ }
+ soref(so);
+ so->so_splice = sp;
+ SOCK_RECVBUF_LOCK(so);
+ so->so_rcv.sb_flags |= SB_SPLICED;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+
+ error = 0;
+ SOCK_LOCK(so2);
+ if (SOLISTENING(so2))
+ error = EINVAL;
+ else if ((so2->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0)
+ error = ENOTCONN;
+ else if (so2->so_splice_back != NULL)
+ error = EBUSY;
+ if (error != 0) {
+ SOCK_UNLOCK(so2);
+ SOCK_LOCK(so);
+ so->so_splice = NULL;
+ SOCK_RECVBUF_LOCK(so);
+ so->so_rcv.sb_flags &= ~SB_SPLICED;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+ sorele(so);
+ uma_zfree(splice_zone, sp);
+ return (error);
+ }
+ soref(so2);
+ so2->so_splice_back = sp;
+ SOCK_SENDBUF_LOCK(so2);
+ so2->so_snd.sb_flags |= SB_SPLICED;
+ mtx_lock(&sp->mtx);
+ SOCK_SENDBUF_UNLOCK(so2);
+ SOCK_UNLOCK(so2);
+
+ if (splice->sp_idle.tv_sec != 0 || splice->sp_idle.tv_usec != 0) {
+ taskqueue_enqueue_timeout_sbt(taskqueue_thread, &sp->timeout,
+ tvtosbt(splice->sp_idle), 0, C_PREL(4));
+ }
+
+ /*
+ * Transfer any data already present in the socket buffer.
+ */
+ sp->state = SPLICE_QUEUED;
+ so_splice_xfer(sp);
+ return (0);
+}
+
+static int
+so_unsplice(struct socket *so, bool timeout)
+{
+ struct socket *so2;
+ struct so_splice *sp;
+ bool drain;
+
+ /*
+ * First unset SB_SPLICED and hide the splice structure so that
+ * wakeup routines will stop enqueuing work. This also ensures that
+ * only a single thread will proceed with the unsplice.
+ */
+ SOCK_LOCK(so);
+ if (SOLISTENING(so)) {
+ SOCK_UNLOCK(so);
+ return (EINVAL);
+ }
+ SOCK_RECVBUF_LOCK(so);
+ if ((so->so_rcv.sb_flags & SB_SPLICED) == 0) {
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+ return (ENOTCONN);
+ }
+ so->so_rcv.sb_flags &= ~SB_SPLICED;
+ sp = so->so_splice;
+ so->so_splice = NULL;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+
+ so2 = sp->dst;
+ SOCK_LOCK(so2);
+ KASSERT(!SOLISTENING(so2), ("%s: so2 is listening", __func__));
+ SOCK_SENDBUF_LOCK(so2);
+ KASSERT((so2->so_snd.sb_flags & SB_SPLICED) != 0,
+ ("%s: so2 is not spliced", __func__));
+ KASSERT(so2->so_splice_back == sp,
+ ("%s: so_splice_back != sp", __func__));
+ so2->so_snd.sb_flags &= ~SB_SPLICED;
+ so2->so_splice_back = NULL;
+ SOCK_SENDBUF_UNLOCK(so2);
+ SOCK_UNLOCK(so2);
+
+ /*
+ * No new work is being enqueued. The worker thread might be
+ * splicing data right now, in which case we want to wait for it to
+ * finish before proceeding.
+ */
+ mtx_lock(&sp->mtx);
+ switch (sp->state) {
+ case SPLICE_QUEUED:
+ case SPLICE_RUNNING:
+ sp->state = SPLICE_CLOSING;
+ while (sp->state == SPLICE_CLOSING)
+ msleep(sp, &sp->mtx, PSOCK, "unsplice", 0);
+ break;
+ case SPLICE_IDLE:
+ case SPLICE_EXCEPTION:
+ sp->state = SPLICE_CLOSED;
+ break;
+ default:
+ __assert_unreachable();
+ }
+ if (!timeout) {
+ drain = taskqueue_cancel_timeout(taskqueue_thread, &sp->timeout,
+ NULL) != 0;
+ } else {
+ drain = false;
+ }
+ mtx_unlock(&sp->mtx);
+ if (drain)
+ taskqueue_drain_timeout(taskqueue_thread, &sp->timeout);
+
+ /*
+ * Now we hold the sole reference to the splice structure.
+ * Clean up: signal userspace and release socket references.
+ */
+ sorwakeup(so);
+ CURVNET_SET(so->so_vnet);
+ sorele(so);
+ sowwakeup(so2);
+ sorele(so2);
+ CURVNET_RESTORE();
+ so_splice_free(sp);
+ return (0);
+}
+
/*
* Free socket upon release of the very last reference.
*/
@@ -1226,6 +1837,12 @@
("%s: so %p has references", __func__, so));
KASSERT(SOLISTENING(so) || so->so_qstate == SQ_NONE,
("%s: so %p is on listen queue", __func__, so));
+ KASSERT(SOLISTENING(so) || (so->so_rcv.sb_flags & SB_SPLICED) == 0,
+ ("%s: so %p rcvbuf is spliced", __func__, so));
+ KASSERT(SOLISTENING(so) || (so->so_snd.sb_flags & SB_SPLICED) == 0,
+ ("%s: so %p sndbuf is spliced", __func__, so));
+ KASSERT(so->so_splice == NULL && so->so_splice_back == NULL,
+ ("%s: so %p has spliced data", __func__, so));
SOCK_UNLOCK(so);
@@ -3318,6 +3935,40 @@
so->so_max_pacing_rate = val32;
break;
+ case SO_SPLICE: {
+ struct splice splice;
+
+ error = sooptcopyin(sopt, &splice, sizeof(splice),
+ sizeof(splice));
+ if (error)
+ goto bad;
+
+ error = splice_init();
+ if (error != 0)
+ goto bad;
+
+ if (splice.sp_fd >= 0) {
+ struct file *fp;
+ struct socket *so2;
+
+ if (!cap_rights_contains(sopt->sopt_rights,
+ &cap_recv_rights)) {
+ error = ENOTCAPABLE;
+ goto bad;
+ }
+ error = getsock(sopt->sopt_td, splice.sp_fd,
+ &cap_send_rights, &fp);
+ if (error != 0)
+ goto bad;
+ so2 = fp->f_data;
+
+ error = so_splice(so, so2, &splice);
+ fdrop(fp, sopt->sopt_td);
+ } else {
+ error = so_unsplice(so, false);
+ }
+ break;
+ }
default:
#ifdef SOCKET_HHOOK
if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0)
@@ -3537,6 +4188,33 @@
optval = so->so_max_pacing_rate;
goto integer;
+ case SO_SPLICE: {
+ off_t n;
+
+ /*
+ * Acquire the I/O lock to serialize with
+ * so_splice_xfer(). This is not required for
+ * correctness, but makes testing simpler: once a byte
+ * has been transmitted to the sink and observed (e.g.,
+ * by reading from the socket to which the sink is
+ * connected), a subsequent getsockopt(SO_SPLICE) will
+ * return an up-to-date value.
+ */
+ error = SOCK_IO_RECV_LOCK(so, SBL_WAIT);
+ if (error != 0)
+ goto bad;
+ SOCK_LOCK(so);
+ if (SOLISTENING(so)) {
+ n = 0;
+ } else {
+ n = so->so_splice_sent;
+ }
+ SOCK_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ error = sooptcopyout(sopt, &n, sizeof(n));
+ break;
+ }
+
default:
#ifdef SOCKET_HHOOK
if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0)
@@ -3548,9 +4226,7 @@
break;
}
}
-#ifdef MAC
bad:
-#endif
CURVNET_RESTORE();
return (error);
}
@@ -3713,10 +4389,10 @@
SOCK_SENDBUF_LOCK(so);
SOCK_RECVBUF_LOCK(so);
if (events & (POLLIN | POLLRDNORM))
- if (soreadabledata(so))
+ if (soreadabledata(so) && !isspliced(so))
revents |= events & (POLLIN | POLLRDNORM);
if (events & (POLLOUT | POLLWRNORM))
- if (sowriteable(so))
+ if (sowriteable(so) && !issplicedback(so))
revents |= events & (POLLOUT | POLLWRNORM);
if (events & (POLLPRI | POLLRDBAND))
if (so->so_oobmark ||
@@ -3824,6 +4500,9 @@
return (!TAILQ_EMPTY(&so->sol_comp));
}
+ if ((so->so_rcv.sb_flags & SB_SPLICED) != 0)
+ return (0);
+
SOCK_RECVBUF_LOCK_ASSERT(so);
kn->kn_data = sbavail(&so->so_rcv) - so->so_rcv.sb_ctl;
@@ -4330,6 +5009,8 @@
xso->so_oobmark = so->so_oobmark;
sbtoxsockbuf(&so->so_snd, &xso->so_snd);
sbtoxsockbuf(&so->so_rcv, &xso->so_rcv);
+ if ((so->so_rcv.sb_flags & SB_SPLICED) != 0)
+ xso->so_splice_so = (uintptr_t)so->so_splice->dst;
}
SOCK_UNLOCK(so);
}
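
One userspace-visible consequence of the soreadable() and sopoll() changes in this file: while a splice is active, the kernel itself drains the receive buffer, so the source socket is not reported readable. A hedged sketch of observing this, assuming fd_src carries an active splice as in the earlier sketch (helper name illustrative):

#include <poll.h>

/*
 * Sketch only: the POLLIN path in sopoll() is masked by isspliced(),
 * so a zero-timeout poll() is expected to return 0 (no events) even
 * while data is streaming through the splice.
 */
static int
source_readable(int fd_src)
{
	struct pollfd pfd = { .fd = fd_src, .events = POLLIN };

	return (poll(&pfd, 1, 0));
}
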
diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h
--- a/sys/sys/sockbuf.h
+++ b/sys/sys/sockbuf.h
@@ -48,7 +48,8 @@
#define SB_AUTOSIZE 0x800 /* automatically size socket buffer */
#define SB_STOP 0x1000 /* backpressure indicator */
#define SB_AIO_RUNNING 0x2000 /* AIO operation running */
-#define SB_UNUSED 0x4000 /* previously used for SB_TLS_IFNET */
+#define SB_SPLICED 0x4000 /* socket buffer is spliced;
+ previously used for SB_TLS_IFNET */
#define SB_TLS_RX_RESYNC 0x8000 /* KTLS RX lost HW sync */
#define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */
diff --git a/sys/sys/socket.h b/sys/sys/socket.h
--- a/sys/sys/socket.h
+++ b/sys/sys/socket.h
@@ -35,6 +35,7 @@
#include <sys/cdefs.h>
#include <sys/_types.h>
#include <sys/_iovec.h>
+#include <sys/_timeval.h>
#include <machine/_align.h>
/*
@@ -173,6 +174,7 @@
#endif
#if __BSD_VISIBLE
+#define SO_SPLICE 0x1023 /* splice data to other socket */
#define SO_TS_REALTIME_MICRO 0 /* microsecond resolution, realtime */
#define SO_TS_BINTIME 1 /* sub-nanosecond resolution, realtime */
#define SO_TS_REALTIME 2 /* nanosecond resolution, realtime */
@@ -668,6 +670,16 @@
struct msghdr msg_hdr; /* message header */
ssize_t msg_len; /* message length */
};
+
+/*
+ * Structure used for manipulating the SO_SPLICE socket option.
+ */
+struct splice {
+ int sp_fd; /* drain socket file descriptor */
+ off_t sp_max; /* if set, maximum bytes to splice */
+ struct timeval sp_idle; /* idle timeout */
+};
+
#endif /* __BSD_VISIBLE */
#if defined(_FORTIFY_SOURCE) && _FORTIFY_SOURCE > 0
diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h
--- a/sys/sys/socketvar.h
+++ b/sys/sys/socketvar.h
@@ -45,9 +45,12 @@
#include <sys/osd.h>
#include <sys/_sx.h>
#include <sys/sockbuf.h>
+#include <sys/_task.h>
#ifdef _KERNEL
#include <sys/caprights.h>
#include <sys/sockopt.h>
+#else
+#include <stdbool.h>
#endif
struct vnet;
@@ -69,6 +72,25 @@
SQ_COMP = 0x1000, /* on sol_comp */
};
+
+struct so_splice {
+ struct socket *src;
+ struct socket *dst;
+ off_t max; /* maximum bytes to splice, or -1 */
+ struct mtx mtx;
+ unsigned int wq_index;
+ enum so_splice_state {
+ SPLICE_IDLE, /* waiting for work to arrive */
+ SPLICE_QUEUED, /* a wakeup has queued some work */
+ SPLICE_RUNNING, /* currently transferring data */
+ SPLICE_CLOSING, /* waiting for work to drain */
+ SPLICE_CLOSED, /* unsplicing, terminal state */
+ SPLICE_EXCEPTION, /* I/O error or limit, implicit unsplice */
+ } state;
+ struct timeout_task timeout;
+ STAILQ_ENTRY(so_splice) next;
+};
+
/*-
* Locking key to struct socket:
* (a) constant after allocation, no locking required.
@@ -79,6 +101,7 @@
* (f) not locked since integer reads/writes are atomic.
* (g) used only as a sleep/wakeup address, no value.
* (h) locked by global mutex so_global_mtx.
+ * (ir,is) locked by recv or send I/O locks.
* (k) locked by KTLS workqueue mutex
*/
TAILQ_HEAD(accept_queue, socket);
@@ -117,6 +140,9 @@
int so_ts_clock; /* type of the clock used for timestamps */
uint32_t so_max_pacing_rate; /* (f) TX rate limit in bytes/s */
+ struct so_splice *so_splice; /* (b) splice state for sink */
+ struct so_splice *so_splice_back; /* (b) splice state for source */
+ off_t so_splice_sent; /* (ir) splice bytes sent so far */
/*
* Mutexes to prevent interleaving of socket I/O. These have to be
@@ -297,6 +323,11 @@
* Macros for sockets and socket buffering.
*/
+
+#define isspliced(so) ((so->so_splice != NULL && \
+ so->so_splice->src != NULL))
+#define issplicedback(so) ((so->so_splice_back != NULL && \
+ so->so_splice_back->dst != NULL))
/*
* Flags to soiolock().
*/
@@ -327,9 +358,17 @@
#define soreadabledata(so) \
(sbavail(&(so)->so_rcv) >= (so)->so_rcv.sb_lowat || \
(so)->so_error || (so)->so_rerror)
-#define soreadable(so) \
+#define _soreadable(so) \
(soreadabledata(so) || ((so)->so_rcv.sb_state & SBS_CANTRCVMORE))
+static inline bool
+soreadable(struct socket *so)
+{
+ if (isspliced(so))
+ return (false);
+ return (_soreadable(so));
+}
+
/* can we write something to so? */
#define sowriteable(so) \
((sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
@@ -539,6 +578,11 @@
int soiolock(struct socket *so, struct sx *sx, int flags);
void soiounlock(struct sx *sx);
+/*
+ * Socket splicing routines.
+ */
+void so_splice_dispatch(struct so_splice *sp);
+
/*
* Accept filter functions (duh).
*/
@@ -562,7 +606,8 @@
kvaddr_t xso_so; /* kernel address of struct socket */
kvaddr_t so_pcb; /* kernel address of struct inpcb */
uint64_t so_oobmark;
- int64_t so_spare64[8];
+ kvaddr_t so_splice_so; /* kernel address of spliced socket */
+ int64_t so_spare64[7];
int32_t xso_protocol;
int32_t xso_family;
uint32_t so_qlen;
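
Since the manual page hunk notes that a pair of sockets may be spliced in one or both directions, a kernel-bypass TCP relay is a natural application. A final hedged sketch, again assuming connected TCP sockets, eliding error handling, and using illustrative names:

#include <sys/socket.h>

/*
 * Sketch only: splice client->server and server->client so that the
 * relay's data path runs entirely in the kernel. sp_max and sp_idle
 * are left zero, i.e. no byte limit and no idle timeout.
 */
static int
relay(int client, int server)
{
	struct splice cs = { .sp_fd = server };
	struct splice sc = { .sp_fd = client };

	if (setsockopt(client, SOL_SOCKET, SO_SPLICE, &cs, sizeof(cs)) != 0)
		return (-1);
	if (setsockopt(server, SOL_SOCKET, SO_SPLICE, &sc, sizeof(sc)) != 0)
		return (-1);
	return (0);
}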