/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: pool->lock protected.  Access with pool->lock held.
 *
 * X: During normal operation, modification requires pool->lock and should
 *    be done only from local cpu.  Either disabling preemption on local
 *    cpu or grabbing pool->lock is enough for read access.  If
 *    POOL_DISASSOCIATED is set, it's identical to L.
 *
 * A: wq_pool_attach_mutex protected.
 *
 * PL: wq_pool_mutex protected.
 *
 * PR: wq_pool_mutex protected for writes.  RCU protected for reads.
 *
 * PW: wq_pool_mutex and wq->mutex protected for writes.  Either for reads.
 *
 * PWR: wq_pool_mutex and wq->mutex protected for writes.  Either or
 *      RCU for reads.
 *
 * WQ: wq->mutex protected.
 *
 * WR: wq->mutex protected for writes.  RCU protected for reads.
 *
 * MD: wq_mayday_lock protected.
 */
struct worker_pool {
	raw_spinlock_t		lock;		/* the pool lock */
	int			cpu;		/* I: the associated cpu */
	int			node;		/* I: the associated node ID */
	int			id;		/* I: pool ID */
	unsigned int		flags;		/* X: flags */

	/*
	 * The counter is incremented in a process context on the associated CPU
	 * w/ preemption disabled, and decremented or reset in the same context
	 * but w/ pool->lock held.  The readers grab pool->lock and are
	 * guaranteed to see if the counter reached zero.
	 */
	int			nr_running;

	struct list_head	worklist;	/* L: list of pending works */

	int			nr_workers;	/* L: total number of workers */
	int			nr_idle;	/* L: currently idle workers */

	struct list_head	idle_list;	/* L: list of idle workers */
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	/* a worker is either on busy_hash or idle_list, or the manager */
	DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
						/* L: hash of busy workers */

	/* ... remaining fields omitted ... */
};
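To make the "L:" annotation above concrete: any code that walks these lists or counters must hold pool->lock. The following is only a sketch with a made-up helper name (it would have to live in workqueue.c, since worker_pool is private to that file), not actual kernel code:

/* Sketch: reading L-annotated fields requires pool->lock. */
static int count_pending_works(struct worker_pool *pool)
{
	struct work_struct *work;
	int n = 0;

	raw_spin_lock_irq(&pool->lock);		/* L: protects worklist, nr_workers, ... */
	list_for_each_entry(work, &pool->worklist, entry)
		n++;
	raw_spin_unlock_irq(&pool->lock);

	return n;
}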
/*
 * The per-pool workqueue.  While queued, the lower WORK_STRUCT_FLAG_BITS
 * of work_struct->data are used for flags and the remaining high bits
 * point to the pwq; thus, pwqs need to be aligned at two's power of the
 * number of flag bits.
 */
struct pool_workqueue {
	struct worker_pool	*pool;		/* I: the associated pool */
	struct workqueue_struct *wq;		/* I: the owning workqueue */
	int			work_color;	/* L: current color */
	int			flush_color;	/* L: flushing color */
	int			refcnt;		/* L: reference count */
	int			nr_in_flight[WORK_NR_COLORS];
						/* L: nr of in_flight works */

	/*
	 * nr_active management and WORK_STRUCT_INACTIVE:
	 *
	 * When pwq->nr_active >= max_active, new work item is queued to
	 * pwq->inactive_works instead of pool->worklist and marked with
	 * WORK_STRUCT_INACTIVE.
	 *
	 * All work items marked with WORK_STRUCT_INACTIVE do not participate
	 * in pwq->nr_active and all work items in pwq->inactive_works are
	 * marked with WORK_STRUCT_INACTIVE.  But not all WORK_STRUCT_INACTIVE
	 * work items are in pwq->inactive_works.  Some of them are ready to
	 * run in pool->worklist or worker->scheduled.  Those work items are
	 * only struct wq_barrier which is used for flush_work() and should
	 * not participate in pwq->nr_active.  For non-barrier work item, it
	 * is marked with WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
	 */
	int			nr_active;	/* L: nr of active works */
	int			max_active;	/* L: max active works */
	struct list_head	inactive_works;	/* L: inactive works */
	struct list_head	pwqs_node;	/* WR: node on wq->pwqs */
	struct list_head	mayday_node;	/* MD: node on wq->maydays */

	/*
	 * Release of unbound pwq is punted to system_wq.  See put_pwq()
	 * and pwq_unbound_release_workfn() for details.  pool_workqueue
	 * itself is also RCU protected so that the first pwq can be
	 * determined without grabbing wq->mutex.
	 */
	struct work_struct	unbound_release_work;
	struct rcu_head		rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);
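The __aligned() requirement exists because a queued work item packs both the flag bits and the pwq pointer into the single work_struct->data word. The sketch below mirrors what the kernel's get_work_pwq() does; the WORK_STRUCT_* constants are the real ones from <linux/workqueue.h>, but the function name is made up:

/* Sketch of how the pwq pointer is recovered from work->data. */
static struct pool_workqueue *work_to_pwq(struct work_struct *work)
{
	unsigned long data = atomic_long_read(&work->data);

	/* low WORK_STRUCT_FLAG_BITS carry flags, the high bits the pwq pointer */
	if (data & WORK_STRUCT_PWQ)
		return (struct pool_workqueue *)(data & WORK_STRUCT_WQ_DATA_MASK);
	return NULL;
}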
/*
 * The externally visible workqueue.  It relays the issued work items to
 * the appropriate worker_pool through its pool_workqueues.
 */
struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */
	struct list_head	list;		/* PR: list of all workqueues */

	struct mutex		mutex;		/* protects this wq */
	int			work_color;	/* WQ: current work color */
	int			flush_color;	/* WQ: current flush color */
	atomic_t		nr_pwqs_to_flush; /* flush in progress */
	struct wq_flusher	*first_flusher;	/* WQ: first flusher */
	struct list_head	flusher_queue;	/* WQ: flush waiters */
	struct list_head	flusher_overflow; /* WQ: flush overflow list */

	/*
	 * Destruction of workqueue_struct is RCU protected to allow walking
	 * the workqueues list without grabbing wq_pool_mutex.
	 * This is used to dump all workqueues from sysrq.
	 */
	struct rcu_head		rcu;

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
};
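As a rough sketch (simplified from __queue_work(), made-up helper name) of how these hot fields are used when a work item is queued: a per-cpu workqueue picks the pwq of the submitting CPU from cpu_pwqs, while an unbound one looks up the per-node pwq from numa_pwq_tbl under RCU:

/*
 * Sketch only: selecting a pwq from the fields above.  The unbound case
 * must run inside an RCU read-side critical section (PWR rule).
 */
static struct pool_workqueue *pick_pwq(struct workqueue_struct *wq, int cpu)
{
	if (!(wq->flags & WQ_UNBOUND))
		/* per-cpu workqueue: one pwq per CPU */
		return per_cpu_ptr(wq->cpu_pwqs, cpu);

	/* unbound workqueue: one pwq per NUMA node */
	return rcu_dereference(wq->numa_pwq_tbl[cpu_to_node(cpu)]);
}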
// This defines a per-cpu array of two worker_pools, i.e. the percpu worker pools:
// worker_pool[0] is the normal-priority pool,
// worker_pool[1] is the high-priority pool.
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
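The kernel walks this array with its for_each_cpu_worker_pool() helper; the sketch below shows roughly what that expands to, wrapped in a made-up debug function (it would have to live in workqueue.c, where cpu_worker_pools is visible):

/* Sketch: visit both standard pools of every possible CPU. */
static void dump_cpu_pools(void)
{
	struct worker_pool *pool;
	int cpu;

	for_each_possible_cpu(cpu) {
		for (pool = &per_cpu(cpu_worker_pools, cpu)[0];
		     pool < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS];
		     pool++)
			pr_info("cpu%d: pool id %d\n", cpu, pool->id);
	}
}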
// The per-cpu worker pools are initialized in workqueue_init_early().
/**
 * workqueue_init_early - early init for workqueue subsystem
 *
 * This is the first half of two-staged workqueue subsystem initialization
 * and invoked as soon as the bare basics - memory allocation, cpumasks and
 * idr are up.  It sets up all the data structures and system workqueues
 * and allows early boot code to create workqueues and queue/cancel work
 * items.  Actual work item execution starts only after kthreads can be
 * created and scheduled right before early initcalls.
 */
void __init workqueue_init_early(void)
{
	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
	int i, cpu;
	/*
	 * An ordered wq should have only one pwq as ordering is
	 * guaranteed by max_active which is enforced by pwqs.
	 * Turn off NUMA so that dfl_pwq is used for all nodes.
	 */
	BUG_ON(!(attrs = alloc_workqueue_attrs()));
	attrs->nice = std_nice[i];
	attrs->no_numa = true;
	ordered_wq_attrs[i] = attrs;
}
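These ordered_wq_attrs back the workqueues created via alloc_ordered_workqueue(). For reference, a typical user of an ordered workqueue looks like the following sketch (my_ordered_wq, my_work and my_work_fn are made-up names):

#include <linux/workqueue.h>

static struct workqueue_struct *my_ordered_wq;

static void my_work_fn(struct work_struct *work)
{
	/* ... */
}
static DECLARE_WORK(my_work, my_work_fn);

static int __init my_driver_init(void)
{
	/* one pwq, max_active == 1: items execute strictly one after another */
	my_ordered_wq = alloc_ordered_workqueue("my_ordered_wq", 0);
	if (!my_ordered_wq)
		return -ENOMEM;

	queue_work(my_ordered_wq, &my_work);
	return 0;
}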
/**
 * workqueue_init - bring workqueue subsystem fully online
 *
 * This is the latter half of two-staged workqueue subsystem initialization
 * and invoked as soon as kthreads can be created and scheduled.
 * Workqueues have been created and work items queued on them, but there
 * are no kworkers executing the work items yet.  Populate the worker pools
 * with the initial workers and enable future kworker creations.
 */
void __init workqueue_init(void)
{
	struct workqueue_struct *wq;
	struct worker_pool *pool;
	int cpu, bkt;
	/*
	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
	 * CPU to node mapping may not be available that early on some
	 * archs such as power and arm64.  As per-cpu pools created
	 * previously could be missing node hint and unbound pools NUMA
	 * affinity, fix them up.
	 *
	 * Also, while iterating workqueues, create rescuers if requested.
	 */
	wq_numa_init();
/**
 * create_worker - create a new workqueue worker
 * @pool: pool the new worker will belong to
 *
 * Create and start a new worker which is attached to @pool.
 *
 * CONTEXT:
 * Might sleep.  Does GFP_KERNEL allocations.
 *
 * Return:
 * Pointer to the newly created worker.
 */
static struct worker *create_worker(struct worker_pool *pool)
{
	struct worker *worker;
	int id;
	char id_buf[16];
	/* ID is needed to determine kthread name */
	// The IDA allocator is used here to hand out a per-pool worker ID.
	id = ida_alloc(&pool->worker_ida, GFP_KERNEL);
	if (id < 0)
		return NULL;
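ida_alloc()/ida_free() come from the kernel's generic ID allocator in <linux/idr.h>. A minimal, self-contained sketch of the same allocation pattern, using made-up names, could look like this:

#include <linux/idr.h>

static DEFINE_IDA(my_ida);

static int get_instance_id(void)
{
	/* returns the smallest free non-negative ID, or a negative errno */
	return ida_alloc(&my_ida, GFP_KERNEL);
}

static void put_instance_id(int id)
{
	/* return the ID to the allocator for reuse */
	ida_free(&my_ida, id);
}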
	/*
	 * Unbound && max_active == 1 used to imply ordered, which is no
	 * longer the case on NUMA machines due to per-node pools.  While
	 * alloc_ordered_workqueue() is the right way to create an ordered
	 * workqueue, keep the previous behavior to avoid subtle breakages
	 * on NUMA.
	 */
	if ((flags & WQ_UNBOUND) && max_active == 1)
		flags |= __WQ_ORDERED;
	/* see the comment above the definition of WQ_POWER_EFFICIENT */
	// The WQ_POWER_EFFICIENT flag, combined with the workqueue.power_efficient
	// kernel parameter, turns the workqueue into an unbound workqueue to
	// reduce power consumption.
	if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
		flags |= WQ_UNBOUND;
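As a usage sketch (hypothetical driver, made-up names): a caller opts into this behavior simply by passing the flag; whether the workqueue actually becomes unbound then depends on wq_power_efficient (CONFIG_WQ_POWER_EFFICIENT_DEFAULT or workqueue.power_efficient=1 on the command line):

static struct workqueue_struct *stats_wq;

static int __init stats_init(void)
{
	/* becomes WQ_UNBOUND when power-efficient mode is enabled */
	stats_wq = alloc_workqueue("stats_wq", WQ_POWER_EFFICIENT, 0);
	return stats_wq ? 0 : -ENOMEM;
}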
	/* allocate wq and format name */
	// This is where NUMA placement of an unbound workqueue's worker threads
	// comes in.  On NUMA systems, the latency of a CPU accessing memory on a
	// remote node is much higher than on its local node, so an unbound
	// workqueue actually creates per-node pool_workqueues.
	if (flags & WQ_UNBOUND)
		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
	if (!wq)
		return NULL;
	// unbound_attrs holds the attributes an unbound workqueue uses to decide
	// how its work items are placed.
	if (flags & WQ_UNBOUND) {
		wq->unbound_attrs = alloc_workqueue_attrs();
		if (!wq->unbound_attrs)
			goto err_free_wq;
	}
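As a usage sketch with made-up names: an unbound workqueue can additionally be created with WQ_SYSFS, so the attributes kept in unbound_attrs (cpumask, nice value) become tunable from /sys/devices/virtual/workqueue/<name>/:

static struct workqueue_struct *scan_wq;

static int __init scan_init(void)
{
	/* workers are not bound to the submitting CPU; placement follows
	 * wq->unbound_attrs, which WQ_SYSFS exposes to userspace */
	scan_wq = alloc_workqueue("scan_wq", WQ_UNBOUND | WQ_SYSFS, 0);
	return scan_wq ? 0 : -ENOMEM;
}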
	if (alloc_and_link_pwqs(wq) < 0)
		goto err_unreg_lockdep;

	if (wq_online && init_rescuer(wq) < 0)
		goto err_destroy;

	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
		goto err_destroy;
	/*
	 * wq_pool_mutex protects global freeze state and workqueues list.
	 * Grab it, adjust max_active and add the new @wq to workqueues
	 * list.
	 */
	mutex_lock(&wq_pool_mutex);
	cpus_read_lock();
	if (wq->flags & __WQ_ORDERED) {
		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
		/* there should only be single pwq for ordering guarantee */
		WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
			      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
		     "ordering guarantee broken for workqueue %s\n", wq->name);
	} else {
		ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
	}
	cpus_read_unlock();

	return ret;
}
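Putting the above together, a hypothetical caller that wants the rescuer path exercised passes WQ_MEM_RECLAIM, which is what makes init_rescuer() spawn a dedicated rescuer kthread; all names below are made up:

static struct workqueue_struct *io_wq;

static int __init io_init(void)
{
	/* WQ_MEM_RECLAIM guarantees forward progress via a rescuer kworker */
	io_wq = alloc_workqueue("io_wq", WQ_MEM_RECLAIM, 0);
	if (!io_wq)
		return -ENOMEM;
	return 0;
}

static void io_exit(void)
{
	/* drains remaining work items and frees the workqueue */
	destroy_workqueue(io_wq);
}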
// Every pool_workqueue is backed by a worker thread pool.  For per-cpu
// workqueues these are the statically defined cpu_worker_pools shown earlier:
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * Return:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
	return __flush_work(work, false);
}
EXPORT_SYMBOL_GPL(flush_work);
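A typical usage pattern (made-up names) queues a work item and then flushes it to wait for completion:

static void log_flush_fn(struct work_struct *work)
{
	/* ... write out buffered data ... */
}
static DECLARE_WORK(log_flush_work, log_flush_fn);

void log_sync(void)
{
	schedule_work(&log_flush_work);
	/* wait until the queued instance has finished executing */
	flush_work(&log_flush_work);
}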
	rcu_read_lock();
	pool = get_work_pool(work);
	if (!pool) {
		rcu_read_unlock();
		return false;
	}

	raw_spin_lock_irq(&pool->lock);
	/* see the comment in try_to_grab_pending() with the same code */
	pwq = get_work_pwq(work);
	if (pwq) {
		if (unlikely(pwq->pool != pool))
			goto already_gone;
	} else {
		worker = find_worker_executing_work(pool, work);
		if (!worker)
			goto already_gone;
		pwq = worker->current_pwq;
	}

	/*
	 * Force a lock recursion deadlock when using flush_work() inside a
	 * single-threaded or rescuer equipped workqueue.
	 *
	 * For single threaded workqueues the deadlock happens when the work
	 * is after the work issuing the flush_work().  For rescuer equipped
	 * workqueues the deadlock happens when the rescuer stalls, blocking
	 * forward progress.
	 */
	if (!from_cancel &&
	    (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)) {
		lock_map_acquire(&pwq->wq->lockdep_map);
		lock_map_release(&pwq->wq->lockdep_map);
	}
	rcu_read_unlock();
	return true;
already_gone:
	raw_spin_unlock_irq(&pool->lock);
	rcu_read_unlock();
	return false;
}
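To make that warning concrete, here is a hypothetical anti-pattern (made-up names) that the lockdep annotation is meant to catch: a work item flushes another work item queued behind it on the same single-threaded (ordered, max_active == 1) workqueue, so the flushed item can never start and flush_work() waits forever:

static struct workqueue_struct *single_wq;	/* created with alloc_ordered_workqueue() */
static struct work_struct work_b;

static void work_a_fn(struct work_struct *work)
{
	/* deadlocks if work_b is queued behind this item on single_wq */
	flush_work(&work_b);
}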