從Java走進Scala:一步步教你使用Scala Actor
在 前一篇文章 中,我討論了構建并發(fā)代碼的重要性(無論是否是 Scala 代碼),還討論了在編寫并發(fā)代碼時開發(fā)人員面對的一些問題,包括不要鎖住太多東西、不要鎖住太少東西、避免死鎖、避免生成太多線程等等。
51CTO編輯推薦:Scala編程語言專題
這些理論問題太沉悶了。為了避免讀者覺得失望,我與您一起研究了 Scala 的一些并發(fā)構造,首先是在 Scala 中直接使用 Java 語言的并發(fā)庫的基本方法,然后討論 Scala API 中的 MailBox 類型。盡管這兩種方法都是可行的,但是它們并不是 Scala 實現(xiàn)并發(fā)性的主要機制。
真正提供并發(fā)性的是 Scala 的 actor。
什么是 “actor”?
“actor” 實現(xiàn)在稱為 actor 的執(zhí)行實體之間使用消息傳遞進行協(xié)作(注意,這里有意避免使用 “進程”、“線程” 或 “機器” 等詞匯)。盡管它聽起來與 RPC 機制有點兒相似,但是它們是有區(qū)別的。RPC 調(diào)用(比如 Java RMI 調(diào)用)會在調(diào)用者端阻塞,直到服務器端完成處理并發(fā)送回某種響應(返回值或異常),而消息傳遞方法不會阻塞調(diào)用者,因此可以巧妙地避免死鎖。
僅僅傳遞消息并不能避免錯誤的并發(fā)代碼的所有問題。另外,這種方法還有助于使用 “不共享任何東西” 編程風格,也就是說不同的 actor 并不訪問共享的數(shù)據(jù)結構(這有助于促進封裝 actor,無論 actor 是 JVM 本地的,還是位于其他地方) — 這樣就完全不需要同步了。畢竟,如果不共享任何東西,并發(fā)執(zhí)行就不涉及任何需要同步的東西。
這不算是對 actor 模型的正規(guī)描述,而且毫無疑問,具有更正規(guī)的計算機科學背景的人會找到各種更嚴謹?shù)拿枋龇椒?,能夠描?actor 的所有細節(jié)。但是對于本文來說,這個描述已經(jīng)夠了。在網(wǎng)上可以找到更詳細更正規(guī)的描述,還有一些學術文章詳細討論了 actor 背后的概念(請您自己決定是否要深入學習這些概念)。現(xiàn)在,我們來看看 Scala actors API。
#p#
Scala actor
使用 actor 根本不困難,只需使用 Actor 類的 actor 方法創(chuàng)建一個 actor,見清單 1:
清單 1. 開拍!
- import scala.actors._, Actor._
- package com.tedneward.scalaexamples.scala.V4
- {
- object Actor1
- {
- def main(args : Array[String]) =
- {
- val badActor =
- actor
- {
- receive
- {
- case msg => System.out.println(msg)
- }
- }
- badActor ! "Do ya feel lucky, punk?"
- }
- }
- }
這里同時做了兩件事。
首先,我們從 Scala Actors 庫的包中導入了這個庫,然后從庫中直接導入了 Actor 類的成員;第二步并不是完全必要的,因為在后面的代碼中可以使用 Actor.actor 替代 actor,但是這么做能夠表明 actor 是語言的內(nèi)置結構并(在一定程度上)提高代碼的可讀性。
下一步是使用 actor 方法創(chuàng)建 actor 本身,這個方法通過參數(shù)接收一個代碼塊。在這里,代碼塊執(zhí)行一個簡單的 receive(稍后討論)。結果是一個 actor,它被存儲在一個值引用中,供以后使用。
請記住,除了消息之外,actor 不使用其他通信方法。使用 ! 的代碼行實際上是一個向 badActor 發(fā)送消息的方法,這可能不太直觀。Actor 內(nèi)部還包含另一個 MailBox 元素(已討論);! 方法接收傳遞過來的參數(shù)(在這里是一個字符串),把它發(fā)送給郵箱,然后立即返回。
消息交付給 actor 之后,actor 通過調(diào)用它的 receive 方法來處理消息;這個方法從郵箱中取出第一個可用的消息,把它交付給一個模式匹配塊。注意,因為這里沒有指定模式匹配的類型,所以任何消息都是匹配的,而且消息被綁定到 msg 名稱(為了打印它)。
一定要注意一點:對于可以發(fā)送的類型,沒有任何限制 — 不一定要像前面的示例那樣發(fā)送字符串。實際上,基于 actor 的設計常常使用 Scala case 類攜帶實際消息本身,這樣就可以根據(jù) case 類的參數(shù)/成員的類型提供隱式的 “命令” 或 “動作”,或者向動作提供數(shù)據(jù)。
例如,假設希望 actor 用兩個不同的動作來響應發(fā)送的消息;新的實現(xiàn)可能與清單 2 相似:
清單 2. 嗨,我是導演!
- object Actor2
- {
- case class Speak(line : String);
- case class Gesture(bodyPart : String, action : String);
- case class NegotiateNewContract;
- def main(args : Array[String]) =
- {
- val badActor =
- actor
- {
- receive
- {
- case NegotiateNewContract =>
- System.out.println("I won't do it for less than $1 million!")
- case Speak(line) =>
- System.out.println(line)
- case Gesture(bodyPart, action) =>
- System.out.println("(" + action + "s " + bodyPart + ")")
- case _ =>
- System.out.println("Huh? I'll be in my trailer.")
- }
- }
- badActor ! NegotiateNewContract
- badActor ! Speak("Do ya feel lucky, punk?")
- badActor ! Gesture("face", "grimaces")
- badActor ! Speak("Well, do ya?")
- }
- }
到目前為止,看起來似乎沒問題,但是在運行時,只協(xié)商了新合同;在此之后,JVM 終止了。初看上去,似乎是生成的線程無法足夠快地響應消息,但是要記住在 actor 模型中并不處理線程,只處理消息傳遞。這里的問題其實非常簡單:一次接收使用一個消息,所以無論隊列中有多少個消息正在等待處理都無所謂,因為只有一次接收,所以只交付一個消息。
糾正這個問題需要對代碼做以下修改,見清單 3:
◆把 receive 塊放在一個接近無限的循環(huán)中。
◆創(chuàng)建一個新的 case 類來表示什么時候處理全部完成了。
清單 3. 現(xiàn)在我是一個更好的導演!
- object Actor2
- {
- case class Speak(line : String);
- case class Gesture(bodyPart : String, action : String);
- case class NegotiateNewContract;
- case class ThatsAWrap;
- def main(args : Array[String]) =
- {
- val badActor =
- actor
- {
- var done = false
- while (! done)
- {
- receive
- {
- case NegotiateNewContract =>
- System.out.println("I won't do it for less than $1 million!")
- case Speak(line) =>
- System.out.println(line)
- case Gesture(bodyPart, action) =>
- System.out.println("(" + action + "s " + bodyPart + ")")
- case ThatsAWrap =>
- System.out.println("Great cast party, everybody! See ya!")
- done = true
- case _ =>
- System.out.println("Huh? I'll be in my trailer.")
- }
- }
- }
- badActor ! NegotiateNewContract
- badActor ! Speak("Do ya feel lucky, punk?")
- badActor ! Gesture("face", "grimaces")
- badActor ! Speak("Well, do ya?")
- badActor ! ThatsAWrap
- }
- }
這下行了!使用 Scala actor 就這么容易。
#p#
并發(fā)地執(zhí)行動作
上面的代碼沒有反映出并發(fā)性 — 到目前為止給出的代碼更像是另一種異步的方法調(diào)用形式,您看不出區(qū)別。(從技術上說,在第二個示例中引入接近無限循環(huán)之前的代碼中,可以猜出有一定的并發(fā)性存在,但這只是偶然的證據(jù),不是明確的證明)。
為了證明在幕后確實有多個線程存在,我們深入研究一下前一個示例:
清單 4. 我要拍特寫了
- object Actor3
- {
- case class Speak(line : String);
- case class Gesture(bodyPart : String, action : String);
- case class NegotiateNewContract;
- case class ThatsAWrap;
- def main(args : Array[String]) =
- {
- def ct =
- "Thread " + Thread.currentThread().getName() + ": "
- val badActor =
- actor
- {
- var done = false
- while (! done)
- {
- receive
- {
- case NegotiateNewContract =>
- System.out.println(ct + "I won't do it for less than $1 million!")
- case Speak(line) =>
- System.out.println(ct + line)
- case Gesture(bodyPart, action) =>
- System.out.println(ct + "(" + action + "s " + bodyPart + ")")
- case ThatsAWrap =>
- System.out.println(ct + "Great cast party, everybody! See ya!")
- done = true
- case _ =>
- System.out.println(ct + "Huh? I'll be in my trailer.")
- }
- }
- }
- System.out.println(ct + "Negotiating...")
- badActor ! NegotiateNewContract
- System.out.println(ct + "Speaking...")
- badActor ! Speak("Do ya feel lucky, punk?")
- System.out.println(ct + "Gesturing...")
- badActor ! Gesture("face", "grimaces")
- System.out.println(ct + "Speaking again...")
- badActor ! Speak("Well, do ya?")
- System.out.println(ct + "Wrapping up")
- badActor ! ThatsAWrap
- }
- }
運行這個新示例,就會非常明確地發(fā)現(xiàn)確實有兩個不同的線程:
◆main 線程(所有 Java 程序都以它開始)
◆Thread-2 線程,它是 Scala Actors 庫在幕后生成的
因此,在啟動第一個 actor 時,本質(zhì)上已經(jīng)開始了多線程執(zhí)行。
但是,習慣這種新的執(zhí)行模型可能有點兒困難,因為這是一種全新的并發(fā)性考慮方式。例如,請考慮 前一篇文章 中的 Producer/Consumer 模型。那里有大量代碼,尤其是在 Drop 類中,我們可以清楚地看到線程之間,以及線程與保證所有東西同步的監(jiān)視器之間有哪些交互活動。為了便于參考,我在這里給出前一篇文章中的 V3 代碼:
清單 5. ProdConSample,v3 (Scala)
- package com.tedneward.scalaexamples.scala.V3
- {
- import concurrent.MailBox
- import concurrent.ops._
- object ProdConSample
- {
- class Drop
- {
- private val m = new MailBox()
- private case class Empty()
- private case class Full(x : String)
- m send Empty() // initialization
- def put(msg : String) : Unit =
- {
- m receive
- {
- case Empty() =>
- m send Full(msg)
- }
- }
- def take() : String =
- {
- m receive
- {
- case Full(msg) =>
- m send Empty(); msg
- }
- }
- }
- def main(args : Array[String]) : Unit =
- {
- // Create Drop
- val drop = new Drop()
- // Spawn Producer
- spawn
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- );
- importantInfo.foreach((msg) => drop.put(msg))
- drop.put("DONE")
- }
- // Spawn Consumer
- spawn
- {
- var message = drop.take()
- while (message != "DONE")
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message)
- message = drop.take()
- }
- }
- }
- }
- }
盡管看到 Scala 如何簡化這些代碼很有意思,但是它實際上與原來的 Java 版本沒有概念性差異?,F(xiàn)在,看看如果把 Producer/Consumer 示例的基于 actor 的版本縮減到最基本的形式,它會是什么樣子:
清單 6. Take 1,開拍!生產(chǎn)!消費!
- object ProdConSample1
- {
- case class Message(msg : String)
- def main(args : Array[String]) : Unit =
- {
- val consumer =
- actor
- {
- var done = false
- while (! done)
- {
- receive
- {
- case msg =>
- System.out.println("Received message! -> " + msg)
- done = (msg == "DONE")
- }
- }
- }
- consumer ! "Mares eat oats"
- consumer ! "Does eat oats"
- consumer ! "Little lambs eat ivy"
- consumer ! "Kids eat ivy too"
- consumer ! "DONE"
- }
- }
第一個版本確實簡短多了,而且在某些情況下可能能夠完成所需的所有工作;但是,如果運行這段代碼并與以前的版本做比較,就會發(fā)現(xiàn)一個重要的差異 — 基于 actor 的版本是一個多位置緩沖區(qū),而不是我們以前使用的單位置緩沖。這看起來是一項改進,而不是缺陷,但是我們要通過對比確認這一點。我們來創(chuàng)建 Drop 的基于 actor 的版本,在這個版本中所有對 put() 的調(diào)用必須由對 take() 的調(diào)用進行平衡。
幸運的是,Scala Actors 庫很容易模擬這種功能。希望讓 Producer 一直阻塞,直到 Consumer 接收了消息;實現(xiàn)的方法很簡單:讓 Producer 一直阻塞,直到它從 Consumer 收到已經(jīng)接收消息的確認。從某種意義上說,這就是以前的基于監(jiān)視器的代碼所做的,那個版本通過對鎖對象使用監(jiān)視器發(fā)送這種信號。
#p#
在 Scala Actors 庫中,最容易的實現(xiàn)方法是使用 !? 方法而不是 ! 方法(這樣就會一直阻塞到收到確認時)。(在 Scala Actors 實現(xiàn)中,每個 Java 線程都是一個 actor,所以回復會發(fā)送到與 main 線程隱式關聯(lián)的郵箱)。這意味著 Consumer 需要發(fā)送某種確認;這要使用隱式繼承的 reply(它還繼承 receive 方法),見清單 7:
清單 7. Take 2,開拍!
- object ProdConSample2
- {
- case class Message(msg : String)
- def main(args : Array[String]) : Unit =
- {
- val consumer =
- actor
- {
- var done = false
- while (! done)
- {
- receive
- {
- case msg =>
- System.out.println("Received message! -> " + msg)
- done = (msg == "DONE")
- reply("RECEIVED")
- }
- }
- }
- System.out.println("Sending....")
- consumer !? "Mares eat oats"
- System.out.println("Sending....")
- consumer !? "Does eat oats"
- System.out.println("Sending....")
- consumer !? "Little lambs eat ivy"
- System.out.println("Sending....")
- consumer !? "Kids eat ivy too"
- System.out.println("Sending....")
- consumer !? "DONE"
- }
- }
如果喜歡使用 spawn 把 Producer 放在 main() 之外的另一個線程中(這非常接近最初的代碼),那么代碼可能像清單 8 這樣:
清單 8. Take 4,開拍!
- object ProdConSampleUsingSpawn
- {
- import concurrent.ops._
- def main(args : Array[String]) : Unit =
- {
- // Spawn Consumer
- val consumer =
- actor
- {
- var done = false
- while (! done)
- {
- receive
- {
- case msg =>
- System.out.println("MESSAGE RECEIVED: " + msg)
- done = (msg == "DONE")
- reply("RECEIVED")
- }
- }
- }
- // Spawn Producer
- spawn
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too",
- "DONE"
- );
- importantInfo.foreach((msg) => consumer !? msg)
- }
- }
- }
無論從哪個角度來看,基于 actor 的版本都比原來的版本簡單多了。讀者只要讓 actor 和隱含的郵箱自己發(fā)揮作用即可。
但是,這并不簡單。actor 模型完全顛覆了考慮并發(fā)性和線程安全的整個過程;在以前的模型中,我們主要關注共享的數(shù)據(jù)結構(數(shù)據(jù)并發(fā)性),而現(xiàn)在主要關注操作數(shù)據(jù)的代碼本身的結構(任務并發(fā)性),盡可能少共享數(shù)據(jù)。請注意 Producer/Consumer 示例的不同版本的差異。在以前的示例中,并發(fā)功能是圍繞 Drop 類(有界限的緩沖區(qū))顯式編寫的。在本文中的版本中,Drop 甚至沒有出現(xiàn),重點在于兩個 actor(線程)以及它們之間的交互(通過不共享任何東西的消息)。
當然,仍然可以用 actor 構建以數(shù)據(jù)為中心的并發(fā)構造;只是必須采用稍有差異的方式。請考慮一個簡單的 “計數(shù)器” 對象,它使用 actor 消息傳達 “increment” 和 “get” 操作,見清單 9:
清單 9. Take 5,計數(shù)!
- object CountingSample
- {
- case class Incr
- case class Value(sender : Actor)
- case class Lock(sender : Actor)
- case class UnLock(value : Int)
- class Counter extends Actor
- {
- override def act(): Unit = loop(0)
- def loop(value: int): Unit = {
- receive {
- case Incr() => loop(value + 1)
- case Value(a) => a ! value; loop(value)
- case Lock(a) => a ! value
- receive { case UnLock(v) => loop(v) }
- case _ => loop(value)
- }
- }
- }
- def main(args : Array[String]) : Unit =
- {
- val counter = new Counter
- counter.start()
- counter ! Incr()
- counter ! Incr()
- counter ! Incr()
- counter ! Value(self)
- receive { case cvalue => Console.println(cvalue) }
- counter ! Incr()
- counter ! Incr()
- counter ! Value(self)
- receive { case cvalue => Console.println(cvalue) }
- }
- }
#p#
為了進一步擴展 Producer/Consumer 示例,清單 10 給出一個在內(nèi)部使用 actor 的 Drop 版本(這樣,其他 Java 類就可以使用這個 Drop,而不需要直接調(diào)用 actor 的方法):
清單 10. 在內(nèi)部使用 actor 的 Drop
- object ActorDropSample
- {
- class Drop
- {
- private case class Put(x: String)
- private case object Take
- private case object Stop
- private val buffer =
- actor
- {
- var data = ""
- loop
- {
- react
- {
- case Put(x) if data == "" =>
- data = x; reply()
- case Take if data != "" =>
- val r = data; data = ""; reply(r)
- case Stop =>
- reply(); exit("stopped")
- }
- }
- }
- def put(x: String) { buffer !? Put(x) }
- def take() : String = (buffer !? Take).asInstanceOf[String]
- def stop() { buffer !? Stop }
- }
- def main(args : Array[String]) : Unit =
- {
- import concurrent.ops._
- // Create Drop
- val drop = new Drop()
- // Spawn Producer
- spawn
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- );
- importantInfo.foreach((msg) => { drop.put(msg) })
- drop.put("DONE")
- }
- // Spawn Consumer
- spawn
- {
- var message = drop.take()
- while (message != "DONE")
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message)
- message = drop.take()
- }
- drop.stop()
- }
- }
- }
可以看到,這需要更多代碼(和更多的線程,因為每個 actor 都在一個線程池內(nèi)部起作用),但是這個版本的 API 與以前的版本相同,它把所有與并發(fā)性相關的代碼都放在 Drop 內(nèi)部,這正是 Java 開發(fā)人員所期望的。
actor 還有更多特性。
在規(guī)模很大的系統(tǒng)中,讓每個 actor 都由一個 Java 線程支持是非常浪費資源的,尤其是在 actor 的等待時間比處理時間長的情況下。在這些情況下,基于事件的 actor 可能更合適;這種 actor 實際上放在一個閉包中,閉包捕捉 actor 的其他動作。也就是說,現(xiàn)在并不通過線程狀態(tài)和寄存器表示代碼塊(函數(shù))。當一個消息到達 actor 時(這時顯然需要活動的線程),觸發(fā)閉包,閉包在它的活動期間借用一個活動的線程,然后通過回調(diào)本身終止或進入 “等待” 狀態(tài),這樣就會釋放線程。(請參見 參考資料 中 Haller/Odersky 的文章)。
在 Scala Actors 庫中,這要使用 react 方法而不是前面使用的 receive。使用 react 的關鍵是在形式上 react 不能返回,所以 react 中的實現(xiàn)必須重復調(diào)用包含 react 塊的代碼塊。簡便方法是使用 loop 結構創(chuàng)建一個接近無限的循環(huán)。這意味著 清單 10 中的 Drop 實現(xiàn)實際上只通過借用調(diào)用者的線程執(zhí)行操作,這會減少執(zhí)行所有操作所需的線程數(shù)。(在實踐中,我還沒有見過在簡單的示例中出現(xiàn)這種效果,所以我想我們只能暫且相信 Scala 設計者的說法)。
在某些情況下,可能選擇通過派生基本的 Actor 類(在這種情況下,必須定義 act 方法,否則類仍然是抽象的)創(chuàng)建一個新類,它隱式地作為 actor 執(zhí)行。盡管這是可行的,但是這種思想在 Scala 社區(qū)中不受歡迎;在一般情況下,我在這里描述的方法(使用 Actor 對象中的 actor 方法)是創(chuàng)建 actor 的首選方法。
結束語
因為 actor 編程需要與 “傳統(tǒng)” 對象編程不同的風格,所以在使用 actor 時要記住幾點。
首先,actor 的主要能力來源于消息傳遞風格,而不采用阻塞-調(diào)用風格,這是它的主要特點。(有意思的是,也有使用消息傳遞作為核心機制的面向?qū)ο笳Z言。最知名的兩個例子是 Objective-C 和 Smalltalk,還有 ThoughtWorker 的 Ola Bini 新創(chuàng)建的 Ioke)。如果創(chuàng)建直接或間接擴展 Actor 的類,那么要確保對對象的所有調(diào)用都通過消息傳遞進行。
第二,因為可以在任何時候交付消息,而且更重要的是,在發(fā)送和接收之間可能有相當長的延遲,所以一定要確保消息攜帶正確地處理它們所需的所有狀態(tài)。這種方式會:
讓代碼更容易理解(因為消息攜帶處理所需的所有狀態(tài))。
減少 actor 訪問某些地方的共享狀態(tài)的可能性,從而減少發(fā)生死鎖或其他并發(fā)性問題的機會。
第三,actor 應該不會阻塞,您從前面的內(nèi)容應該能夠看出這一點。從本質(zhì)上說,阻塞是導致死鎖的原因;代碼可能產(chǎn)生的阻塞越少,發(fā)生死鎖的可能性就越低。
很有意思的是,如果您熟悉 Java Message Service (JMS) API,就會發(fā)現(xiàn)我給出的這些建議在很大程度上也適用于 JMS — 畢竟,actor 消息傳遞風格只是在實體之間傳遞消息,JMS 消息傳遞也是在實體之間傳遞消息。它們的差異在于,JMS 消息往往比較大,在層和進程級別上操作;而 actor 消息往往比較小,在對象和線程級別上操作。如果您掌握了 JMS,actor 也不難掌握。
actor 并不是解決所有并發(fā)性問題的萬靈藥,但是它們?yōu)閼贸绦蚧驇齑a的建模提供了一種新的方式,所用的構造相當簡單明了。盡管它們的工作方式有時與您預期的不一樣,但是一些行為正是我們所熟悉的 — 畢竟,我們在最初使用對象時也有點不習慣,只要經(jīng)過努力,您也會掌握并喜歡上 actor。
本文來自IBMDW中國:《面向 Java 開發(fā)人員的 Scala 指南: 深入了解 Scala 并發(fā)性》。
【相關閱讀】