Libtask源碼解析之鎖
本文轉(zhuǎn)載自微信公眾號(hào)「編程雜技」,作者theanarkh 。轉(zhuǎn)載本文請(qǐng)聯(lián)系編程雜技公眾號(hào)。
libtask中其實(shí)不需要鎖,因?yàn)閘ibtask中協(xié)程是非搶占式的,不存在競(jìng)態(tài)條件。但是libtask還是實(shí)現(xiàn)了一套鎖的機(jī)制。我們看一下這個(gè)鎖機(jī)制的實(shí)現(xiàn)。首先我們看一下結(jié)構(gòu)體。
- struct QLock
- {
- // 鎖持有者
- Task *owner;
- // 等待該鎖的隊(duì)列
- Tasklist waiting;
- };
接著我們看一下鎖的操作。
加鎖
- static int _qlock(QLock *l, int block)
- {
- // 鎖沒(méi)有持有者,則置當(dāng)前協(xié)程為持有者,直接返回,1表示加鎖成功
- if(l->owner == nil){
- l->owner = taskrunning;
- return 1;
- }
- // 非阻塞,則直接返回,0表示加鎖失敗
- if(!block)
- return 0;
- // 插入等待鎖隊(duì)列
- addtask(&l->waiting, taskrunning);
- taskstate("qlock");
- // 切換到其他協(xié)程
- taskswitch();
- // 切換回來(lái)時(shí),如果持有鎖的協(xié)程不是當(dāng)前協(xié)程,則異常退出,因?yàn)橹挥谐钟墟i才會(huì)被切換回來(lái),見(jiàn)unqlock
- if(l->owner != taskrunning){
- fprint(2, "qlock: owner=%p self=%p oops\n", l->owner, taskrunning);
- abort();
- }
- return 1;
- }
如果當(dāng)前鎖沒(méi)有持有者,則當(dāng)前協(xié)程X就變成鎖的持有者,否則把協(xié)程X插入等待鎖隊(duì)列中,然后讓出cpu,切換到其他協(xié)程。當(dāng)后續(xù)鎖被釋放并被協(xié)程X持有時(shí),協(xié)程X就會(huì)被喚醒繼續(xù)持續(xù)。加鎖可以分為阻塞和非阻塞兩種模式。非阻塞就是加鎖失敗也不會(huì)切換協(xié)程。
- // 阻塞式加鎖
- void qlock(QLock *l)
- {
- _qlock(l, 1);
- }
- // 非阻塞式加鎖
- int
- canqlock(QLock *l)
- {
- return _qlock(l, 0);
- }
釋放鎖
接下來(lái)我們看一下釋放鎖的邏輯
- // 釋放鎖
- void qunlock(QLock *l)
- {
- Task *ready;
- // 鎖并沒(méi)有持有者,異常退出
- if(l->owner == 0){
- fprint(2, "qunlock: owner=0\n");
- abort();
- }
- // 如果還有協(xié)程在等待該鎖,則置為持有者,并且從等待隊(duì)列中刪除,然后修改狀態(tài)為就緒并加入就緒隊(duì)列
- if((l->owner = ready = l->waiting.head) != nil){
- deltask(&l->waiting, ready);
- taskready(ready);
- }
- }
當(dāng)鎖被釋放時(shí),如果還有協(xié)程在等待該鎖,則從等待隊(duì)列中摘取一個(gè)節(jié)點(diǎn),然后變成鎖的持有者并從等待隊(duì)列中刪除。最后插入就緒隊(duì)列等待調(diào)度。以上是一種互斥鎖的實(shí)現(xiàn)。下面我們?cè)賮?lái)看一下讀寫(xiě)鎖機(jī)制,讀寫(xiě)鎖也是互斥的,但是在某些情況下也可以共享。我們看一下讀寫(xiě)鎖的數(shù)據(jù)結(jié)構(gòu)。
- struct RWLock
- {
- // 正在讀的讀者個(gè)數(shù)
- int readers;
- // 當(dāng)前正在寫(xiě)的寫(xiě)者,只有一個(gè)
- Task *writer;
- // 等待讀和寫(xiě)的隊(duì)列
- Tasklist rwaiting;
- Tasklist wwaiting;
- };
接著我看一下加鎖邏輯。
加讀鎖
- // 加讀鎖
- static int _rlock(RWLock *l, int block)
- {
- /*
- 沒(méi)有正在寫(xiě)并且沒(méi)有等待寫(xiě),則加鎖成功,并且讀者數(shù)加一
- */
- if(l->writer == nil && l->wwaiting.head == nil){
- l->readers++;
- return 1;
- }
- // 非阻塞則直接返回
- if(!block)
- return 0;
- // 插入等待讀隊(duì)列
- addtask(&l->rwaiting, taskrunning);
- taskstate("rlock");
- // 切換上下文
- taskswitch();
- // 切換回來(lái)了,說(shuō)明加鎖成功
- return 1;
- }
當(dāng)且僅當(dāng)沒(méi)有正在寫(xiě)的寫(xiě)者和等待寫(xiě)的寫(xiě)者時(shí),才能加讀鎖成功,否則根據(jù)加鎖模式進(jìn)行下一步處理,直接返回加鎖失敗或者插入等待隊(duì)列,然后切換到其他協(xié)程。我們看到當(dāng)有一個(gè)等待寫(xiě)的協(xié)程時(shí)(l->wwaiting.head != nil),則后續(xù)的讀者就無(wú)法加鎖成功,而是被插入等待隊(duì)列,否則可能會(huì)引起寫(xiě)者饑餓。
加寫(xiě)鎖
- // 加寫(xiě)鎖
- static int _wlock(RWLock *l, int block)
- {
- // 沒(méi)有正在寫(xiě)并且沒(méi)有正在讀,則加鎖成功,并置寫(xiě)者為當(dāng)前協(xié)程
- if(l->writer == nil && l->readers == 0){
- l->writer = taskrunning;
- return 1;
- }
- // 非阻塞則直接返回
- if(!block)
- return 0;
- // 加入等待寫(xiě)隊(duì)列
- addtask(&l->wwaiting, taskrunning);
- taskstate("wlock");
- // 切換
- taskswitch();
- // 切換回來(lái)說(shuō)明拿到鎖了
- return 1;
- }
當(dāng)且僅當(dāng)沒(méi)有正在寫(xiě)的寫(xiě)者和沒(méi)有正在讀的讀者時(shí),才能加寫(xiě)鎖成功。否則類(lèi)似加讀鎖一樣處理。
釋放讀鎖
- // 釋放讀鎖
- void runlock(RWLock *l)
- {
- Task *t;
- // 讀者減一,如果等于0并且有等待寫(xiě)的協(xié)程,則隊(duì)列第一個(gè)協(xié)程持有該鎖
- if(--l->readers == 0 && (t = l->wwaiting.head) != nil){
- deltask(&l->wwaiting, t);
- l->writer = t;
- taskready(t);
- }
- }
持有讀鎖,說(shuō)明當(dāng)前肯定沒(méi)有正在寫(xiě)的寫(xiě)者,但是可能有等待寫(xiě)的寫(xiě)者和等待讀的讀者(因?yàn)橛械却龑?xiě)的寫(xiě)者導(dǎo)致無(wú)法加鎖成功)。當(dāng)釋放讀鎖時(shí),如果還有其他讀者,則其他讀者可以繼續(xù)持有鎖,因?yàn)樽x者可以共享讀鎖,而寫(xiě)者保持原來(lái)狀態(tài)。如果這時(shí)候沒(méi)有讀者但是有等待寫(xiě)的寫(xiě)者,則從隊(duì)列中選擇第一個(gè)節(jié)點(diǎn)成為鎖的持有者,其他的寫(xiě)者則繼續(xù)等待,因?yàn)閷?xiě)者不能共享寫(xiě)鎖。
釋放寫(xiě)鎖
- // 釋放寫(xiě)鎖
- void wunlock(RWLock *l)
- {
- Task *t;
- // 沒(méi)有正在寫(xiě),異常退出
- if(l->writer == nil){
- fprint(2, "wunlock: not locked\n");
- abort();
- }
- // 置空,沒(méi)有協(xié)程正在寫(xiě)
- l->writer = nil;
- // 有正在讀,異常退出,寫(xiě)的時(shí)候,是無(wú)法讀的
- if(l->readers != 0){
- fprint(2, "wunlock: readers\n");
- abort();
- }
- // 釋放寫(xiě)鎖時(shí),優(yōu)先讓讀者持有鎖,因?yàn)樽x者可以共享持有鎖,提高并發(fā)
- // 讀可以共享,把等待讀的協(xié)程都加入就緒隊(duì)列,并持有鎖
- while((t = l->rwaiting.head) != nil){
- deltask(&l->rwaiting, t);
- l->readers++;
- taskready(t);
- }
- // 釋放寫(xiě)鎖時(shí),如果又沒(méi)有讀者,并且有等待寫(xiě)的協(xié)程,則隊(duì)列的第一個(gè)等待寫(xiě)的協(xié)程持有鎖
- if(l->readers == 0 && (t = l->wwaiting.head) != nil){
- deltask(&l->wwaiting, t);
- l->writer = t;
- taskready(t);
- }
- }
持有寫(xiě)鎖,可能有等待寫(xiě)的寫(xiě)者和等待讀的讀者。這里是讀者優(yōu)先持有鎖,因?yàn)樽x者可以共享持有鎖,提高并發(fā),如果沒(méi)有讀者,則再判斷寫(xiě)者。
總結(jié):?jiǎn)渭兊幕コ怄i是比較簡(jiǎn)單的,讀寫(xiě)鎖就相對(duì)復(fù)雜一點(diǎn),主要是要根據(jù)讀鎖和寫(xiě)鎖的特性制定一些策略,比如避免饑餓問(wèn)題。libtask的方式是,加寫(xiě)鎖的時(shí)候,當(dāng)無(wú)法持有鎖的時(shí)候,申請(qǐng)者就會(huì)被插入等待等待隊(duì)列。這個(gè)是沒(méi)有什么好說(shuō)的,加讀者的時(shí)候,情況就復(fù)雜了點(diǎn),如果這時(shí)候有讀者正在持有鎖,理論上,申請(qǐng)者也可以持有鎖,因?yàn)樽x鎖是共享的,但是單純這樣處理的話,可能會(huì)導(dǎo)致等待寫(xiě)的寫(xiě)者一直拿不到鎖,所以這里需要判斷是否有等待寫(xiě)的寫(xiě)者,如果有則當(dāng)前申請(qǐng)者則不能再持有讀鎖,而是要加入等待隊(duì)列。那么在釋放鎖的時(shí)候,當(dāng)釋放讀鎖時(shí),優(yōu)先讓等待寫(xiě)的寫(xiě)者持有鎖,再到等待讀的讀者持有鎖。同樣,當(dāng)釋放寫(xiě)鎖時(shí),優(yōu)先讓讀者持有鎖,這樣就能比較好地平衡讀者和寫(xiě)者持有鎖的機(jī)會(huì)。