自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于Redis實現(xiàn)消息隊列的實踐

開發(fā) 前端
不支持消費者確認(rèn)機制,穩(wěn)定性不能得到保證,例如當(dāng)消費者獲取到消息之后,還沒來得及執(zhí)行就宕機了。因為沒有消費者確認(rèn)機制,Redis 就會誤以為消費者已經(jīng)執(zhí)行了,因此就不會重復(fù)發(fā)送未被正常消費的消息了,這樣整體的 Redis 穩(wěn)定性就被沒有辦法得到保障了。

為什么要基于Redis實現(xiàn)消費隊列?

消息隊列是一種典型的發(fā)布/訂閱模式,是專門為異步化應(yīng)用和分布式系統(tǒng)設(shè)計的,具有高性能、穩(wěn)定性及可伸縮性的特點,是開發(fā)分布式系統(tǒng)和應(yīng)用系統(tǒng)必備的技術(shù)之一。目前,針對不同的業(yè)務(wù)場景,比較成熟可靠的消息中間件產(chǎn)品有RocketMQ、Kafka、RabbitMq等,基于Redis再去實現(xiàn)一個消息隊列少有提及,那么已經(jīng)有很成熟的產(chǎn)品可以選擇,還有必要再基于Redis自己來實現(xiàn)一個消息隊列嗎?基于Redis實現(xiàn)的消息隊列有什么特別的地方嗎?

先來回顧一個Redis有哪些特性:

  1. 速度快:Redis是基于內(nèi)存的key-value類型的數(shù)據(jù)庫,數(shù)據(jù)都存放在內(nèi)存中,使得讀寫速度非常快,能夠達(dá)到每秒數(shù)十萬次的讀寫操作。
  2. 鍵值對的數(shù)據(jù)結(jié)構(gòu):Redis中的數(shù)據(jù)以鍵值對的形式存儲,使得查詢和操作數(shù)據(jù)非常方便和高效。
  3. 功能豐富:Redis具有許多實用的功能,例如鍵過期、發(fā)布訂閱、Lua腳本、事務(wù)和管道等。這些功能使得Redis能夠廣泛應(yīng)用于各種場景,如緩存、消息系統(tǒng)等。
  4. 持久化:Redis提供了兩種持久化方案,即RDB(根據(jù)時間生成數(shù)據(jù)快照)和AOF(以追加方式記錄每次寫操作)。兩種方案可以互相配合,確保數(shù)據(jù)的安全性。
  5. 主從復(fù)制:Redis支持主從復(fù)制功能,可以輕松實現(xiàn)數(shù)據(jù)備份和擴展。主節(jié)點會將其數(shù)據(jù)復(fù)制給從節(jié)點,從而實現(xiàn)數(shù)據(jù)的冗余和備份。
  6. 高可用和分布式:Redis從2.8版本開始提供了高可用實現(xiàn)哨兵模式,可以保證節(jié)點的故障發(fā)現(xiàn)和故障自動轉(zhuǎn)移。此外,Redis從3.0版本開始支持集群模式,可以輕松實現(xiàn)數(shù)據(jù)的分布式存儲和擴展。

總結(jié)一下:redis的特點就是:快、簡單、穩(wěn)定;

以RocketMQ為代表,作為專業(yè)的消息中間件而言,有哪些特性呢:

  1. 高性能、高可靠:RocketMQ采用分布式架構(gòu),能夠高效地處理大量消息,同時也具有高可靠性的特性,能夠保證消息的不丟失和正確傳遞。
  2. 高實時:RocketMQ支持消息的實時傳遞,能夠滿足實時交易系統(tǒng)的需求,為系統(tǒng)提供及時、準(zhǔn)確的消息。
  3. 事務(wù)消息:RocketMQ支持事務(wù)消息,能夠在消息發(fā)送和接收過程中保持事務(wù)的一致性,確保消息的可靠性和系統(tǒng)的穩(wěn)定性。
  4. 順序消息:RocketMQ可以保證消息的有序性,無論是在一個生產(chǎn)者還是多個生產(chǎn)者之間,都能保證消息按照發(fā)送順序進(jìn)行消費。
  5. 批量消息:RocketMQ支持批量消息,能夠一次性發(fā)送多條消息,提高消息發(fā)送效率。
  6. 定時消息:RocketMQ支持定時消息,能夠在指定的時間將消息發(fā)送到指定的Topic,滿足定時任務(wù)的需求。
  7. 消息回溯:RocketMQ支持消息回溯,能夠根據(jù)需要將消息重新發(fā)送到指定的Topic,便于調(diào)試和錯誤處理。
  8. 多種消息模式:RocketMQ支持發(fā)布/訂閱、點對點、群聊等多種消息模式,適用于不同的業(yè)務(wù)場景。
  9. 可擴展性:RocketMQ采用分布式架構(gòu),能夠方便地擴展消息處理能力,支持多個生產(chǎn)者和消費者同時處理消息。
  10. 多語言支持:RocketMQ提供多種語言的客戶端庫,支持包括Java、Python、C++等在內(nèi)的多種編程語言。

總結(jié)一下:RocketMQ的特點就是除了性能非常高、系統(tǒng)本身的功能比較專業(yè)、完善,能適應(yīng)非常多的場景;

從上述分析可以看出,Redis隊列和MQ消息隊列各有優(yōu)勢,Redis的最大特點就是快,所以基于Redis的消息隊列相比MQ消息隊列而言,更適合實時處理,但是基于Redis的消息隊列更易受服務(wù)器內(nèi)存限制;而RocketMQ消息隊列作為專業(yè)的消息中間件產(chǎn)品,功能更完善,更適合應(yīng)用于比較復(fù)雜的業(yè)務(wù)場景,可以實現(xiàn)離線消息發(fā)送、消息可靠投遞以及消息的安全性,但MQ消息隊列的讀寫性能略低于Redis隊列。在技術(shù)選型時,除了上述的因素外,還有一個需要注意:大多數(shù)系統(tǒng)都會引入Redis作為基礎(chǔ)的緩存中間件使用,如果要選用RocketMQ的話,還需要額外再申請資源進(jìn)行部署。

很多時候,所謂的優(yōu)點和缺點,只是針對特定場景而言,如果場景不一樣了,優(yōu)點可能會變成缺點,缺點也可能會變成優(yōu)點。因此,除了專業(yè)的消息中間件外,基于Redis實現(xiàn)一個消息隊列也是有必要的,在某些特殊的業(yè)務(wù)場景,比如一些并發(fā)量不是很高的管理系統(tǒng),某些業(yè)務(wù)流程需要異步化處理,這時選擇基于Redis自己實現(xiàn)一個消息隊列,也是一個比較好的選擇。這也是本篇文章主要分享的內(nèi)容。

消息隊列的基礎(chǔ)知識:

什么是隊列?

隊列(Queue)是一種數(shù)據(jù)結(jié)構(gòu),遵循先進(jìn)先出(FIFO)的原則。在隊列中,元素被添加到末尾(入隊),并從開頭移除(出隊)。

圖片

Java中有哪些隊列?

  1. LinkedList:LinkedList實現(xiàn)了Deque接口,可以作為隊列(FIFO)或棧(LIFO)使用。它是一個雙向鏈表,所以插入和刪除操作具有很高的效率。
  2. ArrayDeque:ArrayDeque也是一個雙端隊列,具有高效的插入和刪除操作。與LinkedList相比,ArrayDeque通常在大多數(shù)操作中表現(xiàn)得更快,因為它在內(nèi)部使用動態(tài)數(shù)組。
  3. PriorityQueue:PriorityQueue是一個優(yōu)先隊列,它保證隊列頭部總是最小元素。你可以自定義元素的排序規(guī)則。
  4. ConcurrentLinkedQueue:ConcurrentLinkedQueue是一個線程安全的隊列,它使用無鎖算法進(jìn)行并發(fā)控制。它適用于高并發(fā)場景,但在低并發(fā)場景中可能比其他隊列慢。
  5. LinkedBlockingQueue:LinkedBlockingQueue是一個線程安全的阻塞隊列,它使用鏈表數(shù)據(jù)結(jié)構(gòu)來存儲數(shù)據(jù)。當(dāng)隊列為空時,獲取元素的操作將會被阻塞;當(dāng)隊列已滿時,插入元素的操作將會被阻塞。
  6. ArrayBlockingQueue:ArrayBlockingQueue是一個線程安全的阻塞隊列,它使用數(shù)組數(shù)據(jù)結(jié)構(gòu)來存儲數(shù)據(jù)。與LinkedBlockingQueue相比,ArrayBlockingQueue的容量是固定的。
  7. PriorityBlockingQueue:PriorityBlockingQueue是一個線程安全的優(yōu)先阻塞隊列。與PriorityQueue類似,它保證隊列頭部總是最小元素。
  8. SynchronousQueue:SynchronousQueue是一個線程安全的阻塞隊列,它只包含一個元素。當(dāng)隊列為空時,獲取元素的操作將會被阻塞;當(dāng)隊列已滿時,插入元素的操作將會被阻塞。
  9. DelayQueue:DelayQueue是一個無界阻塞隊列,用于放置實現(xiàn)了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。

LinkedBlockingQueue

以LinkedBlockingQueue為例,其使用方法是這樣的:

創(chuàng)建了一個生產(chǎn)者線程和一個消費者線程,生產(chǎn)者線程和消費者線程分別對同一個LinkedBlockingQueue對象進(jìn)行操作。生產(chǎn)者線程通過調(diào)用put()方法將元素添加到隊列中,而消費者線程通過調(diào)用take()方法從隊列中取出元素。這兩個方法都會阻塞線程,直到隊列中有元素可供取出或有空間可供添加元素。

import java.util.concurrent.LinkedBlockingQueue;  
  
public class LinkedBlockingQueueExample {  
    public static void main(String[] args) {  
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();  
  
        // 生產(chǎn)者線程  
        new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    queue.put("Element " + i);  
                    System.out.println("Produced: Element " + i);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        // 消費者線程  
        new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    String element = queue.take();  
                    System.out.println("Consumed: " + element);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
    }  
}


基于Redis實現(xiàn)消息隊列的幾種方式

基于List數(shù)據(jù)類型

  • List 類型實現(xiàn)的方式最為簡單和直接,它主要是通過 lpush、rpop 存入和讀取實現(xiàn)消息隊列的,如下圖所示:

圖片圖片

  • lpush 可以把最新的消息存儲到消息隊列(List 集合)的首部,而 rpop 可以讀取消息隊列的尾部,這樣就實現(xiàn)了先進(jìn)先出;
  • 優(yōu)點:使用 List 實現(xiàn)消息隊列的優(yōu)點是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數(shù)據(jù)保存至磁盤,這樣當(dāng) Redis 重啟之后,消息不會丟失。
  • 缺點:基于List類型實現(xiàn)的消息隊列不支持重復(fù)消費、沒有按照主題訂閱的功能、不支持消費消息確認(rèn)等功能,如果確實需要,需要自己實現(xiàn)。

基于Zset數(shù)據(jù)類型

  • 基于ZSet數(shù)據(jù)類型實現(xiàn)消息隊列,是利用 zadd 和 zrangebyscore 來實現(xiàn)存入和讀取消息的。
  • 優(yōu)點:和基于List數(shù)據(jù)類型差不多,同樣具備持久化的功能,不同的是消息數(shù)據(jù)存儲的結(jié)構(gòu)類型不一樣;
  • 缺點:List 存在的問題它也同樣存在,不支持重復(fù)消費,沒有主題訂閱功能,不支持消費消息確認(rèn),并且使用 ZSet 還不能存儲相同元素的值。因為它是有序集合,有序集合的存儲元素值是不能重復(fù)的,但分值可以重復(fù),也就是說當(dāng)消息值重復(fù)時,只能存儲一條信息在 ZSet 中。

基于發(fā)布訂閱模式

  • 基于發(fā)布訂閱模式,是使用Pattern Subscribe 的功能實現(xiàn)主題訂閱的功能,也就是 。因此我們可以使用一個消費者“queue_*”來訂閱所有以“queue_”開頭的消息隊列,如下圖所示:
  • 優(yōu)點:可以按照主題訂閱方式
  • 缺點:

無法持久化保存消息,如果 Redis 服務(wù)器宕機或重啟,那么所有的消息將會丟失;

發(fā)布訂閱模式是“發(fā)后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費之前的歷史消息;

不支持消費者確認(rèn)機制,穩(wěn)定性不能得到保證,例如當(dāng)消費者獲取到消息之后,還沒來得及執(zhí)行就宕機了。因為沒有消費者確認(rèn)機制,Redis 就會誤以為消費者已經(jīng)執(zhí)行了,因此就不會重復(fù)發(fā)送未被正常消費的消息了,這樣整體的 Redis 穩(wěn)定性就被沒有辦法得到保障了。

基于Stream類型

基于Stream 類型實現(xiàn):使用 Stream 的 xadd 和 xrange 來實現(xiàn)消息的存入和讀取了,并且 Stream 提供了 xack 手動確認(rèn)消息消費的命令,用它我們就可以實現(xiàn)消費者確認(rèn)的功能了,使用命令如下:

127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

消費確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費完成,整個流程的執(zhí)行如下圖所示:

其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到消息。

以上就是基于Redis實現(xiàn)消息隊列的幾種方式的簡單對比介紹,下面主要是分享一下基于Redis的List數(shù)據(jù)類型實現(xiàn),其他幾種方式,有興趣的小伙可以自己嘗試一下。

基于Redis的List數(shù)據(jù)類型實現(xiàn)消費隊列的工作原理是什么?

Redis基于List結(jié)構(gòu)實現(xiàn)隊列的原理主要依賴于List的push和pop操作。

在Redis中,你可以使用LPUSH命令將一個或多個元素推入列表的左邊,也就是列表頭部。同樣,你可以使用RPUSH命令將一個或多個元素推入列表的右邊,也就是列表尾部。

對于隊列來說,新元素總是從隊列的頭部進(jìn)入,而讀取操作總是從隊列的尾部開始。因此,當(dāng)你想將一個新元素加入隊列時,你可以使用LPUSH命令。當(dāng)你想從隊列中取出一個元素時,你可以使用RPOP命令。

此外,Redis還提供了BRPOP命令,這是一個阻塞的RPOP版本。如果給定列表內(nèi)沒有任何元素可供彈出的話,將阻塞連接直到等待超時或發(fā)現(xiàn)可彈出元素為止。

需要注意的是,雖然Redis能夠提供原子性的push和pop操作,但是在并發(fā)環(huán)境下使用隊列時,仍然需要考慮線程安全和并發(fā)控制的問題。你可能需要使用Lua腳本或者其他機制來確保并發(fā)操作的正確性。

總的來說,Redis通過提供List數(shù)據(jù)結(jié)構(gòu)以及一系列相關(guān)命令,可以很方便地實現(xiàn)隊列的功能。

下面是Redis關(guān)于List數(shù)據(jù)結(jié)構(gòu)操作的命令主要包括以下幾種:

  • LPUSH key value:將一個或多個值插入到列表的頭部。
  • RPUSH key value:將一個或多個值插入到列表的尾部。
  • LPOP key:移除并獲取列表的第一個元素。
  • RPOP key:移除并獲取列表的最后一個元素。
  • LRANGE key start stop:獲取指定索引范圍內(nèi)的元素。
  • LINDEX key index:獲取指定索引位置的元素。
  • LLEN key:獲取列表的長度。
  • LREM key count value:移除列表中指定數(shù)量的特定元素。
  • BRPOP key [key ...] timeout:移出并獲取列表的最后一個元素,如果列表沒有元素會阻塞直到等待超時或發(fā)現(xiàn)可彈出元素為止。

基于Redis的List數(shù)據(jù)類型實現(xiàn)延遲消息隊列實戰(zhàn)

需求描述

以一個實際需求為例,演示一個基于Redis的延遲隊列是怎么使用的?

有一個XX任務(wù)管理的功能,主要的業(yè)務(wù)過程:

1、創(chuàng)建任務(wù)后;

2、不斷檢查任務(wù)的狀態(tài),任務(wù)的狀態(tài)有三種:待執(zhí)行、執(zhí)行中、執(zhí)行完成;

3、如果任務(wù)狀態(tài)是執(zhí)行完成后,主動獲取任務(wù)執(zhí)行結(jié)果,對任務(wù)執(zhí)行結(jié)果進(jìn)行處理;如果任務(wù)狀態(tài)是待執(zhí)行、執(zhí)行中,則延遲5秒后,再次查詢?nèi)蝿?wù)執(zhí)行狀態(tài);

圖片圖片

實現(xiàn)方案

1、依賴引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.23.1</version>
</dependency>

2、定義三個延遲隊列BeforeQueue、RunningQueue、CompleteQueue,對隊列的任務(wù)進(jìn)行存取,BeforeQueue用于對待執(zhí)行狀態(tài)的任務(wù)的存取,Running用于對執(zhí)行中狀態(tài)的任務(wù)的存取,CompleteQueue用于對執(zhí)行完成狀態(tài)的任務(wù)的存取,在三個任務(wù)隊列中,取出元素是阻塞的,即如果隊列中沒有新的任務(wù),當(dāng)前線程會一直阻塞等待,直到有新的任務(wù)進(jìn)入;如果是隊列中還有元素,則遵循先進(jìn)先出的原則逐個取出進(jìn)行處理;

@Component
@Slf4j
public class BeforeQueue {
    @Autowired
    private RedissonClient redissonClient;


    /**
     * <p>取出元素</p>
     * <p>如果隊列中沒有元素,就阻塞等待,直</p>
     * @return
     */
    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("從myqueue1取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    /**
     * <p>放入元素</p>
     * @param obj
     */
    public void offer(Object obj){
        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向myqueue1設(shè)置元素:{}",obj.toString());
    }
}
@Component
@Slf4j
public class RunningQueue {
    @Autowired
    private RedissonClient redissonClient;


    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("從myqueue2取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    public void offer(Object obj){
        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向myqueue2設(shè)置元素:{}",obj.toString());
    }
}
@Component
@Slf4j
public class CompleteQueue {
    @Autowired
    private RedissonClient redissonClient;


    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("從CompleteQueue取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    public void offer(Object obj){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向CompleteQueue設(shè)置元素:{}",obj.toString());
    }
}

3、定義三個監(jiān)聽器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,監(jiān)聽器的主要作用主要就是負(fù)責(zé)監(jiān)聽三個隊列中是否有新的任務(wù) 元素進(jìn)入,如果有,則立即取出消費;如果沒有,則阻塞等待新的元素進(jìn)入,具體的實現(xiàn)邏輯是:新創(chuàng)建的任務(wù)會先放置到BeforeQueue中,BeforeQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)作一些業(yè)務(wù)處理,業(yè)務(wù)處理完一放入到RunningQueue中,RunningQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入,會取出任務(wù)再進(jìn)行處理,這里的處理主要是查詢?nèi)蝿?wù)執(zhí)行狀態(tài),查詢狀態(tài)結(jié)果主要分兩種情況:1、執(zhí)行中、待執(zhí)行狀態(tài),則把任務(wù)重新放入RunningQueue隊列中,延遲5秒;2、執(zhí)行完成狀態(tài),則把任務(wù)放置到CompleteQueue中;CompleteQueueListener監(jiān)聽到有新的任務(wù)進(jìn)入后,會主動獲取任務(wù)執(zhí)行結(jié)果,作最后業(yè)務(wù)處理;

4、監(jiān)聽器在在處理隊列中的數(shù)據(jù)相關(guān)的業(yè)務(wù)時,如果發(fā)生異常,則需要把取出的元素再重新入入到當(dāng)前隊列中,等待下一輪的重試;

@Component
@Slf4j
public class BeforeQueueListener implements Listener{
    @Autowired
    private BeforeQueue beforeQueue;
    @Autowired
    private RunningQueue runningQueue;
    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    log.info("監(jiān)聽器進(jìn)入阻塞:BeforeQueueListener");
                    Object obj = beforeQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("開始休眠1s模擬業(yè)務(wù)處理:BeforeQueueListener,元素:{}",obj.toString());
                            Thread.currentThread().sleep(1000);
                            log.info("業(yè)務(wù)處理完成:BeforeQueueListener,元素:{}",obj.toString());
                            runningQueue.offer(obj);
                        } catch (InterruptedException e) {
                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到BeforeQueue隊列中");
                            log.error(e.getMessage());
                            beforeQueue.offer(obj);
                        }


                    }
                }
            }
        }).start();
    }
}
@Component
@Slf4j
public class RunningQueueListener implements Listener {


    @Autowired
    private RunningQueue runningQueue;
    @Autowired
    private CompleteQueue completeQueue;


    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    log.info("監(jiān)聽器進(jìn)入阻塞:RunningQueueListener");
                    Object obj = runningQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("開始休眠1s模擬業(yè)務(wù)處理:RunningQueueListener,元素:{}", obj.toString());
                            Thread.currentThread().sleep(1000);
                            Random random = new Random();
                            int i = random.nextInt(2);
                            if (i==0) {
                                test();
                            }
                            log.info("業(yè)務(wù)處理完成:RunningQueueListener,元素:{}", obj.toString());
                            completeQueue.offer(obj);
                        } catch (Exception e) {
                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到RunningQueue隊列中");
                            log.error(e.getMessage());
                            runningQueue.offer(obj);
                        }
                    }
                }
            }
        }).start();
    }


    public void test(){
        try {
            int i=1/0;
        } catch (Exception e) {
           throw  new RuntimeException("除數(shù)異常");
        }
    }


}
@Component
@Slf4j
public class CompleteQueueListener implements Listener{


    @Autowired
    private CompleteQueue completeQueue;
    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    log.info("監(jiān)聽器進(jìn)入阻塞:CompleteQueueListener");
                    Object obj = completeQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("開始休眠1s模擬業(yè)務(wù)處理:CompleteQueueListener,元素:{}",obj.toString());
                            Thread.currentThread().sleep(1000);
                            log.info("業(yè)務(wù)處理完成:listener3,元素:{}",obj.toString());
                        } catch (InterruptedException e) {
                            log.error("業(yè)務(wù)處理發(fā)生異常,重置元素到CompleteQueue隊列中");
                            log.error(e.getMessage());
                            completeQueue.offer(obj);
                        }
                       log.info("CompleteQueueListener任務(wù)結(jié)束,元素:{}",obj.toString());
                    }
                }
            }
        }).start();
    }
}

5、利用Springboot的擴展點ApplicationRunner,在項目啟動完成后,分別啟動BeforeQueueListener、RunningQueueListener、CompleteQueueListener,讓三個監(jiān)聽器進(jìn)入阻塞監(jiān)聽狀態(tài)

@Component
public class MyRunner implements ApplicationRunner {
    @Autowired
    private ApplicationContext applicationContext;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class);
        for (String s : beansOfType.keySet()) {
            Listener listener = beansOfType.get(s);
            listener.start();
        }


    }
}

結(jié)果驗證

圖片圖片

一個比較有意思的問題

日志丟失的問題

三個任務(wù)隊列分別有三個線程來進(jìn)行阻塞監(jiān)聽,即如果任務(wù)隊列中有任務(wù)元素,則取出進(jìn)行處理;如果沒有,則阻塞等待,主線程只負(fù)責(zé)把任務(wù)設(shè)置到任務(wù)隊列中,出現(xiàn)的問題是:控制臺的日志輸出顯示任務(wù)元素已經(jīng)放置到第一個BeforeQueue中,按照預(yù)期的結(jié)果應(yīng)該是,控制臺的日志輸出會顯示,從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,然后放置到RunningQueue中,再從RunningQueue中取出進(jìn)行業(yè)務(wù)處理,接著放置到CompleteQueue隊列中,最后從CompleteQueue中取出進(jìn)行業(yè)務(wù)處理,最后結(jié)束;實際情況是:總是缺少從BeforeQueue取出元素進(jìn)行業(yè)務(wù)處理、以及業(yè)務(wù)處理的日志,其他的日志輸出都很正常、執(zhí)行結(jié)果也正常;

問題原因

經(jīng)過排查分析,最后找到了原因:

是logback線程安全問題, Logback 的大部分組件都是線程安全的,但某些特定的配置可能會導(dǎo)致線程安全問題。例如,如果你在同一個 Appender 中處理多個線程的日志事件,那么可能會出現(xiàn)線程安全問題,導(dǎo)致某些日志事件丟失。

解決方法

問題原因找到了,其實解決方法也就找到,具體就是logback的異步日志,logback.xml配置如下:

<?xml versinotallow="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <!-- 日志存放路徑 -->
    <property name="log.path" value="logs/"/>
    <!-- 日志輸出格式 -->
    <property name="console.log.pattern"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/>
    <!-- 控制臺輸出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${console.log.pattern}</pattern>
            <charset>utf-8</charset>
        </encoder>
    </appender>
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>500</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <neverBlock>true</neverBlock>
        <appender-ref ref="console" />
    </appender>
    <!--系統(tǒng)操作日志-->
    <root level="info">
        <appender-ref ref="ASYNC" />
    </root>
</configuration>

文章中展示了關(guān)鍵性代碼,示例全部代碼地址:https://gitcode.net/fox9916/redisson-demo.git

責(zé)任編輯:武曉燕 來源: 凡夫貶夫
相關(guān)推薦

2024-03-22 12:10:39

Redis消息隊列數(shù)據(jù)庫

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊列

2024-10-25 08:41:18

消息隊列RedisList

2022-04-12 11:15:31

Redis消息隊列數(shù)據(jù)庫

2022-02-28 08:42:49

RedisStream消息隊列

2023-09-12 14:58:00

Redis

2024-05-11 07:29:48

Redis延遲隊列優(yōu)化

2024-05-10 11:35:22

Redis延時隊列數(shù)據(jù)庫

2022-02-11 16:35:41

Redis6.0代碼命令

2021-04-14 13:32:50

Redis輕量級分布式

2022-08-04 10:32:04

Redis命令

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫

2017-10-11 15:08:28

消息隊列常見

2024-05-08 14:49:22

Redis延遲隊列業(yè)務(wù)

2025-03-12 07:55:46

2022-05-19 17:50:31

bookie集群延遲消息存儲服務(wù)

2017-04-27 10:07:52

框架設(shè)計實現(xiàn)

2017-08-16 16:30:01

CMQ消息實踐

2024-11-14 11:56:45

點贊
收藏

51CTO技術(shù)棧公眾號