Linux驅(qū)動技術(shù)(八) _并發(fā)控制技術(shù)
為了實(shí)現(xiàn)對臨界資源的有效管理,應(yīng)用層的程序有原子變量,條件變量,信號量來控制并發(fā),同樣的問題也存在與驅(qū)動開發(fā)中,比如一個驅(qū)動同時被多個應(yīng)用層程序調(diào)用,此時驅(qū)動中的全局變量會同時屬于多個應(yīng)用層進(jìn)程的進(jìn)程空間,這種情況下也要使用一些技術(shù)來實(shí)現(xiàn)對并發(fā)的控制。本文將討論內(nèi)核中下述并發(fā)控制技術(shù)的技術(shù)特點(diǎn)和應(yīng)用場景。
1.中斷屏蔽
2.原子操作
a.原子變量操作
b.原子位操作
3.自旋鎖
a.傳統(tǒng)自旋鎖
b.讀寫自旋鎖
c.順序鎖
d.RCU
4.信號量
a.傳統(tǒng)信號量
b.讀寫信號量
c.完成量
5.互斥量
中斷屏蔽
顧名思義,就是屏蔽所有的中斷。在嵌入式系統(tǒng),中斷屏蔽可以有三級,
1. 硬件接口的屏蔽,
2. 硬件GIC的屏蔽,
3. CPU(內(nèi)核)的屏蔽。
如果在接口處屏蔽了,那么中斷來了就丟了,根本找不到。如果在GIC處屏蔽了,那么在屏蔽期間如果來了irq_1,irq_2,irq_3個中斷,因?yàn)橹挥幸粋€pending標(biāo)志位,所以***irq_3來的時候會將pending置位,之后解除屏蔽了,CPU發(fā)現(xiàn)pending有置位,還是會處理,但是1,2就肯定丟了。在ARM處的屏蔽,即內(nèi)核中的屏蔽,看怎么設(shè)置了,如果就是local_irq_disable,那么丟了就是丟了,和在接口處屏蔽一樣,如果是local_irq_save就和第二種一樣,追到***一個中斷,內(nèi)核也有相應(yīng)的機(jī)制進(jìn)行中斷計數(shù),知道這期間來了多少個中斷,但是實(shí)際操作中,大部分情況我們都不會追著執(zhí)行錯過的中斷,除非這個中斷非常重要。
我們這里討論的,就是在內(nèi)核中進(jìn)行中斷屏蔽。由于內(nèi)核中很多重要的操作都要依賴于中斷,所以屏蔽所有的中斷是十分危險的,里面執(zhí)行的代碼要盡可能的快,而且,由于內(nèi)核的進(jìn)程調(diào)度也是由中斷驅(qū)動的,所以中斷屏蔽中不能有可能引發(fā)休眠的代碼,否則無法被喚醒。注意,中斷屏蔽只是屏蔽了本CPU的中斷,所以并不能解決SMP引發(fā)的競泰問題,通常,中斷屏蔽要和自旋鎖聯(lián)合使用,用于防止訪問自旋鎖保護(hù)的臨界區(qū)時被中斷打斷
普通的中斷屏蔽
- local_irq_disable(); //屏蔽中斷//或
- local_irq_save(flags); //屏蔽中斷并保存目前CPU中的中斷位信息/* 臨界區(qū) */
- local_irq_enable(); //解除屏蔽//或
- local_irq_restore(flags); //解除屏蔽并恢復(fù)中斷位信息
底半部的中斷屏蔽
- local_bh_disable(); //屏蔽中斷
- /* 臨界區(qū) */
- local_bh_enable();
原子操作
原子操作即不能被打斷的操作,和應(yīng)用層的概念一樣,內(nèi)核中的原子操作模板如下:
整型原子變量
- //asm/atomic.h
- //創(chuàng)建并初始化原子變量
- atomic_t tv = ATOMIC_INIT(初值);//讀原子變量
- int atomic_read(atomic_t *v);//寫原子變量
- void atomic_set(atomic_t *v, int i);
- /**
- *atomic_dec_and_test - 嘗試將原子變量-1
- *v:如果-1之后原子變量變?yōu)?,返回非0, 否則返回0
- */
- int atomic_dec_and_test(volatile atomic_t *v);
- int atomic_inc_and_test(volatile atomic_t *v);
- int atomic_sub_and_test(int i, volatile atomic_t *v);//操作并返回
- int atomic_add_return(int i, atomic *v);
- int atomic_sub_return(int i, atomic *v);
- int atomic_inc_return(atomic *v);
- int atomic_dev_return(atomic *v);
模板
- static atomic_t tv;
- static int demo_open(
- struct inode *inode, struct file *filp){
- if(!atomic_dec_and_test(&tv)){
- atomic_inc(&tv);
- return -EBUSY;
- } /* 操作代碼 */
- return 0;
- }
- static int demo_release(struct inode *inode, struct file *filp){
- atomic_inc(&tv);
- return 0;
- }
- static int __init demo_init(void){ // init atomic_t
- atomic_set(&tv, 1);
- }
位原子操作
位原子操作即原子的位操作,內(nèi)核中大量使用"位"來記錄信息,比如位圖,這些操作都必須是原子性的,內(nèi)核API如下:
- //設(shè)置位
- void set_bit(nr,void *addr);//清除位
- void clear_bit(nr,void *addr);//改變位
- void change_bit(nr,void *addr);//測試位
- test_bit(nr, void *addr);//測試并操作位
- int test_and_set_bit(nr, void *addr);
- int test_and_clear_bit(nr,void *addr);
- int test_and_change_bit(nr,void *addr);
自旋鎖
意即"在原地打轉(zhuǎn)",當(dāng)加鎖不成功時,自旋,自旋鎖會不斷的占用CPU進(jìn)行變量的測試,由于屬于原子操作,所以該CPU的占用會升為100%,所以,使用自旋鎖時,臨界區(qū)的代碼需要很短,否則會影響系統(tǒng)性能,此外,作為鎖機(jī)制的一種,使用自旋鎖同樣需要注意死鎖的出現(xiàn),自旋鎖鎖定期間不能調(diào)用可能引起進(jìn)程調(diào)度的函數(shù),如果進(jìn)程獲得自旋鎖之后再阻塞,eg,copy_from_user(),copy_to_user(),kmalloc(),msleep()等,一旦阻塞發(fā)生就可能導(dǎo)致內(nèi)核 崩潰。
自旋鎖可以用來解決SMP競態(tài)問題。不同類型的自旋鎖有自己的處理機(jī)制,適用于不同的情況。包括傳統(tǒng)自旋鎖,讀寫自旋鎖,RCU機(jī)制,順序鎖等,自旋鎖是信號量,互斥體的的底層實(shí)現(xiàn)工具。
比較\類型 | 傳統(tǒng)自旋鎖 | 讀寫自旋鎖 | 順序鎖 | RCU機(jī)制 |
---|---|---|---|---|
應(yīng)用場合 | 需要上鎖者獨(dú)占的資源 | 需要寫者獨(dú)占的資源 | 很少同時讀寫的資源 | 讀多寫少的資源 |
讀+讀 并發(fā) | × | √ | √ | √ |
讀+寫 并發(fā) | × | × | √ | √ |
寫+寫 并發(fā) | × | × | × | √ |
和其他鎖機(jī)制的一樣,使用自旋鎖保護(hù)數(shù)據(jù)分為搶鎖-操作-解鎖,下面就是一個典型的使用鎖的流程,通過自旋鎖實(shí)現(xiàn)一個文件只被一個進(jìn)程打開。
- int cnt=0;
- lock_t lock;static int my_open(){
- lock(&lock);
- if(cnt){
- unlock(&lock)
- }
- cnt++;
- unlock(&lock);
- }
- static int release(){
- lock(&lock);
- cnt--;
- unlock(&lock);
- }
傳統(tǒng)自旋鎖
是一種比較粗暴的自旋鎖,使用這種鎖的時候,被鎖定的臨界區(qū)域不允許其他CPU訪問,需要注意的是,盡管獲得鎖之后執(zhí)行的臨界區(qū)操作不會被其他CPU和本CPU內(nèi)其他搶占進(jìn)程的打擾,但是仍然會被中斷和底半部的影響,所以通常我們會使用下述API中的衍生版本,比如上文中提到的將自旋鎖+中斷屏蔽來防止使用自旋鎖訪問臨界資源的時候被中斷打斷,對應(yīng)的宏函數(shù)就是spin_lock_irq和spin_lock_irqsave。
- //定義并初始化自旋鎖spinlock_t spinlock
- void spin_lock_init(spinlock_t *);//加鎖
- //spin_lock - 加鎖函數(shù)(忙等)
- void spin_lock(spinlock_t *lock);
- int spin_trylock(spinlock_t *lock);
- spin_lock_irq();
- //=spin_lock() + local_irq_disable()spin_lock_irqsave();
- //= spin_lock() + lock_irq_save();
- spin_lock_bh();
- //=spin_lock() + local_bh_disable();
- //解鎖
- void spin_unlock(spinlock_t *lock);
- spin_unlock_irq();
- //=spin_unlock() + local_irq_enable()
- spin_unlock_irqrestore();
- //= spin_unlock() + lock_irq_restore();
- spin_unlock_bh();
- //=spin_unlock() + local_bh_enable();
讀寫自旋鎖
傳統(tǒng)的自旋鎖粗暴的將臨界資源劃歸給一個CPU,但是很多資源都不會因?yàn)樽x而被破壞,所以我們可以允許多個CPU同時讀臨界資源,但不允許同時寫資源,類似于應(yīng)用層的文件鎖,內(nèi)核的讀寫鎖機(jī)制同樣有下述互斥原則:
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 互斥
- 寫者 + 寫者 互斥
- //include/linux/rwlock.h
- //定義并初始化自旋鎖
- rwlock_t rwlock;
- void rwlock_init(rwlock_t *lock);
- //加讀鎖
- void read_lock(rwlock_t *lock);
- int read_trylock(rwlock_t *lock);
- void read_lock_irqsave(rwlock_t *lock,unsigned long flags);
- void read_lock_irq(rwlock_t *lock, unsigned long flags);
- void read_lock_bh(rwlock_t *lock);
- //解讀鎖
- void read_unlock(rwlock_t *lock);
- void read_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);
- void read_unlock_irq(rwlock_t *lock, unsigned long flags);
- void read_unlock_bh(rwlock_t *lock);
- //加寫鎖
- void write_lock(rwlock_t *lock);
- int write_trylock(rwlock_t *lock);
- void write_lock_irqsave(rwlock_t *lock,unsigned long flags);
- void write_lock_irq(rwlock_t *lock, unsigned long flags);
- void write_lock_bh(rwlock_t *lock);
- //解寫鎖
- void write_unlock(rwlock_t *lock);
- void write_unlock_irqrestrore(rwlock_t *lock,unsigned long flags);
- void write_unlock_irq(rwlock_t *lock, unsigned long flags);
- void write_unlock_bh(rwlock_t *lock);
順序鎖
順序鎖可以看作讀寫鎖的升級版,讀寫鎖不允許同時存在讀者和寫者,順序鎖對這一情況進(jìn)行了改良,它允許寫者和讀者同時訪問臨界區(qū),不必再向讀寫鎖那樣讀者要讀必須等待寫者寫完,寫者要寫必須等待讀者讀完。不過,使用順序鎖的時候,臨界區(qū)不能有指針,因?yàn)閷懻呖赡軙薷闹羔樀闹赶颍绻x者去讀,就會Oops,此外,如果讀者讀過的數(shù)據(jù)被寫者改變了,讀者需要重新讀,以維持?jǐn)?shù)據(jù)是***的,雖然有這兩個約束,但是順序鎖提供了比讀寫鎖更大的靈活性。對于寫者+寫者的情況,順序鎖的機(jī)制和讀寫鎖一樣,必須等!
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 不互斥 , 臨界區(qū)沒有指針+讀者需自己注意更新
- 寫者 + 寫者 互斥
- //include/linux/seqlock.h
- //定義順序鎖
- struct seqlock_t sl;//獲取順序鎖
- void write_seqlock(seqlock_t *sl);
- void write_tryseqlock(seqlock_t *sl);
- void write_seqlock_irqsave(lock,flags);
- //=local_irq_save() + write_seqlock()
- void write_seqlock_irq(seqlock_t *sl);
- //=local_irq_disable() + write_seqlock()
- void write_seqlock_bh(seqlock_t *sl);
- //local_bh_disable() + write_seqlock()
- //釋放順序鎖
- void write_sequnlock(seqlock_t *sl);
- void write_sequnlock_irqsave(lock,flags);
- //=local_irq_restore() + write_sequnlock()
- void write_sequnlock_irq(seqlock_t *sl);
- //=local_irq_enable() + write_sequnlock()
- void write_sequnlock_bh(seqlock_t *sl);
- //local_bh_enable() + write_sequnlock()
- //讀開始
- unsigned read_seqbegin(const seqlock_t *sl);
- read_seqbegin_irqsave(lock,flags);
- //=local_irq_save() + read_seqbegin();
- //重讀
- int read_seqretry(const seqlock_t *sl,unsigned iv);
- read_seqretry_irqrestore(lock,iv,flags);
- //=local_irq_restore() + read_seqretry();
RCU
RCU即Read-Copy Update,即讀者直接讀,寫者先拷貝再擇時更新,是另外一種讀寫鎖的升級版,這種機(jī)制在VFS層被大量使用。正如其名,讀者訪問臨界資源不需要鎖,從下面的rcu_read_lock的定義即可看出,寫者在寫之前先將臨界資源進(jìn)行備份,去修改這個副本,等所有的CPU都退出對這塊臨界區(qū)的引用后,再通過回調(diào)機(jī)制,將引用這塊資源的原指針指向已經(jīng)修改的備份。從中可以看出,在RCU機(jī)制下,讀者的開銷大大降低,也沒有順序鎖的指針問題,但是寫者的開銷很大,所以RCU適用于讀多寫少的臨界資源。如果寫操作很多,就有可能將讀操作節(jié)約的性能抵消掉,得不償失。
- 讀者 + 讀者 不互斥
- 讀者 + 寫者 不互斥 , 讀者自己注意更新
- 寫者 + 寫者 不互斥 ,寫者之間自己去同步
內(nèi)核會為每一個CPU維護(hù)兩個數(shù)據(jù)結(jié)構(gòu)-rcu_data和rcu_bh_data,他們用于保存回調(diào)函數(shù),函數(shù)call_rcu()把回調(diào)函數(shù)注冊到rcu_data,而call_rcu_bh()則把回調(diào)函數(shù)注冊到rcu_bh_data,在每一個數(shù)據(jù)結(jié)構(gòu)上,回調(diào)函數(shù)們會組成一個隊(duì)列。
使用RCU時,讀執(zhí)行單元必須提供一個信號給寫執(zhí)行單元以便寫執(zhí)行單元能夠確定數(shù)據(jù)可以被安全地釋放或修改的時機(jī)。內(nèi)核中有一個專門的垃圾收集器來探測讀執(zhí)行單元的信號,一旦所有的讀執(zhí)行單元都已經(jīng)發(fā)送信號告訴收集器自己都沒有使用RCU的數(shù)據(jù)結(jié)構(gòu),收集器就調(diào)用回調(diào)函數(shù)完成***的數(shù)據(jù)釋放或修改操作。
讀
讀即RCU中的R,從下面的宏定義可以看出,讀操作其實(shí)就是禁止內(nèi)核的搶占調(diào)度,并沒有使用一個鎖對象。
- //讀鎖定
- //include/linux/rcupdate.h
- rcu_read_lock(); //preempt_disbale()
- rcu_read_lock_bh(); //local_bh_disable()
- //讀解鎖
- rcu_read_unlock() //preempt_enable()
- rcu_read_unlock_bh(); //local_bh_enable()
同步
同步即是RCU寫操作的***一個步驟-Update,下面這個接口會則色寫執(zhí)行單元,直到所有的讀執(zhí)行單元已經(jīng)完成讀執(zhí)行單元臨界區(qū),寫執(zhí)行單元才可以繼續(xù)下一步操作。如果有多個RCU寫執(zhí)行單元調(diào)用該函數(shù),他們將在一個grace period(即所有的讀執(zhí)行單元已經(jīng)完成對臨界區(qū)的訪問)之后全部被喚醒。
- synchrosize_rcu()
掛起回調(diào)
下面這個接口也由RCU寫執(zhí)行單元調(diào)用,它不會使寫執(zhí)行單元阻塞,因而可以在中斷上下文或軟中斷中使用,該函數(shù)把func掛接到RCU回調(diào)函數(shù)鏈上,然后立即返回。函數(shù)sychrosize_rcu()其實(shí)也會調(diào)用call_rcu()。
- void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
下面這個接口會將軟中斷的完成也當(dāng)作經(jīng)歷一個quiecent state(靜默狀態(tài)),因此如果寫執(zhí)行單元調(diào)用了該函數(shù),在進(jìn)程上下文的讀執(zhí)行單元必須使用rcu_read_lock_bh();
- void call_rcu_bh(struct rcu_head *head,void (*func)(struct rcu_head *rcu));
RCU機(jī)制被大量的運(yùn)用在內(nèi)核鏈表的讀寫中,下面這些就是內(nèi)核中使用RCU機(jī)制保護(hù)的數(shù)據(jù)結(jié)構(gòu),函數(shù)的功能和非RCU版本一樣,具體可以參考內(nèi)核文件"include/linux/list.h",只不過這里的操作都會使用RCU保護(hù)起來。
- void list_add_rcu(struct list_head *new, struct list_head *head);
- void list_add_tail_rcu(struct list_head *new,struct list_head *head);
- void list_del_rcu(struct list_head *entry);
- void list_replace_rcu(struct list_head *old,struct list_head *new);
- list_for_each_rcu(pos,head);
- list_for_each_safe_rcu(pos,n,head);
- list_for_each_entry_rcu(pos,head,member);
- void hlist_del_rcu(struct hlist_node *n);
- void hlist_add_head_rcu(struct hlist_node *n, struct hlist_head *h);
- list_for_each_rcu(pos,head);
- hlist_for_each_entry_rcu(tpos,pos,head,member);
信號量
自旋鎖一節(jié)提過,如果一個CPU不能獲取臨界資源,就會造成"原地自旋",所以自旋鎖保護(hù)的臨界區(qū)的執(zhí)行時間不能太長,但如果我們的確需要保護(hù)一段執(zhí)行時間比較長的臨界區(qū)呢?答案就是信號量
,信號量的底層依托于自旋鎖來實(shí)現(xiàn)其原子性,并進(jìn)一步將其提高到"進(jìn)程"的維度,稱為一種可以運(yùn)行在進(jìn)程上下文的"鎖",正是這種能運(yùn)行在進(jìn)程上下文的能力賦予了信號量和自旋鎖的諸多不同。
使用信號量,如果試圖獲取信號量的進(jìn)程獲取失敗,內(nèi)核就會將其調(diào)度為睡眠狀態(tài),執(zhí)行其他進(jìn)程,避免了CPU的忙等。不過,進(jìn)程上下文的切換也是有成本的,所以通常,信號量在內(nèi)核中都是只用于保護(hù)大塊臨界區(qū)。
此外,一個進(jìn)程一旦無法獲取期待的信號量就會進(jìn)入睡眠,所以信號量保護(hù)的臨界區(qū)可以有睡眠的代碼。在這方面,自旋鎖保護(hù)的區(qū)域是不能睡眠也不能執(zhí)行schedule()的,因?yàn)橐坏┮粋€搶到了鎖的CPU進(jìn)行了進(jìn)程上下文的切換或睡眠,那么其他等待這個自旋鎖的CPU就會一直在那忙等,就永遠(yuǎn)無法等到這個鎖,,形成死鎖,除非有其他進(jìn)程將其喚醒(通常都不會有)。
也正是由于信號量操作可能引起阻塞,所以信號量不能用于中斷上下文。總結(jié)一下剛才羅嗦這一段:
項(xiàng)目 | 信號量 | 自旋鎖 |
---|---|---|
臨界區(qū)時間 | 進(jìn)程切換時間更短 | 臨界區(qū)執(zhí)行時間更短 |
進(jìn)程上下文 | 臨界區(qū)可以睡眠或調(diào)度 | 臨界區(qū)不可以睡眠或調(diào)度 |
中斷上下文 | 只有down_trylock()可以 | 可以 |
傳統(tǒng)信號量
內(nèi)核的信號量和應(yīng)用層的信號量的使用方式類似,但沒有獲取信號量這一步驟,因?yàn)閮?nèi)核中中的信號量可以映射到所有調(diào)用這個模塊的用戶進(jìn)程的內(nèi)核空間。這些用戶進(jìn)程也就直接共享了一個信號量,所以也就沒有獲取信號量一說,相關(guān)的內(nèi)容我在"Linux IPC System V 信號量"一文中有所討論。
和應(yīng)用層的信號量一樣,內(nèi)核信號量也是用于對臨界資源的互斥/順序訪問,同樣,雖然在使用信號量的時候我們可以初始化為任意值,但實(shí)際操作上我們通常只初始化為1或0,下述是Linux內(nèi)核提供的信號量API。
- //include/linux/semaphore.h
- //定義并初始化semaphore對象
- struct semphore sem;//初始化信號量
- void sem_init(struct semaphore * sem,int val);
- init_MUTEX(sem);
- init_MUTEX_LOCKED(sem);
- DECLARE_MUTEX(sem);
- DECLARE_MUTEX_LOCKED(sem);//P操作
- //down()會導(dǎo)致睡眠,不能用于中斷上下文
- void down(struct semaphore *sem);
- //down_interruptible同樣會進(jìn)入休眠,但能被打斷
- int down_interruptible(struct semaphore *sem);
- //down_trylock不能獲得鎖時會立即返回,不會睡眠,可以用在中斷上下文
- int down_trylock(struct semaphore *sem);//V操作
- void up(struct semaphore *sem);
讀寫信號量
讀寫信號量與信號量的關(guān)系 和 讀寫自旋鎖與自旋鎖的關(guān)系類似,他們的互斥邏輯都是一樣的,這里不再贅述
- //定義并初始化讀寫信號量
- struct rw_semaphore my_rwsem;
- void init_rwsem(struct rw_semaphore *sem);//P讀信號量
- void down_read(struct rw_semaphore *sem);
- int down_read_trylock(struct rw_semaphore *sem);//V讀信號量
- void up_read(struct rw_semaphore *sem);//P寫信號量
- void down_write(struct rw_semaphore *sem);
- int down_write_trylock(struct rw_semaphore *sem);//V寫信號量
- void up_write(struct rw_semaphore *sem);
模板
- struct rw_semaphore my_rwsem;
- void init_rwsem(&my_rwsem);//讀前獲取讀信號量
- down_read(&my_rwsem); //若要非阻塞:
- down_read_trylock(&my_rwsem);/* 讀臨界區(qū) */
- //讀完釋放讀信號量
- up_read(&my_rwsem);
- //寫前獲取寫信號量
- down_write(&my_rwsem);
- //若要非阻塞:
- down_write_trylock(&my_rwsem); /* 寫臨界區(qū) */
- //寫完釋放寫信號量
- up_write(&my_rwsem);
完成量
完成量用于一個執(zhí)行單元等待另一個執(zhí)行單元執(zhí)行完某事,和傳統(tǒng)信號量一樣,主要是用來實(shí)現(xiàn)隊(duì)臨界區(qū)的順序/互斥訪問。但是完成量還提供一種喚醒一個或喚醒所有等待進(jìn)程的接口,有點(diǎn)類似與應(yīng)用層的條件變量。
- //定義并初始化完成量
- struct completion my_completion;
- init_completion(&my_completion);//或
- DECLARE_COMPLETION(my_completion)//等待
- completionvoid wait_for_completion(struct completion *c);//喚醒
- completionvoid complete(struct completion *c); //只喚醒一個等待的執(zhí)行單元
- void complete_all(struct completion *c); //釋放所有等待該完成量的執(zhí)行單元
互斥體
除了信號量,Linux內(nèi)核還提供了一種專門用于實(shí)現(xiàn)互斥的機(jī)制-互斥體,相關(guān)的內(nèi)核API如下:
- //include/linux/mutex.h
- //定義并初始化mutex對象
- struct mutex my_mutex;
- mutex_init(&my_mutex);//獲取
- mutexvoid mutex_lock(struct mutex *lock);
- int mutex_trylock(struct mutex *lock);
- int mutex_lock_interruptible(struct mutex *lock);//釋放
- mutexvoid mutex_unlock(struct mutex *lock);