D46411.id142637.diff

diff --git a/lib/libsys/getsockopt.2 b/lib/libsys/getsockopt.2
--- a/lib/libsys/getsockopt.2
+++ b/lib/libsys/getsockopt.2
@@ -25,7 +25,7 @@
.\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
.\" SUCH DAMAGE.
.\"
-.Dd February 8, 2021
+.Dd July 8, 2024
.Dt GETSOCKOPT 2
.Os
.Sh NAME
@@ -191,6 +191,7 @@
.It Dv SO_MAX_PACING_RATE Ta "set the maximum transmit rate in bytes per second for the socket"
.It Dv SO_NO_OFFLOAD Ta "disables protocol offloads"
.It Dv SO_NO_DDP Ta "disables direct data placement offload"
+.It Dv SO_SPLICE Ta "splice two sockets together"
.El
.Pp
.Dv SO_DEBUG
@@ -551,6 +552,56 @@
reassembled TCP data streams to be received via zero-copy in
user-supplied buffers using
.Xr aio_read 2 .
+.Pp
+.Dv SO_SPLICE ,
+when passed to
+.Fn setsockopt ,
+splices two sockets together using the following
+.Fa optval :
+.Bd -literal
+struct splice {
+ int sp_fd;
+ off_t sp_max;
+ struct timeval sp_idle;
+};
+.Ed
+.Pp
+Data received on
+.Fa s
+will automatically be transmitted via the socket specified in
+.Fa sp_fd
+without any intervention by userspace.
+Splicing is a one-way operation; a given pair of sockets may be
+spliced in one or both directions, with each direction established by
+a separate call.
+Currently only connected
+.Xr tcp 4
+sockets may be spliced together.
+If
+.Fa sp_max
+is greater than zero, the socket pair will automatically be unspliced
+once that number of bytes has been transmitted.
+If
+.Fa sp_idle
+is non-zero, the socket pair will automatically be unspliced once the
+specified amount of time has elapsed since the initial call to
+.Fn setsockopt .
+If
+.Fa sp_fd
+is -1, the socket will be unspliced immediately.
+.Pp
+When passed to
+.Fn getsockopt ,
+the
+.Dv SO_SPLICE
+option returns a 64-bit integer containing the number of bytes transmitted by
+the most recent splice.
+That is, while the socket is spliced, the value returned will be the number
+of bytes spliced so far.
+When unsplicing, this value is saved and is returned until the socket is closed
+or spliced again.
+For example, if a splice transmits 100 bytes and is then unspliced, a subsequent
+.Fn getsockopt
+call will return 100 until the socket is spliced again.
.Sh RETURN VALUES
.Rv -std
.Sh ERRORS
@@ -618,5 +669,14 @@
.Fn setsockopt
system calls appeared in
.Bx 4.2 .
+The
+.Dv SO_SPLICE
+option originated in
+.Ox 4.9
+and first appeared in
+.Fx 15.0 .
+The
+.Fx
+implementation aims to be source-compatible with that of
+.Ox .
.Sh BUGS
Several of the socket options should be handled at lower levels of the system.
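
A quick illustration of how the new option is meant to be used from
userspace, before moving on to the kernel changes. This is a minimal
sketch against the headers added by this diff, not part of the patch;
the function name, the 1 MB cap, and the 30-second timeout are
illustrative.

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/time.h>

    #include <err.h>
    #include <stdint.h>
    #include <stdio.h>

    /*
     * Splice data arriving on "s" out through "drain". Both descriptors
     * are assumed to be connected TCP sockets.
     */
    static void
    splice_pair(int s, int drain)
    {
            struct splice sp = {
                    .sp_fd = drain,              /* drain socket */
                    .sp_max = 1024 * 1024,       /* unsplice after 1 MB */
                    .sp_idle = { .tv_sec = 30 }, /* unsplice after 30 s */
            };
            off_t sent;
            socklen_t len = sizeof(sent);

            if (setsockopt(s, SOL_SOCKET, SO_SPLICE, &sp, sizeof(sp)) == -1)
                    err(1, "setsockopt(SO_SPLICE)");

            /* Data now moves in the kernel with no further system calls. */

            if (getsockopt(s, SOL_SOCKET, SO_SPLICE, &sent, &len) == -1)
                    err(1, "getsockopt(SO_SPLICE)");
            printf("%jd bytes spliced so far\n", (intmax_t)sent);
    }
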
diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c
--- a/sys/kern/uipc_sockbuf.c
+++ b/sys/kern/uipc_sockbuf.c
@@ -508,6 +508,32 @@
SOCK_BUF_UNLOCK_ASSERT(so, which);
}
+static void
+splice_push(struct socket *so)
+{
+ struct so_splice *sp;
+
+ SOCK_RECVBUF_LOCK_ASSERT(so);
+
+ sp = so->so_splice;
+ mtx_lock(&sp->mtx);
+ SOCK_RECVBUF_UNLOCK(so);
+ so_splice_dispatch(sp);
+}
+
+static void
+splice_pull(struct socket *so)
+{
+ struct so_splice *sp;
+
+ SOCK_SENDBUF_LOCK_ASSERT(so);
+
+ sp = so->so_splice_back;
+ mtx_lock(&sp->mtx);
+ SOCK_SENDBUF_UNLOCK(so);
+ so_splice_dispatch(sp);
+}
+
/*
* Do we need to notify the other side when I/O is possible?
*/
@@ -522,7 +548,9 @@
sorwakeup_locked(struct socket *so)
{
SOCK_RECVBUF_LOCK_ASSERT(so);
- if (sb_notify(&so->so_rcv))
+ if (so->so_rcv.sb_flags & SB_SPLICED)
+ splice_push(so);
+ else if (sb_notify(&so->so_rcv))
sowakeup(so, SO_RCV);
else
SOCK_RECVBUF_UNLOCK(so);
@@ -532,7 +560,9 @@
sowwakeup_locked(struct socket *so)
{
SOCK_SENDBUF_LOCK_ASSERT(so);
- if (sb_notify(&so->so_snd))
+ if (so->so_snd.sb_flags & SB_SPLICED)
+ splice_pull(so);
+ else if (sb_notify(&so->so_snd))
sowakeup(so, SO_SND);
else
SOCK_SENDBUF_UNLOCK(so);
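
The hooks above redirect socket-buffer wakeups into the splice
machinery, and each direction of a connection is driven independently.
A pair of sockets can therefore be spliced both ways to form a
kernel-side TCP proxy. A hedged sketch, not part of this diff; with
sp_max and sp_idle left zeroed, each direction runs with no byte limit
and no timeout:

    #include <sys/socket.h>

    /* Splice two connected TCP sockets in both directions. */
    static int
    splice_both_ways(int a, int b)
    {
            struct splice ab = { .sp_fd = b };      /* a -> b */
            struct splice ba = { .sp_fd = a };      /* b -> a */

            if (setsockopt(a, SOL_SOCKET, SO_SPLICE, &ab, sizeof(ab)) == -1)
                    return (-1);
            if (setsockopt(b, SOL_SOCKET, SO_SPLICE, &ba, sizeof(ba)) == -1)
                    return (-1);
            return (0);
    }
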
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -122,6 +122,7 @@
#include <sys/hhook.h>
#include <sys/kernel.h>
#include <sys/khelp.h>
+#include <sys/kthread.h>
#include <sys/ktls.h>
#include <sys/event.h>
#include <sys/eventhandler.h>
@@ -134,6 +135,7 @@
#include <sys/resourcevar.h>
#include <net/route.h>
#include <sys/signalvar.h>
+#include <sys/smp.h>
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
@@ -159,8 +161,17 @@
#include <compat/freebsd32/freebsd32.h>
#endif
+static int soreceive_generic_locked(struct socket *so,
+ struct sockaddr **psa, struct uio *uio, struct mbuf **mp,
+ struct mbuf **controlp, int *flagsp);
static int soreceive_rcvoob(struct socket *so, struct uio *uio,
int flags);
+static int soreceive_stream_locked(struct socket *so, struct sockbuf *sb,
+ struct sockaddr **psa, struct uio *uio, struct mbuf **mp,
+ struct mbuf **controlp, int flags);
+static int sosend_generic_locked(struct socket *so, struct sockaddr *addr,
+ struct uio *uio, struct mbuf *top, struct mbuf *control,
+ int flags, struct thread *td);
static void so_rdknl_lock(void *);
static void so_rdknl_unlock(void *);
static void so_rdknl_assert_lock(void *, int);
@@ -206,6 +217,21 @@
static inline int hhook_run_socket(struct socket *, void *, int32_t);
#endif
+#ifdef COMPAT_FREEBSD32
+#ifdef __amd64__
+/* off_t has 4-byte alignment on i386 but not on other 32-bit platforms. */
+#define __splice32_packed __packed
+#else
+#define __splice32_packed
+#endif
+struct splice32 {
+ int32_t sp_fd;
+ int64_t sp_max;
+ struct timeval32 sp_idle;
+} __splice32_packed;
+#undef __splice32_packed
+#endif
+
/*
* Limit on the number of connections in the listen queue waiting
* for accept(2).
@@ -278,6 +304,393 @@
maxsockets = uma_zone_set_max(socket_zone, maxsockets);
}
+static int splice_init_state;
+static struct sx splice_init_lock;
+SX_SYSINIT(splice_init_lock, &splice_init_lock, "splice_init");
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, splice, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
+ "Settings relating to the SO_SPLICE socket option");
+
+static bool splice_receive_stream = true;
+SYSCTL_BOOL(_kern_ipc_splice, OID_AUTO, receive_stream, CTLFLAG_RWTUN,
+ &splice_receive_stream, 0,
+ "Use soreceive_stream() for stream splices");
+
+static uma_zone_t splice_zone;
+static struct proc *splice_proc;
+struct splice_wq {
+ struct mtx mtx;
+ STAILQ_HEAD(, so_splice) head;
+ bool running;
+} __aligned(CACHE_LINE_SIZE);
+static struct splice_wq *splice_wq;
+struct splice_domain_info {
+ int count;
+ int cpu[MAXCPU];
+};
+static int splice_bind_threads = 1;
+static uint16_t splice_cpuid_lookup[MAXCPU];
+static struct splice_domain_info splice_domains[MAXMEMDOM];
+static uint32_t splice_index = 0;
+
+static void so_splice_timeout(void *arg, int pending);
+static void so_splice_xfer(struct so_splice *s);
+static int so_unsplice(struct socket *so, bool timeout);
+
+static void
+splice_work_thread(void *ctx)
+{
+ struct splice_wq *wq = ctx;
+ struct so_splice *s, *s_temp;
+ STAILQ_HEAD(, so_splice) local_head;
+ int cpu;
+
+ cpu = wq - splice_wq;
+ if (bootverbose)
+ printf("starting so_splice worker thread for CPU %d\n", cpu);
+
+ for (;;) {
+ mtx_lock(&wq->mtx);
+ while (STAILQ_EMPTY(&wq->head)) {
+ wq->running = false;
+ mtx_sleep(wq, &wq->mtx, 0, "-", 0);
+ wq->running = true;
+ }
+ STAILQ_INIT(&local_head);
+ STAILQ_CONCAT(&local_head, &wq->head);
+ STAILQ_INIT(&wq->head);
+ mtx_unlock(&wq->mtx);
+ STAILQ_FOREACH_SAFE(s, &local_head, next, s_temp) {
+ mtx_lock(&s->mtx);
+ CURVNET_SET(s->src->so_vnet);
+ so_splice_xfer(s);
+ CURVNET_RESTORE();
+ }
+ }
+}
+
+static void
+so_splice_dispatch_async(struct so_splice *sp)
+{
+ struct splice_wq *wq;
+ bool running;
+
+ wq = &splice_wq[sp->wq_index];
+ mtx_lock(&wq->mtx);
+ STAILQ_INSERT_TAIL(&wq->head, sp, next);
+ running = wq->running;
+ mtx_unlock(&wq->mtx);
+ if (!running)
+ wakeup(wq);
+}
+
+void
+so_splice_dispatch(struct so_splice *sp)
+{
+ mtx_assert(&sp->mtx, MA_OWNED);
+
+ if (sp->state != SPLICE_IDLE) {
+ mtx_unlock(&sp->mtx);
+ } else {
+ sp->state = SPLICE_QUEUED;
+ mtx_unlock(&sp->mtx);
+ so_splice_dispatch_async(sp);
+ }
+}
+
+static int
+splice_zinit(void *mem, int size __unused, int flags __unused)
+{
+ struct so_splice *s;
+
+ s = (struct so_splice *)mem;
+ mtx_init(&s->mtx, "so_splice", NULL, MTX_DEF);
+ return (0);
+}
+
+static void
+splice_zfini(void *mem, int size)
+{
+ struct so_splice *s;
+
+ s = (struct so_splice *)mem;
+ mtx_destroy(&s->mtx);
+}
+
+static int
+splice_init(void)
+{
+ struct thread *td;
+ struct pcpu *pc;
+ int count, domain, error, i, nthr, state;
+
+ state = atomic_load_acq_int(&splice_init_state);
+ if (__predict_true(state > 0))
+ return (0);
+ if (state < 0)
+ return (ENXIO);
+ sx_xlock(&splice_init_lock);
+ if ((state = splice_init_state) != 0) {
+ sx_xunlock(&splice_init_lock);
+ return (state > 0 ? 0 : ENXIO);
+ }
+
+ splice_zone = uma_zcreate("splice", sizeof(struct so_splice), NULL,
+ NULL, splice_zinit, splice_zfini, UMA_ALIGN_CACHE, 0);
+
+ splice_wq = mallocarray(mp_maxid + 1, sizeof(*splice_wq), M_TEMP,
+ M_WAITOK | M_ZERO);
+
+ /*
+ * Initialize the workqueues to run the splice work. We create a
+ * work queue for each CPU.
+ */
+ nthr = 0;
+ CPU_FOREACH(i) {
+ STAILQ_INIT(&splice_wq[i].head);
+ mtx_init(&splice_wq[i].mtx, "splice work queue", NULL, MTX_DEF);
+ if (splice_bind_threads > 1) {
+ pc = pcpu_find(i);
+ domain = pc->pc_domain;
+ count = splice_domains[domain].count;
+ splice_domains[domain].cpu[count] = i;
+ splice_domains[domain].count++;
+ }
+ splice_cpuid_lookup[nthr++] = i;
+ }
+
+ /*
+ * If we somehow have an empty domain, fall back to choosing
+ * among all splice threads.
+ */
+ if (splice_bind_threads > 1) {
+ for (i = 0; i < vm_ndomains; i++) {
+ if (splice_domains[i].count == 0) {
+ splice_bind_threads = 1;
+ break;
+ }
+ }
+ }
+
+ /* Start kthreads for each workqueue. */
+ error = 0;
+ CPU_FOREACH(i) {
+ error = kproc_kthread_add(splice_work_thread, &splice_wq[i],
+ &splice_proc, &td, 0, 0, "so_splice", "thr_%d", i);
+ if (error) {
+ printf("Can't add so_splice thread %d error %d\n",
+ i, error);
+ break;
+ }
+ }
+
+ atomic_store_rel_int(&splice_init_state, error != 0 ? -1 : 1);
+ sx_xunlock(&splice_init_lock);
+
+ return (error);
+}
+
+/*
+ * Lock a pair of sockets' I/O locks for splicing. Avoid blocking while holding
+ * one lock in order to avoid potential deadlocks in case there is some other
+ * code path which acquires more than one I/O lock at a time.
+ */
+static void
+splice_lock_pair(struct socket *so_src, struct socket *so_dst)
+{
+ int error;
+
+ for (;;) {
+ error = SOCK_IO_SEND_LOCK(so_dst, SBL_WAIT | SBL_NOINTR);
+ KASSERT(error == 0,
+ ("%s: failed to lock send I/O lock: %d", __func__, error));
+ error = SOCK_IO_RECV_LOCK(so_src, 0);
+ KASSERT(error == 0 || error == EWOULDBLOCK,
+ ("%s: failed to lock recv I/O lock: %d", __func__, error));
+ if (error == 0)
+ break;
+ SOCK_IO_SEND_UNLOCK(so_dst);
+
+ error = SOCK_IO_RECV_LOCK(so_src, SBL_WAIT | SBL_NOINTR);
+ KASSERT(error == 0,
+ ("%s: failed to lock recv I/O lock: %d", __func__, error));
+ error = SOCK_IO_SEND_LOCK(so_dst, 0);
+ KASSERT(error == 0 || error == EWOULDBLOCK,
+ ("%s: failed to lock send I/O lock: %d", __func__, error));
+ if (error == 0)
+ break;
+ SOCK_IO_RECV_UNLOCK(so_src);
+ }
+}
+
+static void
+splice_unlock_pair(struct socket *so_src, struct socket *so_dst)
+{
+ SOCK_IO_RECV_UNLOCK(so_src);
+ SOCK_IO_SEND_UNLOCK(so_dst);
+}
+
+/*
+ * Move data from the source to the sink. Assumes that both of the relevant
+ * socket I/O locks are held.
+ */
+static int
+so_splice_xfer_data(struct socket *so_src, struct socket *so_dst, off_t max,
+ ssize_t *lenp)
+{
+ struct uio uio;
+ struct mbuf *m;
+ struct sockbuf *sb_src, *sb_dst;
+ ssize_t len;
+ long space;
+ int error, flags;
+
+ SOCK_IO_RECV_ASSERT_LOCKED(so_src);
+ SOCK_IO_SEND_ASSERT_LOCKED(so_dst);
+
+ error = 0;
+ m = NULL;
+ memset(&uio, 0, sizeof(uio));
+
+ sb_src = &so_src->so_rcv;
+ sb_dst = &so_dst->so_snd;
+
+ space = sbspace(sb_dst);
+ if (space < 0)
+ space = 0;
+ len = MIN(max, MIN(space, sbavail(sb_src)));
+ if (len == 0) {
+ SOCK_RECVBUF_LOCK(so_src);
+ if ((sb_src->sb_state & SBS_CANTRCVMORE) != 0)
+ error = EPIPE;
+ SOCK_RECVBUF_UNLOCK(so_src);
+ } else {
+ flags = MSG_DONTWAIT;
+ uio.uio_resid = len;
+ if (splice_receive_stream && sb_src->sb_tls_info == NULL) {
+ error = soreceive_stream_locked(so_src, sb_src, NULL,
+ &uio, &m, NULL, flags);
+ } else {
+ error = soreceive_generic_locked(so_src, NULL,
+ &uio, &m, NULL, &flags);
+ }
+ if (error != 0 && m != NULL) {
+ m_freem(m);
+ m = NULL;
+ }
+ }
+ if (m != NULL) {
+ len -= uio.uio_resid;
+ error = sosend_generic_locked(so_dst, NULL, NULL, m, NULL,
+ MSG_DONTWAIT, curthread);
+ } else if (error == 0) {
+ len = 0;
+ SOCK_SENDBUF_LOCK(so_dst);
+ if ((sb_dst->sb_state & SBS_CANTSENDMORE) != 0)
+ error = EPIPE;
+ SOCK_SENDBUF_UNLOCK(so_dst);
+ }
+ if (error == 0)
+ *lenp = len;
+ return (error);
+}
+
+/*
+ * Transfer data from the source to the sink.
+ *
+ * Called from the splice worker threads, and from so_splice() to perform
+ * the initial transfer when a splice is created. The splice mutex is held
+ * on entry; it is dropped while data is moved and released before return.
+ */
+static void
+so_splice_xfer(struct so_splice *sp)
+{
+ struct socket *so_src, *so_dst;
+ off_t max;
+ ssize_t len;
+ int error;
+
+ mtx_assert(&sp->mtx, MA_OWNED);
+ KASSERT(sp->state == SPLICE_QUEUED || sp->state == SPLICE_CLOSING,
+ ("so_splice_xfer: invalid state %d", sp->state));
+ KASSERT(sp->max != 0, ("so_splice_xfer: max == 0"));
+
+ if (sp->state == SPLICE_CLOSING) {
+ /* Userspace asked us to close the splice. */
+ goto closing;
+ }
+
+ sp->state = SPLICE_RUNNING;
+ so_src = sp->src;
+ so_dst = sp->dst;
+ max = sp->max > 0 ? sp->max - so_src->so_splice_sent : OFF_MAX;
+ if (max < 0)
+ max = 0;
+
+ /*
+ * Lock the sockets in order to block userspace from doing anything
+ * sneaky. If an error occurs or one of the sockets can no longer
+ * transfer data, we will automatically unsplice.
+ */
+ mtx_unlock(&sp->mtx);
+ splice_lock_pair(so_src, so_dst);
+
+ error = so_splice_xfer_data(so_src, so_dst, max, &len);
+
+ mtx_lock(&sp->mtx);
+
+ /*
+ * Update our stats while still holding the socket locks. This
+ * synchronizes with getsockopt(SO_SPLICE), see the comment there.
+ */
+ if (error == 0) {
+ KASSERT(len >= 0, ("%s: len %zd < 0", __func__, len));
+ so_src->so_splice_sent += len;
+ }
+ splice_unlock_pair(so_src, so_dst);
+
+ switch (sp->state) {
+ case SPLICE_CLOSING:
+closing:
+ sp->state = SPLICE_CLOSED;
+ wakeup(sp);
+ mtx_unlock(&sp->mtx);
+ break;
+ case SPLICE_RUNNING:
+ if (error != 0 ||
+ (sp->max > 0 && so_src->so_splice_sent >= sp->max)) {
+ sp->state = SPLICE_EXCEPTION;
+ soref(so_src);
+ mtx_unlock(&sp->mtx);
+ (void)so_unsplice(so_src, false);
+ sorele(so_src);
+ } else {
+ /*
+ * Locklessly check for additional bytes in the source's
+ * receive buffer and queue more work if possible. We
+ * may end up queuing needless work, but that's ok, and
+ * if we race with a thread inserting more data into the
+ * buffer and observe sbavail() == 0, the splice mutex
+ * ensures that splice_push() will queue more work for
+ * us.
+ */
+ if (sbavail(&so_src->so_rcv) > 0 &&
+ sbspace(&so_dst->so_snd) > 0) {
+ sp->state = SPLICE_QUEUED;
+ mtx_unlock(&sp->mtx);
+ so_splice_dispatch_async(sp);
+ } else {
+ sp->state = SPLICE_IDLE;
+ mtx_unlock(&sp->mtx);
+ }
+ }
+ break;
+ default:
+ __assert_unreachable();
+ }
+}
+
static void
socket_init(void *tag)
{
@@ -1213,6 +1626,219 @@
return (0);
}
+static struct so_splice *
+so_splice_alloc(off_t max)
+{
+ struct so_splice *sp;
+
+ sp = uma_zalloc(splice_zone, M_WAITOK);
+ sp->src = NULL;
+ sp->dst = NULL;
+ sp->max = max > 0 ? max : -1;
+ do {
+ sp->wq_index = atomic_fetchadd_32(&splice_index, 1) %
+ (mp_maxid + 1);
+ } while (CPU_ABSENT(sp->wq_index));
+ sp->state = SPLICE_IDLE;
+ TIMEOUT_TASK_INIT(taskqueue_thread, &sp->timeout, 0, so_splice_timeout,
+ sp);
+ return (sp);
+}
+
+static void
+so_splice_free(struct so_splice *sp)
+{
+ KASSERT(sp->state == SPLICE_CLOSED,
+ ("so_splice_free: sp %p not closed", sp));
+ uma_zfree(splice_zone, sp);
+}
+
+static void
+so_splice_timeout(void *arg, int pending __unused)
+{
+ struct so_splice *sp;
+
+ sp = arg;
+ (void)so_unsplice(sp->src, true);
+}
+
+/*
+ * Splice the output from so to the input of so2.
+ */
+static int
+so_splice(struct socket *so, struct socket *so2, struct splice *splice)
+{
+ struct so_splice *sp;
+ int error;
+
+ if (splice->sp_max < 0)
+ return (EINVAL);
+ /* Handle only TCP for now; TODO: other streaming protos */
+ if (so->so_proto->pr_protocol != IPPROTO_TCP ||
+ so2->so_proto->pr_protocol != IPPROTO_TCP)
+ return (EPROTONOSUPPORT);
+ if (so->so_vnet != so2->so_vnet)
+ return (EINVAL);
+
+ /* so_splice_xfer() assumes that we're using these implementations. */
+ KASSERT(so->so_proto->pr_sosend == sosend_generic,
+ ("so_splice: sosend not sosend_generic"));
+ KASSERT(so2->so_proto->pr_soreceive == soreceive_generic ||
+ so2->so_proto->pr_soreceive == soreceive_stream,
+ ("so_splice: soreceive not soreceive_generic/stream"));
+
+ sp = so_splice_alloc(splice->sp_max);
+ so->so_splice_sent = 0;
+ sp->src = so;
+ sp->dst = so2;
+
+ error = 0;
+ SOCK_LOCK(so);
+ if (SOLISTENING(so))
+ error = EINVAL;
+ else if ((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0)
+ error = ENOTCONN;
+ else if (so->so_splice != NULL)
+ error = EBUSY;
+ if (error != 0) {
+ SOCK_UNLOCK(so);
+ uma_zfree(splice_zone, sp);
+ return (error);
+ }
+ soref(so);
+ so->so_splice = sp;
+ SOCK_RECVBUF_LOCK(so);
+ so->so_rcv.sb_flags |= SB_SPLICED;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+
+ error = 0;
+ SOCK_LOCK(so2);
+ if (SOLISTENING(so2))
+ error = EINVAL;
+ else if ((so2->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0)
+ error = ENOTCONN;
+ else if (so2->so_splice_back != NULL)
+ error = EBUSY;
+ if (error != 0) {
+ SOCK_UNLOCK(so2);
+ SOCK_LOCK(so);
+ so->so_splice = NULL;
+ SOCK_RECVBUF_LOCK(so);
+ so->so_rcv.sb_flags &= ~SB_SPLICED;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+ sorele(so);
+ uma_zfree(splice_zone, sp);
+ return (error);
+ }
+ soref(so2);
+ so2->so_splice_back = sp;
+ SOCK_SENDBUF_LOCK(so2);
+ so2->so_snd.sb_flags |= SB_SPLICED;
+ mtx_lock(&sp->mtx);
+ SOCK_SENDBUF_UNLOCK(so2);
+ SOCK_UNLOCK(so2);
+
+ if (splice->sp_idle.tv_sec != 0 || splice->sp_idle.tv_usec != 0) {
+ taskqueue_enqueue_timeout_sbt(taskqueue_thread, &sp->timeout,
+ tvtosbt(splice->sp_idle), 0, C_PREL(4));
+ }
+
+ /*
+ * Transfer any data already present in the socket buffer.
+ */
+ sp->state = SPLICE_QUEUED;
+ so_splice_xfer(sp);
+ return (0);
+}
+
+static int
+so_unsplice(struct socket *so, bool timeout)
+{
+ struct socket *so2;
+ struct so_splice *sp;
+ bool drain;
+
+ /*
+ * First unset SB_SPLICED and hide the splice structure so that
+ * wakeup routines will stop enqueuing work. This also ensures that
+ * only a single thread will proceed with the unsplice.
+ */
+ SOCK_LOCK(so);
+ if (SOLISTENING(so)) {
+ SOCK_UNLOCK(so);
+ return (EINVAL);
+ }
+ SOCK_RECVBUF_LOCK(so);
+ if ((so->so_rcv.sb_flags & SB_SPLICED) == 0) {
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+ return (ENOTCONN);
+ }
+ so->so_rcv.sb_flags &= ~SB_SPLICED;
+ sp = so->so_splice;
+ so->so_splice = NULL;
+ SOCK_RECVBUF_UNLOCK(so);
+ SOCK_UNLOCK(so);
+
+ so2 = sp->dst;
+ SOCK_LOCK(so2);
+ KASSERT(!SOLISTENING(so2), ("%s: so2 is listening", __func__));
+ SOCK_SENDBUF_LOCK(so2);
+ KASSERT((so2->so_snd.sb_flags & SB_SPLICED) != 0,
+ ("%s: so2 is not spliced", __func__));
+ KASSERT(so2->so_splice_back == sp,
+ ("%s: so_splice_back != sp", __func__));
+ so2->so_snd.sb_flags &= ~SB_SPLICED;
+ so2->so_splice_back = NULL;
+ SOCK_SENDBUF_UNLOCK(so2);
+ SOCK_UNLOCK(so2);
+
+ /*
+ * No new work is being enqueued. The worker thread might be
+ * splicing data right now, in which case we want to wait for it to
+ * finish before proceeding.
+ */
+ mtx_lock(&sp->mtx);
+ switch (sp->state) {
+ case SPLICE_QUEUED:
+ case SPLICE_RUNNING:
+ sp->state = SPLICE_CLOSING;
+ while (sp->state == SPLICE_CLOSING)
+ msleep(sp, &sp->mtx, PSOCK, "unsplice", 0);
+ break;
+ case SPLICE_IDLE:
+ case SPLICE_EXCEPTION:
+ sp->state = SPLICE_CLOSED;
+ break;
+ default:
+ __assert_unreachable();
+ }
+ if (!timeout) {
+ drain = taskqueue_cancel_timeout(taskqueue_thread, &sp->timeout,
+ NULL) != 0;
+ } else {
+ drain = false;
+ }
+ mtx_unlock(&sp->mtx);
+ if (drain)
+ taskqueue_drain_timeout(taskqueue_thread, &sp->timeout);
+
+ /*
+ * Now we hold the sole reference to the splice structure.
+ * Clean up: signal userspace and release socket references.
+ */
+ sorwakeup(so);
+ CURVNET_SET(so->so_vnet);
+ sorele(so);
+ sowwakeup(so2);
+ sorele(so2);
+ CURVNET_RESTORE();
+ so_splice_free(sp);
+ return (0);
+}
+
/*
* Free socket upon release of the very last reference.
*/
@@ -1226,6 +1852,12 @@
("%s: so %p has references", __func__, so));
KASSERT(SOLISTENING(so) || so->so_qstate == SQ_NONE,
("%s: so %p is on listen queue", __func__, so));
+ KASSERT(SOLISTENING(so) || (so->so_rcv.sb_flags & SB_SPLICED) == 0,
+ ("%s: so %p rcvbuf is spliced", __func__, so));
+ KASSERT(SOLISTENING(so) || (so->so_snd.sb_flags & SB_SPLICED) == 0,
+ ("%s: so %p sndbuf is spliced", __func__, so));
+ KASSERT(so->so_splice == NULL && so->so_splice_back == NULL,
+ ("%s: so %p has spliced data", __func__, so));
SOCK_UNLOCK(so);
@@ -3318,6 +3950,59 @@
so->so_max_pacing_rate = val32;
break;
+ case SO_SPLICE: {
+ struct splice splice;
+
+#ifdef COMPAT_FREEBSD32
+ if (SV_CURPROC_FLAG(SV_ILP32)) {
+ struct splice32 splice32;
+
+ error = sooptcopyin(sopt, &splice32,
+ sizeof(splice32), sizeof(splice32));
+ if (error == 0) {
+ splice.sp_fd = splice32.sp_fd;
+ splice.sp_max = splice32.sp_max;
+ CP(splice32.sp_idle, splice.sp_idle,
+ tv_sec);
+ CP(splice32.sp_idle, splice.sp_idle,
+ tv_usec);
+ }
+ } else
+#endif
+ {
+ error = sooptcopyin(sopt, &splice,
+ sizeof(splice), sizeof(splice));
+ }
+ if (error)
+ goto bad;
+ ktrsplice(&splice);
+
+ error = splice_init();
+ if (error != 0)
+ goto bad;
+
+ if (splice.sp_fd >= 0) {
+ struct file *fp;
+ struct socket *so2;
+
+ if (!cap_rights_contains(sopt->sopt_rights,
+ &cap_recv_rights)) {
+ error = ENOTCAPABLE;
+ goto bad;
+ }
+ error = getsock(sopt->sopt_td, splice.sp_fd,
+ &cap_send_rights, &fp);
+ if (error != 0)
+ goto bad;
+ so2 = fp->f_data;
+
+ error = so_splice(so, so2, &splice);
+ fdrop(fp, sopt->sopt_td);
+ } else {
+ error = so_unsplice(so, false);
+ }
+ break;
+ }
default:
#ifdef SOCKET_HHOOK
if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0)
@@ -3537,6 +4222,33 @@
optval = so->so_max_pacing_rate;
goto integer;
+ case SO_SPLICE: {
+ off_t n;
+
+ /*
+ * Acquire the I/O lock to serialize with
+ * so_splice_xfer(). This is not required for
+ * correctness, but makes testing simpler: once a byte
+ * has been transmitted to the sink and observed (e.g.,
+ * by reading from the socket to which the sink is
+ * connected), a subsequent getsockopt(SO_SPLICE) will
+ * return an up-to-date value.
+ */
+ error = SOCK_IO_RECV_LOCK(so, SBL_WAIT);
+ if (error != 0)
+ goto bad;
+ SOCK_LOCK(so);
+ if (SOLISTENING(so)) {
+ n = 0;
+ } else {
+ n = so->so_splice_sent;
+ }
+ SOCK_UNLOCK(so);
+ SOCK_IO_RECV_UNLOCK(so);
+ error = sooptcopyout(sopt, &n, sizeof(n));
+ break;
+ }
+
default:
#ifdef SOCKET_HHOOK
if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0)
@@ -3548,9 +4260,7 @@
break;
}
}
-#ifdef MAC
bad:
-#endif
CURVNET_RESTORE();
return (error);
}
@@ -3713,10 +4423,10 @@
SOCK_SENDBUF_LOCK(so);
SOCK_RECVBUF_LOCK(so);
if (events & (POLLIN | POLLRDNORM))
- if (soreadabledata(so))
+ if (soreadabledata(so) && !isspliced(so))
revents |= events & (POLLIN | POLLRDNORM);
if (events & (POLLOUT | POLLWRNORM))
- if (sowriteable(so))
+ if (sowriteable(so) && !issplicedback(so))
revents |= events & (POLLOUT | POLLWRNORM);
if (events & (POLLPRI | POLLRDBAND))
if (so->so_oobmark ||
@@ -3824,6 +4534,9 @@
return (!TAILQ_EMPTY(&so->sol_comp));
}
+ if ((so->so_rcv.sb_flags & SB_SPLICED) != 0)
+ return (0);
+
SOCK_RECVBUF_LOCK_ASSERT(so);
kn->kn_data = sbavail(&so->so_rcv) - so->so_rcv.sb_ctl;
@@ -4330,6 +5043,8 @@
xso->so_oobmark = so->so_oobmark;
sbtoxsockbuf(&so->so_snd, &xso->so_snd);
sbtoxsockbuf(&so->so_rcv, &xso->so_rcv);
+ if ((so->so_rcv.sb_flags & SB_SPLICED) != 0)
+ xso->so_splice_so = (uintptr_t)so->so_splice->dst;
}
SOCK_UNLOCK(so);
}
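
Two behaviors of the handlers above are worth demonstrating together:
a negative sp_fd requests an immediate unsplice, and the byte count
returned by getsockopt(SO_SPLICE) is retained after unsplicing until
the socket is closed or spliced again. A sketch, not part of this
diff; the function name is illustrative:

    #include <sys/types.h>
    #include <sys/socket.h>

    #include <err.h>

    /* Unsplice "s" and return the total number of bytes it spliced. */
    static off_t
    unsplice_and_report(int s)
    {
            struct splice sp = { .sp_fd = -1 };     /* -1 means unsplice */
            off_t sent;
            socklen_t len = sizeof(sent);

            if (setsockopt(s, SOL_SOCKET, SO_SPLICE, &sp, sizeof(sp)) == -1)
                    err(1, "setsockopt(SO_SPLICE)");
            if (getsockopt(s, SOL_SOCKET, SO_SPLICE, &sent, &len) == -1)
                    err(1, "getsockopt(SO_SPLICE)");
            return (sent);
    }
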
diff --git a/sys/sys/ktrace.h b/sys/sys/ktrace.h
--- a/sys/sys/ktrace.h
+++ b/sys/sys/ktrace.h
@@ -347,6 +347,8 @@
ktrstruct_error("stat", (s), sizeof(struct stat), error)
#define ktrcpuset(s, l) \
ktrstruct("cpuset_t", (s), l)
+#define ktrsplice(s) \
+ ktrstruct("splice", (s), sizeof(struct splice))
extern u_int ktr_geniosize;
#ifdef KTRACE
extern int ktr_filesize_limit_signal;
diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h
--- a/sys/sys/sockbuf.h
+++ b/sys/sys/sockbuf.h
@@ -48,7 +48,8 @@
#define SB_AUTOSIZE 0x800 /* automatically size socket buffer */
#define SB_STOP 0x1000 /* backpressure indicator */
#define SB_AIO_RUNNING 0x2000 /* AIO operation running */
-#define SB_UNUSED 0x4000 /* previously used for SB_TLS_IFNET */
+#define SB_SPLICED 0x4000 /* socket buffer is spliced;
+ previously used for SB_TLS_IFNET */
#define SB_TLS_RX_RESYNC 0x8000 /* KTLS RX lost HW sync */
#define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */
diff --git a/sys/sys/socket.h b/sys/sys/socket.h
--- a/sys/sys/socket.h
+++ b/sys/sys/socket.h
@@ -35,6 +35,7 @@
#include <sys/cdefs.h>
#include <sys/_types.h>
#include <sys/_iovec.h>
+#include <sys/_timeval.h>
#include <machine/_align.h>
/*
@@ -173,6 +174,7 @@
#endif
#if __BSD_VISIBLE
+#define SO_SPLICE 0x1023 /* splice data to other socket */
#define SO_TS_REALTIME_MICRO 0 /* microsecond resolution, realtime */
#define SO_TS_BINTIME 1 /* sub-nanosecond resolution, realtime */
#define SO_TS_REALTIME 2 /* nanosecond resolution, realtime */
@@ -668,6 +670,16 @@
struct msghdr msg_hdr; /* message header */
ssize_t msg_len; /* message length */
};
+
+/*
+ * Structure used for manipulating the splice option.
+ */
+struct splice {
+ int sp_fd; /* drain socket file descriptor */
+ off_t sp_max; /* if set, maximum bytes to splice */
+ struct timeval sp_idle; /* idle timeout */
+};
+
#endif /* __BSD_VISIBLE */
#if defined(_FORTIFY_SOURCE) && _FORTIFY_SOURCE > 0
diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h
--- a/sys/sys/socketvar.h
+++ b/sys/sys/socketvar.h
@@ -45,9 +45,12 @@
#include <sys/osd.h>
#include <sys/_sx.h>
#include <sys/sockbuf.h>
+#include <sys/_task.h>
#ifdef _KERNEL
#include <sys/caprights.h>
#include <sys/sockopt.h>
+#else
+#include <stdbool.h>
#endif
struct vnet;
@@ -69,6 +72,25 @@
SQ_COMP = 0x1000, /* on sol_comp */
};
+
+struct so_splice {
+ struct socket *src;
+ struct socket *dst;
+ off_t max; /* maximum bytes to splice, or -1 */
+ struct mtx mtx;
+ unsigned int wq_index;
+ enum so_splice_state {
+ SPLICE_IDLE, /* waiting for work to arrive */
+ SPLICE_QUEUED, /* a wakeup has queued some work */
+ SPLICE_RUNNING, /* currently transferring data */
+ SPLICE_CLOSING, /* waiting for work to drain */
+ SPLICE_CLOSED, /* unsplicing, terminal state */
+ SPLICE_EXCEPTION, /* I/O error or limit, implicit unsplice */
+ } state;
+ struct timeout_task timeout;
+ STAILQ_ENTRY(so_splice) next;
+};
+
/*-
* Locking key to struct socket:
* (a) constant after allocation, no locking required.
@@ -79,6 +101,7 @@
* (f) not locked since integer reads/writes are atomic.
* (g) used only as a sleep/wakeup address, no value.
* (h) locked by global mutex so_global_mtx.
+ * (ir,is) locked by recv or send I/O locks.
* (k) locked by KTLS workqueue mutex
*/
TAILQ_HEAD(accept_queue, socket);
@@ -117,6 +140,9 @@
int so_ts_clock; /* type of the clock used for timestamps */
uint32_t so_max_pacing_rate; /* (f) TX rate limit in bytes/s */
+ struct so_splice *so_splice; /* (b) splice state for sink */
+ struct so_splice *so_splice_back; /* (b) splice state for source */
+ off_t so_splice_sent; /* (ir) splice bytes sent so far */
/*
* Mutexes to prevent interleaving of socket I/O. These have to be
@@ -297,6 +323,11 @@
* Macros for sockets and socket buffering.
*/
+
+#define isspliced(so) (((so)->so_splice != NULL && \
+ (so)->so_splice->src != NULL))
+#define issplicedback(so) (((so)->so_splice_back != NULL && \
+ (so)->so_splice_back->dst != NULL))
/*
* Flags to soiolock().
*/
@@ -327,9 +358,17 @@
#define soreadabledata(so) \
(sbavail(&(so)->so_rcv) >= (so)->so_rcv.sb_lowat || \
(so)->so_error || (so)->so_rerror)
-#define soreadable(so) \
+#define _soreadable(so) \
(soreadabledata(so) || ((so)->so_rcv.sb_state & SBS_CANTRCVMORE))
+static inline bool
+soreadable(struct socket *so)
+{
+ if (isspliced(so))
+ return (false);
+ return (_soreadable(so));
+}
+
/* can we write something to so? */
#define sowriteable(so) \
((sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
@@ -539,6 +578,11 @@
int soiolock(struct socket *so, struct sx *sx, int flags);
void soiounlock(struct sx *sx);
+/*
+ * Socket splicing routines.
+ */
+void so_splice_dispatch(struct so_splice *sp);
+
/*
* Accept filter functions (duh).
*/
@@ -562,7 +606,8 @@
kvaddr_t xso_so; /* kernel address of struct socket */
kvaddr_t so_pcb; /* kernel address of struct inpcb */
uint64_t so_oobmark;
- int64_t so_spare64[8];
+ kvaddr_t so_splice_so; /* kernel address of spliced socket */
+ int64_t so_spare64[7];
int32_t xso_protocol;
int32_t xso_family;
uint32_t so_qlen;
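
For readers tracing the so_splice_state enum above, the transitions
implemented in uipc_socket.c earlier in this diff can be summarized as
follows (a reading of the code, not text from it):

    /*
     * IDLE    -> QUEUED      so_splice_dispatch() on a buffer wakeup
     * QUEUED  -> RUNNING     worker thread enters so_splice_xfer()
     * RUNNING -> QUEUED      more data is pending; work is requeued
     * RUNNING -> IDLE        buffers drained; wait for the next wakeup
     * RUNNING -> EXCEPTION   error or sp_max reached; implicit unsplice
     * QUEUED/RUNNING -> CLOSING -> CLOSED   so_unsplice() waits for worker
     * IDLE/EXCEPTION -> CLOSED              so_unsplice() completes directly
     */
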
