扒開 Linux 中斷的底褲之 workqueue
workqueue 是除了 softirq 和 tasklet 以外最常用的下半部機(jī)制之一。workqueue 的本質(zhì)是把 work 交給一個內(nèi)核線程,在進(jìn)程上下文調(diào)度的時候執(zhí)行。因為這個特點,所以 workqueue 允許重新調(diào)度和睡眠,這種異步執(zhí)行的進(jìn)程上下文,能解決因為 softirq 和 tasklet 執(zhí)行時間長而導(dǎo)致的系統(tǒng)實時性下降等問題。
當(dāng)驅(qū)動程序在進(jìn)程上下文中有異步執(zhí)行的工作任務(wù)時,可以用 work 來描述工作任務(wù)。把 work 添加到一個鏈表 worklist 中,然后由一個內(nèi)核線程 worker 遍歷鏈表,串行地執(zhí)行掛入 worklist 中的所有 work。如果 worklist 中沒有 work,那么內(nèi)核線程 worker 就會變成 IDLE 狀態(tài);如果有 work,則執(zhí)行 work 中的回調(diào)函數(shù)。
workqueue 相關(guān)的數(shù)據(jù)結(jié)構(gòu)
關(guān)于 workqueue 中幾個概念都是 work 相關(guān)的數(shù)據(jù)結(jié)構(gòu)非常容易混淆,大概可以這樣來理解:
work_struct :
工作。初始化一個 work 并添加到工作隊列后,將會將其傳遞到合適的內(nèi)核線程來進(jìn)行處理,它是用于調(diào)度的最小單位。
- struct work_struct {
- atomic_long_t data;
- struct list_head entry;
- work_func_t func;
- #ifdef CONFIG_LOCKDEP
- struct lockdep_map lockdep_map;
- #endif
- };
- data:低比特存放狀態(tài)位,高比特存放 worker_pool 的ID或者 pool_workqueue 的指針
- entry:用于添加到其他隊列上
- func:工作任務(wù)的處理函數(shù),在內(nèi)核線程中回調(diào)
workqueue_struct :
工作的集合。workqueue 和 work 是一對多的關(guān)系。內(nèi)核中工作隊列分為兩種:
- bound:綁定處理器的工作隊列,每個 worker 創(chuàng)建的內(nèi)核線程綁定到特定的 CPU 上運(yùn)行。
- unbound:不綁定處理器的工作隊列,創(chuàng)建的時候需要指定 WQ_UNBOUND 標(biāo)志,內(nèi)核線程可以在處理器間遷移。
- struct workqueue_struct {
- struct list_head pwqs; /* WR: all pwqs of this wq */
- struct list_head list; /* PR: list of all workqueues */
- struct list_head maydays; /* MD: pwqs requesting rescue */
- struct worker *rescuer; /* I: rescue worker */
- struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */
- char name[WQ_NAME_LEN]; /* I: workqueue name */
- /* hot fields used during command issue, aligned to cacheline */
- unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
- struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
- struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */ //Per-Node創(chuàng)建pool_workqueue
- ...
- };
- pwqs:所有的 pool_workqueue 都添加到本鏈表中
- list:用于將工作隊列添加到全局鏈表 workqueues 中
- maydays:rescue狀態(tài)下的 pool_workqueue 添加到本鏈表中
- rescuer:rescuer 內(nèi)核線程,用于處理內(nèi)存緊張時創(chuàng)建工作線程失敗的情況
- cpu_pwqs:Per-CPU 創(chuàng)建 pool_workqueue
- numa_pwq_tbl[]:Per-Node 創(chuàng)建 pool_workqueue
pool_workqueue:
中間人 / 中介,負(fù)責(zé)建立起 workqueue 和 worker_pool 之間的關(guān)系。workqueue 和 pool_workqueue 是一對多的關(guān)系。
- struct pool_workqueue {
- struct worker_pool *pool; /* I: the associated pool */
- struct workqueue_struct *wq; /* I: the owning workqueue */
- int nr_active; /* L: nr of active works */
- int max_active; /* L: max active works */
- struct list_head delayed_works; /* L: delayed works */
- struct list_head pwqs_node; /* WR: node on wq->pwqs */
- struct list_head mayday_node; /* MD: node on wq->maydays */ //用于添加到workqueue鏈表中
- ...
- } __aligned(1 << WORK_STRUCT_FLAG_BITS);
- pool:指向 worker_pool
- wq:指向所屬的 workqueue
- nr_active:活躍的 work 數(shù)量
- max_active:活躍的最大 work 數(shù)量
- delayed_works:延遲執(zhí)行的 work 掛入本鏈表
- pwqs_node:用于添加到 workqueue 鏈表中
- mayday_node:用于添加到 workqueue 鏈表中
worker_pool:
工人的集合。pool_workqueue 和 worker_pool 是一對一的關(guān)系,worker_pool 和 worker 是一對多的關(guān)系。
- bound 類型的工作隊列:worker_pool 是 Per-CPU 創(chuàng)建,每個 CPU 都有兩個 worker_pool,對應(yīng)不同的優(yōu)先級,nice 值分別為 0 和 -20。
- unbound 類型的工作隊列:worker_pool 創(chuàng)建后會添加到 unbound_pool_hash 哈希表中。
- struct worker_pool {
- spinlock_t lock; /* the pool lock */
- int cpu; /* I: the associated cpu */
- int node; /* I: the associated node ID */
- int id; /* I: pool ID */
- unsigned int flags; /* X: flags */
- unsigned long watchdog_ts; /* L: watchdog timestamp */
- struct list_head worklist; /* L: list of pending works */
- int nr_workers; /* L: total number of workers */
- /* nr_idle includes the ones off idle_list for rebinding */
- int nr_idle; /* L: currently idle ones */
- struct list_head idle_list; /* X: list of idle workers */
- struct timer_list idle_timer; /* L: worker idle timeout */
- struct timer_list mayday_timer; /* L: SOS timer for workers */
- /* a workers is either on busy_hash or idle_list, or the manager */
- DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER); /* L: hash of busy workers */
- /* see manage_workers() for details on the two manager mutexes */
- struct worker *manager; /* L: purely informational */
- struct mutex attach_mutex; /* attach/detach exclusion */
- struct list_head workers; /* A: attached workers */
- struct completion *detach_completion; /* all workers detached */
- struct ida worker_ida; /* worker IDs for task name */
- struct workqueue_attrs *attrs; /* I: worker attributes */
- struct hlist_node hash_node; /* PL: unbound_pool_hash node */
- ...
- } ____cacheline_aligned_in_smp;
- cpu:綁定到 CPU 的 workqueue,代表 CPU ID
- node:非綁定類型的 workqueue,代表內(nèi)存 Node ID
- worklist:pending 狀態(tài)的 work 添加到本鏈表
- nr_workers:worker 的數(shù)量
- idle_list:處于 IDLE 狀態(tài)的 worker 添加到本鏈表
- busy_hash:工作狀態(tài)的 worker 添加到本哈希表中
- workers:worker_pool 管理的 worker 添加到本鏈表中
- hash_node:用于添加到 unbound_pool_hash 中
worker :
工人。在代碼中 worker 對應(yīng)一個 work_thread() 內(nèi)核線程。
- struct worker {
- /* on idle list while idle, on busy hash table while busy */
- union {
- struct list_head entry; /* L: while idle */
- struct hlist_node hentry; /* L: while busy */
- };
- struct work_struct *current_work; /* L: work being processed */
- work_func_t current_func; /* L: current_work's fn */
- struct pool_workqueue *current_pwq; /* L: current_work's pwq */
- struct list_head scheduled; /* L: scheduled works */
- /* 64 bytes boundary on 64bit, 32 on 32bit */
- struct task_struct *task; /* I: worker task */
- struct worker_pool *pool; /* I: the associated pool */
- /* L: for rescuers */
- struct list_head node; /* A: anchored at pool->workers */ //添加到worker_pool->workers鏈表中
- /* A: runs through worker->node */
- ...
- };
- entry:用于添加到 worker_pool 的空閑鏈表中
- hentry:用于添加到 worker_pool 的忙碌列表中
- current_work:當(dāng)前正在處理的 work
- current_func:當(dāng)前正在執(zhí)行的 work 回調(diào)函數(shù)
- current_pwq:指向當(dāng)前 work 所屬的 pool_workqueue
- scheduled:所有被調(diào)度執(zhí)行的 work 都將添加到該鏈表中
- task:指向內(nèi)核線程
- pool:該 worker 所屬的 worker_pool
- node:添加到 worker_pool->workers 鏈表中
可以用下圖來總結(jié):
workqueue 的初始化
內(nèi)核在啟動的時候會對 workqueue 做初始化,workqueue 的初始化包含兩個階段,分別是 workqueue_init_early 和 workqueue_init。
workqueue_init_early
- 分配 worker_pool,并且對該結(jié)構(gòu)中的字段進(jìn)行初始化操作
- 分配 workqueue_struct,并且對該結(jié)構(gòu)中的字段進(jìn)行初始化操作
- alloc_and_link_pwqs:分配 pool_workqueue,將 workqueue_struct 和 worker_pool 關(guān)聯(lián)起來
workqueue_init
這里主要完成的工作是給之前創(chuàng)建好的 worker_pool,添加一個初始的 worker,然后利用函數(shù) create_worker,創(chuàng)建名字為 kworker/XX:YY 或者 kworker/uXX:YY 的內(nèi)核線程。其中 XX 表示 worker_pool 的編號,YY 表示 worker 的編號,u 表示unbound。
- 分配 worker,并且對該結(jié)構(gòu)中的字段進(jìn)行初始化操作
- 為 worker 創(chuàng)建內(nèi)核線程 worker_thread
- 將 worker 添加到 worker_pool 中
- worker 進(jìn)入 IDLE 狀態(tài)
經(jīng)過上面兩個階段的初始化,workqueue 子系統(tǒng)基本就已經(jīng)將數(shù)據(jù)結(jié)構(gòu)的關(guān)聯(lián)建立好了,當(dāng)有 work 來進(jìn)行調(diào)度的時候,就可以進(jìn)行處理了。
使用 workqueue
內(nèi)核推薦驅(qū)動開發(fā)者使用默認(rèn)的 workqueue,而不是新建 workqueue。要使用系統(tǒng)默認(rèn)的 workqueue,首先需要初始化 work,內(nèi)核提供了相應(yīng)的宏 INIT_WORK。
初始化 work
- #define INIT_WORK(_work, _func) \
- __INIT_WORK((_work), (_func), 0)
- #define __INIT_WORK(_work, _func, _onstack) \
- do { \
- __init_work((_work), _onstack); \
- (_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
- INIT_LIST_HEAD(&(_work)->entry); \
- (_work)->func = (_func); \
- } while (0)
初始化 work 后,就可以調(diào)用 shedule_work 函數(shù)把 work 掛入系統(tǒng)默認(rèn)的 workqueue 中。
work 調(diào)度
- 將 work 添加到系統(tǒng)的 system_wq 工作隊列中。
- 判斷 workqueue 的類型,如果是 bound 類型,根據(jù) CPU 來獲取 pool_workqueue。如果是 unbound 類型,通過 node 號來獲取 pool_workqueue。
- 判斷 pool_workqueue 活躍的 work 數(shù)量,少于最大限值則將 work 加入到 pool->worklist 中,否則加入到 pwq->delayed_works 鏈表中。
- 如果 __need_more_worker 判斷沒有 worker 在執(zhí)行,則通過 wake_up_worker 喚醒 worker 內(nèi)核線程。
worker_thread
worker 內(nèi)核線程的執(zhí)行函數(shù)是 worker_thread。
- 設(shè)置標(biāo)志位 PF_WQ_WORKER,調(diào)度器在進(jìn)行調(diào)度處理時會對 task 進(jìn)行判斷,針對 workerqueue worker 有特殊的處理。
- worker 被喚醒的時候,跳轉(zhuǎn)到 woke_up 執(zhí)行。
- woke_up 中,如果此時 worker 是需要銷毀的,那就進(jìn)行清理工作并返回。否則,離開 IDLE 狀態(tài),并進(jìn)入 recheck 模塊執(zhí)行。
- recheck 中,判斷是否需要更多的 worker 來處理,如果沒有任務(wù)處理,跳轉(zhuǎn)到 sleep 地方進(jìn)行睡眠。如果有任務(wù)需要處理時,遍歷工作鏈表,對鏈表中的每個節(jié)點調(diào)用 process_one_work 來執(zhí)行 work 的回調(diào)函數(shù),即 INIT_WORK 里的回調(diào)函數(shù)。
- sleep 中,沒有任務(wù)處理時,worker 進(jìn)入空閑狀態(tài),并將當(dāng)前的內(nèi)核線程設(shè)置成睡眠狀態(tài),讓出 CPU。
總結(jié)