
D5209.diff

Index: sys/kern/subr_taskqueue.c
===================================================================
--- sys/kern/subr_taskqueue.c
+++ sys/kern/subr_taskqueue.c
@@ -34,12 +34,14 @@
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
+#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
+#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>
@@ -62,6 +64,7 @@
STAILQ_HEAD(, task) tq_queue;
taskqueue_enqueue_fn tq_enqueue;
void *tq_context;
+ char *tq_name;
TAILQ_HEAD(, taskqueue_busy) tq_active;
struct mtx tq_mutex;
struct thread **tq_threads;
@@ -119,11 +122,17 @@
}
static struct taskqueue *
-_taskqueue_create(const char *name __unused, int mflags,
+_taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueue, void *context,
- int mtxflags, const char *mtxname)
+ int mtxflags, const char *mtxname __unused)
{
struct taskqueue *queue;
+ char *tq_name = NULL;
+
+ if (name != NULL)
+ tq_name = strndup(name, 32, M_TASKQUEUE);
+ if (tq_name == NULL)
+ tq_name = "taskqueue";
queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
if (!queue)
@@ -133,6 +142,7 @@
TAILQ_INIT(&queue->tq_active);
queue->tq_enqueue = enqueue;
queue->tq_context = context;
+ queue->tq_name = tq_name;
queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
queue->tq_flags |= TQ_FLAGS_ACTIVE;
if (enqueue == taskqueue_fast_enqueue ||
@@ -140,7 +150,7 @@
enqueue == taskqueue_swi_giant_enqueue ||
enqueue == taskqueue_thread_enqueue)
queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
- mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
+ mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
return queue;
}
@@ -149,8 +159,9 @@
taskqueue_create(const char *name, int mflags,
taskqueue_enqueue_fn enqueue, void *context)
{
+
return _taskqueue_create(name, mflags, enqueue, context,
- MTX_DEF, "taskqueue");
+ MTX_DEF, name);
}
void
@@ -194,6 +205,7 @@
KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
mtx_destroy(&queue->tq_mutex);
free(queue->tq_threads, M_TASKQUEUE);
+ free(queue->tq_name, M_TASKQUEUE);
free(queue, M_TASKQUEUE);
}
@@ -203,6 +215,7 @@
struct task *ins;
struct task *prev;
+ KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
/*
* Count multiple enqueues.
*/
@@ -410,6 +423,7 @@
struct task *task;
int pending;
+ KASSERT(queue != NULL, ("tq is NULL"));
TQ_ASSERT_LOCKED(queue);
tb.tb_running = NULL;
@@ -421,12 +435,14 @@
* zero its pending count.
*/
task = STAILQ_FIRST(&queue->tq_queue);
+ KASSERT(task != NULL, ("task is NULL"));
STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
pending = task->ta_pending;
task->ta_pending = 0;
tb.tb_running = task;
TQ_UNLOCK(queue);
+ KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
task->ta_func(task->ta_context, pending);
TQ_LOCK(queue);
@@ -680,6 +696,7 @@
taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
TQ_LOCK(tq);
while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
+ /* XXX ? */
taskqueue_run_locked(tq);
/*
* Because taskqueue_run() can drop tq_mutex, we need to
@@ -779,3 +796,319 @@
}
return (ret);
}
+
+struct taskqgroup_cpu {
+ LIST_HEAD(, grouptask) tgc_tasks;
+ struct taskqueue *tgc_taskq;
+ int tgc_cnt;
+ int tgc_cpu;
+};
+
+struct taskqgroup {
+ struct taskqgroup_cpu tqg_queue[MAXCPU];
+ struct mtx tqg_lock;
+ char *tqg_name;
+ int tqg_adjusting;
+ int tqg_stride;
+ int tqg_cnt;
+};
+
+struct taskq_bind_task {
+ struct task bt_task;
+ int bt_cpuid;
+};
+
+static void
+taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
+{
+ struct taskqgroup_cpu *qcpu;
+
+ qcpu = &qgroup->tqg_queue[idx];
+ LIST_INIT(&qcpu->tgc_tasks);
+ qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
+ taskqueue_thread_enqueue, &qcpu->tgc_taskq);
+ taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
+ "%s_%d", qgroup->tqg_name, idx);
+ qcpu->tgc_cpu = idx * qgroup->tqg_stride;
+}
+
+static void
+taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
+{
+
+ taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
+}
+
+/*
+ * Find the taskq with the fewest tasks that does not already service
+ * this uniq identifier.
+ */
+static int
+taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
+{
+ struct grouptask *n;
+ int i, idx, mincnt;
+ int strict;
+
+ mtx_assert(&qgroup->tqg_lock, MA_OWNED);
+ if (qgroup->tqg_cnt == 0)
+ return (0);
+ idx = -1;
+ mincnt = INT_MAX;
+ /*
+ * Two passes: first scan for a queue with the fewest tasks that
+ * does not already service this uniq id; if that fails, simply pick
+ * the queue with the fewest total tasks.
+ */
+ for (strict = 1; mincnt == INT_MAX; strict = 0) {
+ for (i = 0; i < qgroup->tqg_cnt; i++) {
+ if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
+ continue;
+ if (strict) {
+ LIST_FOREACH(n,
+ &qgroup->tqg_queue[i].tgc_tasks, gt_list)
+ if (n->gt_uniq == uniq)
+ break;
+ if (n != NULL)
+ continue;
+ }
+ mincnt = qgroup->tqg_queue[i].tgc_cnt;
+ idx = i;
+ }
+ }
+ if (idx == -1)
+ panic("taskqgroup_find: Failed to pick a qid.");
+
+ return (idx);
+}
+
+void
+taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
+ void *uniq, int irq, char *name)
+{
+ cpuset_t mask;
+ int qid;
+
+ gtask->gt_uniq = uniq;
+ gtask->gt_name = name;
+ gtask->gt_irq = irq;
+ mtx_lock(&qgroup->tqg_lock);
+ qid = taskqgroup_find(qgroup, uniq);
+ qgroup->tqg_queue[qid].tgc_cnt++;
+ LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
+ gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+ if (irq != -1 && smp_started) {
+ CPU_ZERO(&mask);
+ CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
+ mtx_unlock(&qgroup->tqg_lock);
+ intr_setaffinity(irq, &mask);
+ } else
+ mtx_unlock(&qgroup->tqg_lock);
+}
+
+int
+taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
+ void *uniq, int cpu, int irq, char *name)
+{
+ cpuset_t mask;
+ int i, qid;
+
+ qid = -1;
+ gtask->gt_uniq = uniq;
+ gtask->gt_name = name;
+ gtask->gt_irq = irq;
+ mtx_lock(&qgroup->tqg_lock);
+ for (i = 0; i < qgroup->tqg_cnt; i++)
+ if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
+ qid = i;
+ break;
+ }
+ if (qid == -1) {
+ mtx_unlock(&qgroup->tqg_lock);
+ return (EINVAL);
+ }
+ qgroup->tqg_queue[qid].tgc_cnt++;
+ LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
+ gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+ if (irq != -1 && smp_started) {
+ CPU_ZERO(&mask);
+ CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
+ mtx_unlock(&qgroup->tqg_lock);
+ intr_setaffinity(irq, &mask);
+ } else
+ mtx_unlock(&qgroup->tqg_lock);
+ return (0);
+}
+
+void
+taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
+{
+ int i;
+
+ mtx_lock(&qgroup->tqg_lock);
+ for (i = 0; i < qgroup->tqg_cnt; i++)
+ if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
+ break;
+ if (i == qgroup->tqg_cnt)
+ panic("taskqgroup_detach: task not in group\n");
+ qgroup->tqg_queue[i].tgc_cnt--;
+ LIST_REMOVE(gtask, gt_list);
+ mtx_unlock(&qgroup->tqg_lock);
+ gtask->gt_taskqueue = NULL;
+}
+
+static void
+taskqgroup_binder(void *ctx, int pending)
+{
+ struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
+ cpuset_t mask;
+ int error;
+
+ CPU_ZERO(&mask);
+ CPU_SET(task->bt_cpuid, &mask);
+ error = cpuset_setthread(curthread->td_tid, &mask);
+ if (error)
+ printf("taskqgroup_binder: setaffinity failed: %d\n",
+ error);
+ free(task, M_DEVBUF);
+}
+
+static void
+taskqgroup_bind(struct taskqgroup *qgroup)
+{
+ struct taskq_bind_task *task;
+ int i;
+
+ /*
+ * Bind taskqueue threads to specific CPUs, if they have been assigned
+ * one.
+ */
+ for (i = 0; i < qgroup->tqg_cnt; i++) {
+ task = malloc(sizeof(*task), M_DEVBUF, M_NOWAIT);
+ if (task == NULL)
+ continue;
+ TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
+ task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
+ taskqueue_enqueue_fast(qgroup->tqg_queue[i].tgc_taskq,
+ &task->bt_task);
+ }
+}
+
+static int
+_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
+{
+ LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
+ cpuset_t mask;
+ struct grouptask *gtask;
+ int i, old_cnt, qid;
+
+ mtx_assert(&qgroup->tqg_lock, MA_OWNED);
+
+ if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
+ printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
+ cnt, stride, mp_ncpus, smp_started);
+ return (EINVAL);
+ }
+ if (qgroup->tqg_adjusting) {
+ printf("taskqgroup_adjust failed: adjusting\n");
+ return (EBUSY);
+ }
+ qgroup->tqg_adjusting = 1;
+ old_cnt = qgroup->tqg_cnt;
+ mtx_unlock(&qgroup->tqg_lock);
+ /*
+ * Set up queue for tasks added before boot.
+ */
+ if (old_cnt == 0) {
+ LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
+ grouptask, gt_list);
+ qgroup->tqg_queue[0].tgc_cnt = 0;
+ }
+
+ /*
+ * If new taskq threads have been added.
+ */
+ for (i = old_cnt; i < cnt; i++)
+ taskqgroup_cpu_create(qgroup, i);
+ mtx_lock(&qgroup->tqg_lock);
+ qgroup->tqg_cnt = cnt;
+ qgroup->tqg_stride = stride;
+
+ /*
+ * Adjust drivers to use new taskqs.
+ */
+ for (i = 0; i < old_cnt; i++) {
+ while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
+ LIST_REMOVE(gtask, gt_list);
+ qgroup->tqg_queue[i].tgc_cnt--;
+ LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
+ }
+ }
+
+ while ((gtask = LIST_FIRST(&gtask_head))) {
+ LIST_REMOVE(gtask, gt_list);
+ qid = taskqgroup_find(qgroup, gtask->gt_uniq);
+ qgroup->tqg_queue[qid].tgc_cnt++;
+ LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
+ gt_list);
+ gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+ }
+ /*
+ * Set new CPU and IRQ affinity
+ */
+ for (i = 0; i < cnt; i++) {
+ qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
+ CPU_ZERO(&mask);
+ CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
+ LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
+ if (gtask->gt_irq == -1)
+ continue;
+ intr_setaffinity(gtask->gt_irq, &mask);
+ }
+ }
+ mtx_unlock(&qgroup->tqg_lock);
+
+ /*
+ * If taskq thread count has been reduced.
+ */
+ for (i = cnt; i < old_cnt; i++)
+ taskqgroup_cpu_remove(qgroup, i);
+
+ mtx_lock(&qgroup->tqg_lock);
+ qgroup->tqg_adjusting = 0;
+
+ taskqgroup_bind(qgroup);
+
+ return (0);
+}
+
+int
+taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
+{
+ int error;
+
+ mtx_lock(&qgroup->tqg_lock);
+ error = _taskqgroup_adjust(qgroup, cpu, stride);
+ mtx_unlock(&qgroup->tqg_lock);
+
+ return (error);
+}
+
+struct taskqgroup *
+taskqgroup_create(char *name)
+{
+ struct taskqgroup *qgroup;
+
+ qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
+ mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
+ qgroup->tqg_name = name;
+ LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
+
+ return (qgroup);
+}
+
+void
+taskqgroup_destroy(struct taskqgroup *qgroup)
+{
+
+}
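
For context, a minimal sketch of how a consumer might drive the taskqgroup KPI added above. The driver name, callback, and IRQ plumbing are hypothetical illustrations, not part of this diff:

/* Hypothetical consumer of the taskqgroup KPI (illustration only). */
static struct taskqgroup *qgroup_example;
static struct grouptask example_gtask;

static void
example_deferred(void *context, int pending)
{
	/* Bulk of the deferred work runs in the group's bound thread. */
}

static void
example_setup(void *softc, int irq)
{
	qgroup_example = taskqgroup_create("example");
	/* Two queues, one on every second CPU; valid only after SI_SUB_SMP. */
	taskqgroup_adjust(qgroup_example, 2, 2);
	GROUPTASK_INIT(&example_gtask, 0, example_deferred, softc);
	/* uniq == softc spreads this device's tasks over distinct queues. */
	taskqgroup_attach(qgroup_example, &example_gtask, softc, irq,
	    "example");
}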
Index: sys/sys/_task.h
===================================================================
--- sys/sys/_task.h
+++ sys/sys/_task.h
@@ -51,4 +51,13 @@
void *ta_context; /* (c) argument for handler */
};
+struct grouptask {
+ struct task gt_task;
+ void *gt_taskqueue;
+ LIST_ENTRY(grouptask) gt_list;
+ void *gt_uniq;
+ char *gt_name;
+ int gt_irq;
+};
+
#endif /* !_SYS__TASK_H_ */
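
The grouptask structure above is meant to be embedded in a consumer's own per-queue state; GROUPTASK_ENQUEUE (added to taskqueue.h below) simply forwards the embedded gt_task to whichever taskqueue was selected at attach time. A hedged sketch with hypothetical field names:

/* Hypothetical per-ring state embedding a grouptask (illustration only). */
struct example_ring {
	struct grouptask	ring_gtask;	/* deferred completion work */
	int			ring_irq;
};

static void
example_intr(void *arg)
{
	struct example_ring *ring = arg;

	/* Hand the heavy lifting to the group thread bound to this ring. */
	GROUPTASK_ENQUEUE(&ring->ring_gtask);
}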
Index: sys/sys/taskqueue.h
===================================================================
--- sys/sys/taskqueue.h
+++ sys/sys/taskqueue.h
@@ -39,6 +39,7 @@
#include <sys/_cpuset.h>
struct taskqueue;
+struct taskqgroup;
struct thread;
struct timeout_task {
@@ -143,7 +144,7 @@
init; \
} \
\
-SYSINIT(taskqueue_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND, \
+SYSINIT(taskqueue_##name, SI_SUB_INIT_IF, SI_ORDER_SECOND, \
taskqueue_define_##name, NULL); \
\
struct __hack
@@ -168,7 +169,7 @@
init; \
} \
\
-SYSINIT(taskqueue_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND, \
+SYSINIT(taskqueue_##name, SI_SUB_INIT_IF, SI_ORDER_SECOND, \
taskqueue_define_##name, NULL); \
\
struct __hack
@@ -203,4 +204,52 @@
taskqueue_enqueue_fn enqueue,
void *context);
+/*
+ * Taskqueue groups.  Manage dynamic thread groups and IRQ affinity
+ * binding for device tasks and other deferred work.
+ */
+void taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
+ void *uniq, int irq, char *name);
+int taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
+ void *uniq, int cpu, int irq, char *name);
+void taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask);
+struct taskqgroup *taskqgroup_create(char *name);
+void taskqgroup_destroy(struct taskqgroup *qgroup);
+int taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride);
+
+#define GROUPTASK_INIT(gtask, priority, func, context) \
+ TASK_INIT(&(gtask)->gt_task, priority, func, context)
+
+#define GROUPTASK_ENQUEUE(gtask) \
+ taskqueue_enqueue((gtask)->gt_taskqueue, &(gtask)->gt_task)
+
+#define TASKQGROUP_DECLARE(name) \
+extern struct taskqgroup *qgroup_##name
+
+#define TASKQGROUP_DEFINE(name, cnt, stride) \
+ \
+struct taskqgroup *qgroup_##name; \
+ \
+static void \
+taskqgroup_define_##name(void *arg) \
+{ \
+ qgroup_##name = taskqgroup_create(#name); \
+} \
+ \
+SYSINIT(taskqgroup_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND, \
+ taskqgroup_define_##name, NULL); \
+ \
+static void \
+taskqgroup_adjust_##name(void *arg) \
+{ \
+ taskqgroup_adjust(qgroup_##name, (cnt), (stride)); \
+} \
+ \
+SYSINIT(taskqgroup_adj_##name, SI_SUB_SMP, SI_ORDER_ANY, \
+ taskqgroup_adjust_##name, NULL); \
+ \
+struct __hack
+
+TASKQGROUP_DECLARE(net);
+
#endif /* !_SYS_TASKQUEUE_H_ */
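
To round out the picture, a sketch of how the predeclared net group could be consumed. This diff only declares qgroup_net; the matching TASKQGROUP_DEFINE(net, cnt, stride) is assumed to be instantiated elsewhere, and the driver identifiers below are illustrative:

/* Hypothetical driver hooking an rx queue into the shared net group. */
static struct grouptask foo_rxq_gtask;

static void
foo_rxq_task(void *context, int pending)
{
	/* Drain a burst of received packets for this queue. */
}

static void
foo_attach_rxq(void *softc, int irq)
{
	GROUPTASK_INIT(&foo_rxq_gtask, 0, foo_rxq_task, softc);
	/* Binds the task to a queue and steers the IRQ to that queue's CPU. */
	taskqgroup_attach(qgroup_net, &foo_rxq_gtask, softc, irq, "foo rxq");
}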
