孔乙己:Kotlin生產(chǎn)者消費(fèi)者問(wèn)題的八種解法
本文轉(zhuǎn)載自微信公眾號(hào)「AndroidPub」,作者fundroid。轉(zhuǎn)載本文請(qǐng)聯(lián)系A(chǔ)ndroidPub公眾號(hào)。
生產(chǎn)者和消費(fèi)者問(wèn)題是線程模型中的經(jīng)典問(wèn)題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)緩沖區(qū)(Buffer),生產(chǎn)者往 Buffer 中添加產(chǎn)品,消費(fèi)者從 Buffer 中取走產(chǎn)品,當(dāng) Buffer 為空時(shí),消費(fèi)者阻塞,當(dāng) Buffer 滿時(shí),生產(chǎn)者阻塞。
Kotlin 中有多種方法可以實(shí)現(xiàn)多線程的生產(chǎn)/消費(fèi)模型(大多也適用于Java)
- Synchronized
- ReentrantLock
- BlockingQueue
- Semaphore
- PipedXXXStream
- RxJava
- Coroutine
- Flow
1. Synchronized
Synchronized 是最最基本的線程同步工具,配合 wait/notify 可以實(shí)現(xiàn)實(shí)現(xiàn)生產(chǎn)消費(fèi)問(wèn)題。
- val buffer = LinkedList<Data>()
- val MAX = 5 //buffer最大size
- val lock = Object()
- fun produce(data: Data) {
- sleep(2000) // mock produce
- synchronized(lock) {
- while (buffer.size >= MAX) {
- // 當(dāng)buffer滿時(shí),停止生產(chǎn)
- // 注意此處使用while不能使用if,因?yàn)橛锌赡苁潜涣硪粋€(gè)生產(chǎn)線程而非消費(fèi)線程喚醒,所以要再次檢查buffer狀態(tài)
- // 如果生產(chǎn)消費(fèi)兩把鎖,則不必?fù)?dān)心此問(wèn)題
- lock.wait()
- }
- buffer.push(data)
- // notify方法只喚醒其中一個(gè)線程,選擇哪個(gè)線程取決于操作系統(tǒng)對(duì)多線程管理的實(shí)現(xiàn)。
- // notifyAll會(huì)喚醒所有等待中線程,哪一個(gè)線程將會(huì)第一個(gè)處理取決于操作系統(tǒng)的實(shí)現(xiàn),但是都有機(jī)會(huì)處理。
- // 此處使用notify有可能喚醒的是另一個(gè)生產(chǎn)線程從而造成死鎖,所以必須使用notifyAll
- lock.notifyAll()
- }
- }
- fun consume() {
- synchronized(lock) {
- while (buffer.isEmpty())
- lock.wait() // 暫停消費(fèi)
- buffer.removeFirst()
- lock.notifyAll()
- }
- sleep(2000) // mock consume
- }
- @Test
- fun test() {
- // 同時(shí)啟動(dòng)多個(gè)生產(chǎn)、消費(fèi)線程
- repeat(10) {
- Thread { produce(Data()) }.start()
- }
- repeat(10) {
- Thread { consume() }.start()
- }
- }
2. ReentrantLock
Lock 相對(duì)于 Synchronized 好處是當(dāng)有多個(gè)生產(chǎn)線/消費(fèi)線程時(shí),我們可以通過(guò)定義多個(gè) condition 精確指定喚醒哪一個(gè)。下面的例子展示 Lock 配合 await/single 替換前面 Synchronized 寫法。
- val buffer = LinkedList<Data>()
- val MAX = 5 //buffer最大size
- val lock = ReentrantLock()
- val condition = lock.newCondition()
- fun produce(data: Data) {
- sleep(2000) // mock produce
- lock.lock()
- while (buffer.size >= 5)
- condition.await()
- buffer.push(data)
- condition.signalAll()
- lock.unlock()
- }
- fun consume() {
- lock.lock()
- while (buffer.isEmpty())
- condition.await()
- buffer.removeFirst()
- condition.singleAll()
- lock.unlock()
- sleep(2000) // mock consume
- }
3. BlockingQueue (阻塞隊(duì)列)
BlockingQueue在達(dá)到臨界條件時(shí),再進(jìn)行讀寫會(huì)自動(dòng)阻塞當(dāng)前線程等待鎖的釋放,天然適合這種生產(chǎn)/消費(fèi)場(chǎng)景。
- val buffer = LinkedBlockingQueue<Data>(5)
- fun produce(data: Data) {
- sleep(2000) // mock produce
- buffer.put(data) //buffer滿時(shí)自動(dòng)阻塞
- }
- fun consume() {
- buffer.take() // buffer空時(shí)自動(dòng)阻塞
- sleep(2000) // mock consume
- }
注意 BlockingQueue 的有三組讀/寫方法,只有一組有阻塞效果,不要用錯(cuò)。
方法 | 說(shuō)明 |
---|---|
add(o)/remove(o) | add 方法在添加元素的時(shí)候,若超出了隊(duì)列的長(zhǎng)度會(huì)直接拋出異常 |
offer(o)/poll(o) | offer 在添加元素時(shí),如果發(fā)現(xiàn)隊(duì)列已滿無(wú)法添加的話,會(huì)直接返回false |
put(o)/take(o) | put 向隊(duì)尾添加元素的時(shí)候發(fā)現(xiàn)隊(duì)列已經(jīng)滿了會(huì)發(fā)生阻塞一直等待空間,以加入元素 |
4. Semaphore(信號(hào)量)
Semaphore 是 JUC 提供的一種共享鎖機(jī)制,可以進(jìn)行擁塞控制,此特性可用來(lái)控制 buffer 的大小。
- // canProduce: 可以生產(chǎn)數(shù)量(即buffer可用的數(shù)量),生產(chǎn)者調(diào)用acquire,減少permit數(shù)目
- val canProduce = Semaphore(5)
- // canConsumer:可以消費(fèi)數(shù)量,生產(chǎn)者調(diào)用release,增加permit數(shù)目
- val canConsume = Semaphore(5)
- // 控制buffer訪問(wèn)互斥
- val mutex = Semaphore(0)
- val buffer = LinkedList<Data>()
- fun produce(data: Data) {
- if (canProduce.tryAcquire()) {
- sleep(2000) // mock produce
- mutex.acquire()
- buffer.push(data)
- mutex.release()
- canConsume.release() //通知消費(fèi)端新增加了一個(gè)產(chǎn)品
- }
- }
- fun consume() {
- if (canConsume.tryAcquire()) {
- sleep(2000) // mock consume
- mutex.acquire()
- buffer.removeFirst()
- mutex.release()
- canProduce.release() //通知生產(chǎn)端可以再追加生產(chǎn)
- }
- }
5. PipedXXXStream (管道)
Java 里的管道輸入/輸出流 PipedInputStream / PipedOutputStream 實(shí)現(xiàn)了類似管道的功能,用于不同線程之間的相互通信,輸入流中有一個(gè)緩沖數(shù)組,當(dāng)緩沖數(shù)組為空的時(shí)候,輸入流 PipedInputStream 所在的線程將阻塞。
- val pis: PipedInputStream = PipedInputStream()
- val pos: PipedOutputStream by lazy {
- PipedOutputStream().apply {
- pis.connect(this) //輸入輸出流之間建立連接
- }
- }
- fun produce(data: ContactsContract.Data) {
- while (true) {
- sleep(2000)
- pos.use { // Kotlin 使用 use 方便的進(jìn)行資源釋放
- it.write(data.getBytes())
- it.flush()
- }
- }
- }
- fun consume() {
- while (true) {
- sleep(2000)
- pis.use {
- val byteArray = ByteArray(1024)
- it.read(byteArray)
- }
- }
- }
- @Test
- fun Test() {
- repeat(10) {
- Thread { produce(Data()) }.start()
- }
- repeat(10) {
- Thread { consume() }.start()
- }
- }
6. RxJava
RxJava 從概念上,可以將 Observable/Subject 作為生產(chǎn)者, Subscriber 作為消費(fèi)者, 但是無(wú)論 Subject 或是 Observable 都缺少 Buffer 溢出時(shí)的阻塞機(jī)制,難以獨(dú)立實(shí)現(xiàn)生產(chǎn)者/消費(fèi)者模型。
Flowable 的背壓機(jī)制,可以用來(lái)控制 buffer 數(shù)量,并在上下游之間建立通信, 配合 Atomic 可以變向?qū)崿F(xiàn)單生產(chǎn)者/單消費(fèi)者場(chǎng)景,(不適用于多生產(chǎn)者/多消費(fèi)者場(chǎng)景)。
- class Producer : Flowable<Data>() {
- override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) {
- subscriber.onSubscribe(object : Subscription {
- override fun cancel() {
- //...
- }
- private val outStandingRequests = AtomicLong(0)
- override fun request(n: Long) { //收到下游通知,開(kāi)始生產(chǎn)
- outStandingRequests.addAndGet(n)
- while (outStandingRequests.get() > 0) {
- sleep(2000)
- subscriber.onNext(Data())
- outStandingRequests.decrementAndGet()
- }
- }
- })
- }
- }
- class Consumer : DefaultSubscriber<Data>() {
- override fun onStart() {
- request(1)
- }
- override fun onNext(i: Data?) {
- sleep(2000) //mock consume
- request(1) //通知上游可以增加生產(chǎn)
- }
- override fun onError(throwable: Throwable) {
- //...
- }
- override fun onComplete() {
- //...
- }
- }
- @Test
- fun test_rxjava() {
- try {
- val testProducer = Producer)
- val testConsumer = Consumer()
- testProducer
- .subscribeOn(Schedulers.computation())
- .observeOn(Schedulers.single())
- .blockingSubscribe(testConsumer)
- } catch (t: Throwable) {
- t.printStackTrace()
- }
- }
7. Coroutine Channel
協(xié)程中的 Channel 具有擁塞控制機(jī)制,可以實(shí)現(xiàn)生產(chǎn)者消費(fèi)者之間的通信??梢园?Channel 理解為一個(gè)協(xié)程版本的阻塞隊(duì)列,capacity 指定隊(duì)列容量。
- val channel = Channel<Data>(capacity = 5)
- suspend fun produce(data: ContactsContract.Contacts.Data) = run {
- delay(2000) //mock produce
- channel.send(data)
- }
- suspend fun consume() = run {
- delay(2000)//mock consume
- channel.receive()
- }
- @Test
- fun test_channel() {
- repeat(10) {
- GlobalScope.launch {
- produce(Data())
- }
- }
- repeat(10) {
- GlobalScope.launch {
- consume()
- }
- }
- }
此外,Coroutine 提供了 produce 方法,在聲明 Channel 的同時(shí)生產(chǎn)數(shù)據(jù),寫法上更簡(jiǎn)單,適合單消費(fèi)者單生產(chǎn)者的場(chǎng)景:
- fun CoroutineScope.produce(): ReceiveChannel<Data> = produce {
- repeat(10) {
- delay(2000) //mock produce
- send(Data())
- }
- }
- @Test
- fun test_produce() {
- GlobalScope.launch {
- produce.consumeEach {
- delay(2000) //mock consume
- }
- }
- }
8. Coroutine Flow
Flow 跟 RxJava 一樣,因?yàn)槿鄙?Buffer 溢出時(shí)的阻塞機(jī)制,不適合處理生產(chǎn)消費(fèi)問(wèn)題,其背壓機(jī)制也比較簡(jiǎn)單,無(wú)法像 RxJava 那樣收到下游通知。但是 Flow 后來(lái)發(fā)布了 SharedFlow, 作為帶緩沖的熱流,提供了 Buffer 溢出策略,可以用作生產(chǎn)者/消費(fèi)者之間的同步。
- val flow : MutableSharedFlow<Data> = MutableSharedFlow(
- extraBufferCapacity = 5 //緩沖大小
- , onBufferOverflow = BufferOverflow.SUSPEND // 緩沖溢出時(shí)的策略:掛起
- )
- @Test
- fun test() {
- GlobalScope.launch {
- repeat(10) {
- delay(2000) //mock produce
- sharedFlow.emit(Data())
- }
- }
- GlobalScope.launch {
- sharedFlow.collect {
- delay(2000) //mock consume
- }
- }
- }
注意 SharedFlow 也只能用在單生產(chǎn)者/單消費(fèi)者場(chǎng)景。
總結(jié)
生產(chǎn)者/消費(fèi)者問(wèn)題,其本質(zhì)核心還是多線程讀寫共享資源(Buffer)時(shí)的同步問(wèn)題,理論上只要具有同步機(jī)制的多線程框架,例如線程鎖、信號(hào)量、阻塞隊(duì)列、協(xié)程 Channel等,都是可以實(shí)現(xiàn)生產(chǎn)消費(fèi)模型的。
另外,RxJava 和 Flow 雖然也是多線程框架,但是缺少Buffer溢出時(shí)的阻塞機(jī)制,不適用于生產(chǎn)/消費(fèi)場(chǎng)景,更適合在純響應(yīng)式場(chǎng)景中使用。