細(xì)說Kestrel.scala中的PersistentQueue
本文是Scala代碼實(shí)例之Kestrel的第五部分,繼續(xù)講述PersistentQueue處理消息隊(duì)列并發(fā)請求的方式。
回顧一下之前我們讀過的兩個(gè)文件,Kestrel.scala, QueueCollection.scala。Kestrel.scala是啟動文件,并且通過一個(gè)actor,保持整個(gè)項(xiàng)目不會因?yàn)闆]有線程運(yùn)行而退出,同時(shí)注冊了一個(gè)acceptor,當(dāng)建立起新的鏈接的時(shí)候,訪問 KestrelHandler.scala(這個(gè)稍后我們再讀)。QueueCollection.scala,維護(hù)一個(gè)PersistentQueue的隊(duì)列,如果訪問的queue_name不存在,則創(chuàng)建一個(gè),如果存在,就對相應(yīng)的QueueCollection進(jìn)行操作。如果留心的話,我們還可以看到QueueCollection在啟動的時(shí)候,queue_name的來源是一個(gè)文件目錄。
我們就從這個(gè)入口繼續(xù)往下,看看PersistentQueue是如何處理消息隊(duì)列的并發(fā)請求的:
在前幾篇文章里面,我們曾經(jīng)提到過PersistentQueue有兩個(gè)“類”,一個(gè)是object PersistentQueue,一個(gè)是class PersistentQueue。而object在scala是一個(gè)單例模式,也就是singleton。也可以看做是只有static類型的java類?,F(xiàn)在讓我們關(guān)注一下,看看class PersistentQueue和object Persistent之間的關(guān)系是怎樣的。
剛開始的一段代碼有點(diǎn)嚇人:
- class OverlaySetting[T](base: => T) {
- @volatile private var local: Option[T] = None
- def set(value: Option[T]) = local = value
- def apply() = local.getOrElse(base)
- }
我們先跳過去,直接往下看,看到這里:
- def overlay[T](base: => T) = new OverlaySetting(base)
- // attempting to add an item after the queue reaches this size (in items) will fail.
- val maxItems = overlay(PersistentQueue.maxItems)
- // attempting to add an item after the queue reaches this size (in bytes) will fail.
- val maxSize = overlay(PersistentQueue.maxSize)
- ……
如果我們不細(xì)究overlay的內(nèi)容,這段代碼其實(shí)就是把object PersisitentQueue中的變量賦值給class PersistentQueue中,那么overlay究竟做了什么呢?其實(shí),overlay是將變量做了一個(gè)封裝,封裝在一個(gè)叫做OverlaySetting的類里面。這個(gè)類,根據(jù)我們之前對scala語法的了解,可以知道,它是一個(gè)OverlaySetting[T]的類,并且在創(chuàng)建的時(shí)候,需要帶入方法,方法沒有參數(shù),但是有一個(gè)返回值,類型就是T。(關(guān)于class類的語法規(guī)則,可以參考http://programming-scala.labs.oreilly.com/ch05.html#Constructors,不過里面的例子比OverlaySetting還復(fù)雜……-_-|||)
這個(gè)類在每次創(chuàng)建對象的時(shí)候,都會被賦值。我們也看到只有在使用apply方法的時(shí)候才會被調(diào)用(不過我沒有太想明白,如何通過函數(shù)的返回值來確定模板中的類型T,也許這就是Scala這種更加靈活的編譯算法,可以在new對象的時(shí)候,通過審查變量類型來獲取T的吧,畢竟Scala是一個(gè)靜態(tài)語言,如果是動態(tài)語言就不太成為一個(gè)問題了)。
這里面還存在一個(gè)Scala概念,就是方法=變量。當(dāng)然在很多動態(tài)語言里面就已經(jīng)這么做了。在Scala里面,我們可以把def看作是val的一種特殊寫法,def聲明的方法,也可以用 def func_name() = {} 這樣的語法規(guī)則,跟val基本就是一回事了。當(dāng)然,這一改變在Scala里面并不簡單是一個(gè)語法規(guī)則的問題,更進(jìn)一步的,所有的變量也都是類,所以我們可以把一個(gè)變量,看做一個(gè)類,也可以看做類的建構(gòu)函數(shù),返回的就是類本身……有點(diǎn)繞,不過這樣理解,就比較好理解為什么可以用常量,當(dāng)作沒有參數(shù)的方法調(diào)用了。
說了那么多,結(jié)論很簡單,maxSize是一個(gè)OverlaySetting[LONG]的類,如果maxSize沒有設(shè)置過,那么返回的就是object PersistentQueue里面的maxSize。LONG類型。
在主程序體里面,我們看到了Journal類,然后是調(diào)用 configure 方法,這個(gè)方法印證了我們的對OverlaySetting的解釋,它從配置文件里面把參數(shù)都讀出來賦值給class PersistentQueue里面的那些常量,用的是set。這里是一個(gè)Scala的語法細(xì)節(jié),它省略了一些不必要的”.”和”()”。
休息一下。我們開始討論在PersistentQueue里面的Actor
……
休息完畢
Scala中,消息傳遞的方式有一個(gè)特殊的語法結(jié)構(gòu):“Object ! MessageType” 就好像在源代碼里面出現(xiàn)的:“w.actor ! ItemArrived?!?,(關(guān)于Scala的Actor,詳細(xì)的語法說明在http://programming-scala.labs.oreilly.com/ch09.html可以看到,建議先看一下,好對actor有一個(gè)比較深入的了解)
我們發(fā)現(xiàn)PersistentQueue中Actor的實(shí)現(xiàn),跟語法說明里面的很不一樣,在語法說明里面的Actor都是作為一個(gè)獨(dú)立的線程出現(xiàn)的,而在PersistentQueue中,你甚至看不見一個(gè)對Actor的重載,但我們可以發(fā)現(xiàn)與Actor相關(guān)的幾個(gè)地方,一個(gè)是Waiter的定義,它是一個(gè)case class,并且有一個(gè)成員變量叫做actor,類型是Actor:
- private case class Waiter(actor: Actor)
- ……
- private val waiters = new mutable.ArrayBuffer[Waiter]
- ……
- val w = Waiter(Actor.self)
- waiters += w
- ……
需要注意:之前我們提過一個(gè)Scala的語法規(guī)則,那就是類后面的建構(gòu)函數(shù)的參數(shù),就是類中的成員變量!(不過這是在解釋,為什么在建構(gòu)函數(shù)里面會有private關(guān)鍵字時(shí)提到的……)所以,我們知道了一點(diǎn),就是每一個(gè)Waiter內(nèi)部都有一個(gè)actor,這些actor通過Actor.self共享了一個(gè)線程,當(dāng)然也和其他的PersistentQueue共享了一個(gè)Actor。這是有點(diǎn)讓人不習(xí)慣,因?yàn)檫@么要緊的一個(gè)線程的創(chuàng)建,竟然可以出現(xiàn)得那么隱蔽。甚至連一個(gè)大括號都沒有。
接下來,我們來看看Actor是怎么在PersistentQueue里面工作了——這有點(diǎn)難,因?yàn)樗臋C(jī)制有點(diǎn)復(fù)雜,不是簡單的象語法說明里面的那樣,是一個(gè)完整的獨(dú)立的函數(shù),而是在一些函數(shù)中,突然切入進(jìn)來,分享了Actor.self的一部分線程資源,就像下面代碼一樣:
- ……
- f operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {
- operateOrWait(op, timeoutAbsolute) match {
- case (item, None) =>
- f(item)
- case (None, Some(w)) =>
- Actor.self.reactWithin((timeoutAbsolute - Time.now) max 0) {
- case ItemArrived => operateReact(op, timeoutAbsolute)(f)
- case TIMEOUT => synchronized {
- waiters -= w
- // race: someone could have done an add() between the timeout and grabbing the lock.
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
- }
- }
- case _ => throw new RuntimeException()
- }
- ……
其中:
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
就是Actor的一個(gè)語法,在一段時(shí)間里面等待消息,如果有消息就如何……,如果沒有消息(TIMEOUT),就如何……。但是在整個(gè)函數(shù)里面套用了兩層 Actor.self.reactWithin,有點(diǎn)讓人要暈菜的感覺,再加上之前有一個(gè)match…case的結(jié)構(gòu),調(diào)用了operateOrWait(op, timeoutAbsolute)方法。要了解整個(gè)消息處理的機(jī)制,就需要把這三個(gè)部分聯(lián)系起來看了。
先簡單看一下operateOrWait函數(shù),比較容易理解:
- private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {
- val item = op
- if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {
- val w = Waiter(Actor.self)
- waiters += w
- (None, Some(w))
- } else {
- (item, None)
- }
- }
返回值是一個(gè)map,包括兩個(gè)被Option封裝的類型QItem和Waiter,從QItem.scala中可以知道(代碼很簡單),QItem就是把原始數(shù)據(jù)打了一個(gè)包,而Waiter之前我們也已經(jīng)說過了。程序體中的判斷是這樣的:如果item,也就是op這個(gè)參數(shù)沒有定義,并且PersistentQueue也沒有停止,關(guān)閉,而且處理時(shí)間AbsoluteTime不是0,那么就創(chuàng)建一個(gè)Waiter,返回(None, Some[Waiter]);如果不滿足這些條件,那么就直接返回(op, None)。簡單的說,就是如果系統(tǒng)還能等,就讓他等待正常一段時(shí)間然后操作,如果不能等,就直接返回操作指令。返回值只有兩種類型。
然后再看operateReact,如果返回的是時(shí)間參數(shù)是None(詳細(xì)的可以參考 actor .. case 的語法,地址是:http://programming-scala.labs.oreilly.com/ch03.html#MatchingOnCaseClasses),那么就直接執(zhí)行f(op)的函數(shù),把op這個(gè)方法,作為參數(shù)傳遞給f函數(shù)。如果返回的是一個(gè)時(shí)間戳,Some(w),那么我們就等待AbsoluteTime 到 Time.now()這段時(shí)間,如果在這段事件里面有ItemArrived事件發(fā)生,那么就處理一下,直到Time.now 等于或者大于 AbsoluteTime,那就會得到一個(gè)TIMEOUT,然后就退出了。(有一個(gè)異常的情況,需要清空一下事件隊(duì)列,通過reactWithin(0){})
這么理解這段actor還是不太清晰,那么讓我們回到上一層的調(diào)用??纯催@個(gè)f(op)到底是什么,然后我們看到了:
- def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit = {
- operateReact(remove(transaction), timeoutAbsolute)(f)
- }
我們就知道op其實(shí)是一個(gè)remove的操作,并且返回remove得到的QItem對象。再往上一層到QueueCollection,我們看到:
- q.removeReact(if (timeout == 0) timeout else Time.now + timeout, transaction) {
- case None =>
- queueMisses.incr
- f(None)
- case Some(item) =>
- queueHits.incr
- f(Some(item))
- }
f方法的操作,如果之前的remove返回的是一個(gè)None,則記錄queueMess(未命中)添加1,如果返回的是一個(gè)QItem的值,那么就記錄queueHits(命中)添加1,并且,對這個(gè)QItem進(jìn)行操作(注意:這里的f是QueueCollection中remove帶入的那個(gè)方法,而不是前面提到的removeReact里面提到的f。
從QueueCollection的remove調(diào)用到***層PersistentQueue的operateReact調(diào)用,我們大致可以了解這么曲折的調(diào)用關(guān)系解決了一個(gè)什么問題——從消息隊(duì)列里面獲取QItem。
回顧一下QueueCollection其他的代碼,我們發(fā)現(xiàn),只有waiter.size > 0的時(shí)候,有新的QItem添加,才會發(fā)出ItemArrived事件。也就是說,只有有一個(gè)獲取消息隊(duì)列的進(jìn)程存在的時(shí)候,才會觸發(fā)ItemArrived事件。獲取消息隊(duì)列,則通過使用reactWithin,允許在一個(gè)規(guī)定的時(shí)間內(nèi),連續(xù)處理一系列的ItemArrived事件??碤ueueCollection的remove方法,我們還可以知道,當(dāng)啟動q.removeReact之前,首先會調(diào)用q.peek來檢查,隊(duì)列是不是為空,如果不是空的話,就直接返回隊(duì)列里面最前面的那個(gè)元素。所以我們可以把這個(gè)消息隊(duì)列理解成——如果消息隊(duì)列為空的情況下,讓獲取消息隊(duì)列的Client等待一段時(shí)間的機(jī)制,以降低反復(fù)進(jìn)行SOCKET連接帶來的不必要的耗損。
這個(gè)機(jī)制,可以讓我們比較好地理解,為什么Kestrel提示說,如果運(yùn)行多個(gè)獨(dú)立的進(jìn)程來處理消息隊(duì)列的時(shí)候,會讓這個(gè)消息隊(duì)列的處理變成一個(gè)缺乏時(shí)序,但是處理并發(fā)能力很強(qiáng)的集群。每個(gè)連接對應(yīng)的是一個(gè)Waiter,但是當(dāng)ItemArrived觸發(fā)的時(shí)候,只可能有其中的一個(gè)reactWithin得到了這個(gè)事件,發(fā)送給對應(yīng)的那個(gè)線程處理這個(gè)消息。
我現(xiàn)在手上的是Kestrel-1.1.2版本的代碼,走讀這部分代碼的時(shí)候,其實(shí)發(fā)現(xiàn)作者在寫這段代碼的時(shí)候,多了一些冗余的內(nèi)容——比如說removeReceive方法,從而看出作者在使用Scala的特性中,也是逐步地把代碼優(yōu)化成如今的樣子。畢竟Scala和Java之間的差別很大,如果做到Type Less, Do More。是需要一個(gè)逐步積累的過程,誰都不是天生就能把Scala寫得很好的,更何況是需要性能非常高的時(shí)候。
【相關(guān)閱讀】