自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

什么鬼,面試官竟然讓我用Redis實(shí)現(xiàn)一個(gè)消息隊(duì)列!???

存儲(chǔ) 存儲(chǔ)軟件 Redis
眾所周知,redis是一個(gè)高性能的分布式key-value存儲(chǔ)系統(tǒng),在NoSQL數(shù)據(jù)庫市場(chǎng)上,redis自己就占據(jù)了將近半壁江山,足以見到其強(qiáng)大之處。同時(shí),由于redis的單線程特性,我們可以將其用作為一個(gè)消息隊(duì)列。

[[284302]]

眾所周知,redis是一個(gè)高性能的分布式key-value存儲(chǔ)系統(tǒng),在NoSQL數(shù)據(jù)庫市場(chǎng)上,redis自己就占據(jù)了將近半壁江山,足以見到其強(qiáng)大之處。同時(shí),由于redis的單線程特性,我們可以將其用作為一個(gè)消息隊(duì)列。本篇文章就來講講如何將redis整合到spring boot中,并用作消息隊(duì)列的……

一、什么是消息隊(duì)列

“消息隊(duì)列”是在消息的傳輸過程中保存消息的容器。——《百度百科》

消息我們可以理解為在計(jì)算機(jī)中或在整個(gè)計(jì)算機(jī)網(wǎng)絡(luò)中傳遞的數(shù)據(jù)。

隊(duì)列是我們?cè)趯W(xué)習(xí)數(shù)據(jù)結(jié)構(gòu)的時(shí)候?qū)W習(xí)的基本數(shù)據(jù)結(jié)構(gòu)之一,它具有先進(jìn)先出的特性。

所以,消息隊(duì)列就是一個(gè)保存消息的容器,它具有先進(jìn)先出的特性。

為什么會(huì)出現(xiàn)消息隊(duì)列?

  1. 異步:常見的B/S架構(gòu)下,客戶端向服務(wù)器發(fā)送請(qǐng)求,但是服務(wù)器處理這個(gè)消息需要花費(fèi)的時(shí)間很長(zhǎng)的時(shí)間,如果客戶端一直等待服務(wù)器處理完消息,會(huì)造成客戶端的系統(tǒng)資源浪費(fèi);而使用消息隊(duì)列后,服務(wù)器直接將消息推送到消息隊(duì)列中,由專門的處理消息程序處理消息,這樣客戶端就不必花費(fèi)大量時(shí)間等待服務(wù)器的響應(yīng)了;
  2. 解耦:傳統(tǒng)的軟件開發(fā)模式,模塊之間的調(diào)用是直接調(diào)用,這樣的系統(tǒng)很不利于系統(tǒng)的擴(kuò)展,同時(shí),模塊之間的相互調(diào)用,數(shù)據(jù)之間的共享問題也很大,每個(gè)模塊都要時(shí)時(shí)刻刻考慮其他模塊會(huì)不會(huì)掛了;使用消息隊(duì)列以后,模塊之間不直接調(diào)用,而是通過數(shù)據(jù),且當(dāng)某個(gè)模塊掛了以后,數(shù)據(jù)仍舊會(huì)保存在消息隊(duì)列中。最典型的就是生產(chǎn)者-消費(fèi)者模式,本案例使用的就是該模式;
  3. 削峰填谷:某一時(shí)刻,系統(tǒng)的并發(fā)請(qǐng)求暴增,遠(yuǎn)遠(yuǎn)超過了系統(tǒng)的最大處理能力后,如果不做任何處理,系統(tǒng)會(huì)崩潰;使用消息隊(duì)列以后,服務(wù)器把請(qǐng)求推送到消息隊(duì)列中,由專門的處理消息程序以合理的速度消費(fèi)消息,降低服務(wù)器的壓力。

下面一張圖我們來簡(jiǎn)單了解一下消息隊(duì)列

 

由上圖可以看到,消息隊(duì)列充當(dāng)了一個(gè)中間人的角色,我們可以通過操作這個(gè)消息隊(duì)列來保證我們的系統(tǒng)穩(wěn)定。

二、環(huán)境準(zhǔn)備

Java環(huán)境:jdk1.8

spring boot版本:2.2.1.RELEASE

redis-server版本:3.2.100

三、相關(guān)依賴

這里只展示與redis相關(guān)的依賴,

  1. <dependency> 
  2.  
  3.     <groupId>org.springframework.boot</groupId> 
  4.  
  5.     <artifactId>spring-boot-starter-data-redis</artifactId> 
  6.  
  7. </dependency> 
  8.  
  9. <dependency> 
  10.  
  11.     <groupId>org.springframework.integration</groupId> 
  12.  
  13.     <artifactId>spring-integration-redis</artifactId> 
  14.  
  15. </dependency> 

這里解釋一下這兩個(gè)依賴:

  • 第一個(gè)依賴是對(duì)redis NoSQL的支持
  • 第二個(gè)依賴是spring integration與redis的結(jié)合,這里添加這個(gè)代碼主要是為了實(shí)現(xiàn)分布式鎖

四、配置文件

這里只展示與redis相關(guān)的配置

  1. # redis所在的的地址 
  2.  
  3. spring.redis.host=localhost 
  4.  
  5. # redis數(shù)據(jù)庫索引,從0開始,可以從redis的可視化客戶端查看 
  6.  
  7. spring.redis.database=1 
  8.  
  9. # redis的端口,默認(rèn)為6379 
  10.  
  11. spring.redis.port=6379 
  12.  
  13. # redis的密碼 
  14.  
  15. spring.redis.password
  16.  
  17. # 連接redis的超時(shí)時(shí)間(ms),默認(rèn)是2000 
  18.  
  19. spring.redis.timeout=5000 
  20.  
  21. # 連接池最大連接數(shù) 
  22.  
  23. spring.redis.jedis.pool.max-active=16 
  24.  
  25. # 連接池最小空閑連接 
  26.  
  27. spring.redis.jedis.pool.min-idle=0 
  28.  
  29. # 連接池最大空閑連接 
  30.  
  31. spring.redis.jedis.pool.max-idle=16 
  32.  
  33. # 連接池最大阻塞等待時(shí)間(負(fù)數(shù)表示沒有限制) 
  34.  
  35. spring.redis.jedis.pool.max-wait=-1 
  36.  
  37. # 連接redis的客戶端名 
  38.  
  39. spring.redis.client-name=mall 

五、代碼配置

redis用作消息隊(duì)列,其在spring boot中的主要表現(xiàn)為一RedisTemplate.convertAndSend()方法和一個(gè)MessageListener接口。所以我們要在IOC容器中注入一個(gè)RedisTemplate和一個(gè)實(shí)現(xiàn)了MessageListener接口的類。話不多說,先看代碼

配置RedisTemplate

配置RedisTemplate的主要目的是配置序列化方式以解決亂碼問題,同時(shí)合理配置序列化方式還能降低一點(diǎn)性能開銷。

  1. /** 
  2.  
  3.  * 配置RedisTemplate,解決亂碼問題 
  4.  
  5.  */ 
  6.  
  7. @Bean 
  8.  
  9. public RedisTemplate&lt;String, Object&gt; redisTemplate(RedisConnectionFactory factory) { 
  10.  
  11.     LOGGER.debug("redis序列化配置開始"); 
  12.  
  13.     RedisTemplate&lt;String, Object&gt; template = new RedisTemplate&lt;&gt;(); 
  14.  
  15.     template.setConnectionFactory(factory); 
  16.  
  17.     // string序列化方式 
  18.  
  19.     RedisSerializer serializer = new GenericJackson2JsonRedisSerializer(); 
  20.  
  21.     // 設(shè)置默認(rèn)序列化方式 
  22.  
  23.     template.setDefaultSerializer(serializer); 
  24.  
  25.     template.setKeySerializer(new StringRedisSerializer()); 
  26.  
  27.     template.setHashValueSerializer(serializer); 
  28.  
  29.     LOGGER.debug("redis序列化配置結(jié)束"); 
  30.  
  31.     return template; 
  32.  

代碼第12行,我們配置默認(rèn)的序列化方式為GenericJackson2JsonRedisSerializer代碼第13行,我們配置鍵的序列化方式為StringRedisSerializer代碼第14行,我們配置哈希表的值的序列化方式為GenericJackson2JsonRedisSerializer

RedisTemplate幾種序列化方式的簡(jiǎn)要介紹

 

六、redis隊(duì)列監(jiān)聽器(消費(fèi)者)

上面說了,與redis隊(duì)列監(jiān)聽器相關(guān)的類為一個(gè)名為MessageListener的接口,下面是該接口的源碼

public interface MessageListener { void onMessage(Message message, @Nullable byte[] pattern);}

可以看到,該接口僅有一個(gè)onMessage(Message message, @Nullable byte[] pattern)方法,該方法便是監(jiān)聽到隊(duì)列中消息后的回調(diào)方法。下面解釋一下這兩個(gè)參數(shù):

  • message:redis消息類,該類中僅有兩個(gè)方法
    • byte[] getBody()以二進(jìn)制形式獲取消息體
    • byte[] getChannel()以二進(jìn)制形式獲取消息通道
  • pattern:二進(jìn)制形式的消息通道,和message.getChannel()返回值相同

介紹完接口,我們來實(shí)現(xiàn)一個(gè)簡(jiǎn)單的redis隊(duì)列監(jiān)聽器

  1. @Component 
  2.  
  3. public class RedisListener implement MessageListener{ 
  4.  
  5.     private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class); 
  6.  
  7.  
  8.  
  9.     @Override 
  10.  
  11.     public void onMessage(Message message,byte[] pattern){ 
  12.  
  13.         LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern)); 
  14.  
  15.         LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel())); 
  16.  
  17.         LOGGER.debug("元消息={}",new String(message.getBody())); 
  18.  
  19.         // 新建一個(gè)用于反序列化的對(duì)象,注意這里的對(duì)象要和前面配置的一樣 
  20.  
  21.         // 因?yàn)槲仪懊嬖O(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer 
  22.  
  23.         // 所以這里的實(shí)現(xiàn)方式為GenericJackson2JsonRedisSerializer 
  24.  
  25.         RedisSerializer serializer=new GenericJackson2JsonRedisSerializer(); 
  26.  
  27.         LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody())); 
  28.  
  29.     } 
  30.  

代碼很簡(jiǎn)單,就是輸出參數(shù)中包含的關(guān)鍵信息。需要注意的是,RedisSerializer的實(shí)現(xiàn)要與上面配置的序列化方式一致。

隊(duì)列監(jiān)聽器實(shí)現(xiàn)完以后,我們還需要將這個(gè)監(jiān)聽器添加到redis隊(duì)列監(jiān)聽器容器中,代碼如下:

  1. @Bean 
  2.  
  3. public public RedisMessageListenerContainer container(RedisConnectionFactory factory) { 
  4.  
  5.     RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
  6.  
  7.     container.setConnectionFactory(factory); 
  8.  
  9.     container.addMessageListener(redisListener, new PatternTopic("demo-channel")); 
  10.  
  11.     return container; 
  12.  

這幾行代碼大概意思就是新建一個(gè)Redis消息監(jiān)聽器容器,然后將監(jiān)聽器和管道名想綁定,最后返回這個(gè)容器。

這里要注意的是,這個(gè)管道名和下面將要說的推送消息時(shí)的管道名要一致,不然監(jiān)聽器監(jiān)聽不到消息。

七、redis隊(duì)列推送服務(wù)(生產(chǎn)者)

上面我們配置了RedisTemplate將要在這里使用到。

代碼如下:

  1. @Service 
  2.  
  3. public class Publisher{ 
  4.  
  5.     @Autowrite 
  6.  
  7.     private RedisTemplate redis; 
  8.  
  9.  
  10.  
  11.     public void publish(Object msg){ 
  12.  
  13.         redis.convertAndSend("demo-channel",msg); 
  14.  
  15.     } 
  16.  

關(guān)鍵代碼為第7行,redis.convertAndSend()這個(gè)方法的作用為,向某個(gè)通道(參數(shù)1)推送一條消息(第二個(gè)參數(shù))。

這里還是要注意上面所說的,生產(chǎn)者和消費(fèi)者的通道名要相同。

至此,消息隊(duì)列的生產(chǎn)者和消費(fèi)者已經(jīng)全部編寫完成。

八、遇到的問題及解決辦法

1、spring boot使用log4j2日志框架問題

在我添加了spring-boot-starter-log4j2依賴并在spring-boot-starter-web中排除了spring-boot-starter-logging后,運(yùn)行項(xiàng)目,還是會(huì)提示下面的錯(cuò)誤:

  1. SLF4J: Class path contains multiple SLF4J bindings. 
  2.  
  3. SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
  4.  
  5. SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
  6.  
  7. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
  8.  
  9. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder] 

這個(gè)錯(cuò)誤就是maven中有多個(gè)日志框架導(dǎo)致的。后來通過依賴分析,發(fā)現(xiàn)在spring-boot-starter-data-redis中,也依賴了spring-boot-starter-logging,解決辦法也很簡(jiǎn)單,下面貼出詳細(xì)代碼

  1. <dependency> 
  2.  
  3.     <groupId>org.springframework.boot</groupId> 
  4.  
  5.     <artifactId>spring-boot-starter-data-redis</artifactId> 
  6.  
  7.     <exclusions> 
  8.  
  9.         <exclusion> 
  10.  
  11.             <groupId>org.springframework.boot</groupId> 
  12.  
  13.             <artifactId>spring-boot-starter-logging</artifactId> 
  14.  
  15.         </exclusion> 
  16.  
  17.     </exclusions> 
  18.  
  19. </dependency> 
  20.  
  21. <dependency> 
  22.  
  23.     <groupId>org.springframework.boot</groupId> 
  24.  
  25.     <artifactId>spring-boot-starter-web</artifactId> 
  26.  
  27.     <exclusions> 
  28.  
  29.         <exclusion> 
  30.  
  31.             <groupId>org.springframework.boot</groupId> 
  32.  
  33.             <artifactId>spring-boot-starter-logging</artifactId> 
  34.  
  35.         </exclusion> 
  36.  
  37.     </exclusions> 
  38.  
  39. </dependency> 
  40.  
  41. <dependency> 
  42.  
  43.     <groupId>org.springframework.boot</groupId> 
  44.  
  45.     <artifactId>spring-boot-starter-log4j2</artifactId> 
  46.  
  47. </dependency> 
  48.  
  49. <dependency> 
  50.  
  51.     <groupId>org.springframework.integration</groupId> 
  52.  
  53.     <artifactId>spring-integration-redis</artifactId> 
  54.  
  55. </dependency> 

2、redis隊(duì)列監(jiān)聽器線程安全問題

redis隊(duì)列監(jiān)聽器的監(jiān)聽機(jī)制是:使用一個(gè)線程監(jiān)聽隊(duì)列,隊(duì)列有未消費(fèi)的消息則取出消息并生成一個(gè)新的線程來消費(fèi)消息。如果你還記得,我開頭說的是由于redis單線程特性,因此我們用它來做消息隊(duì)列,但是如果監(jiān)聽器每次接受一個(gè)消息就生成新的線程來消費(fèi)信息的話,這樣就完全沒有使用到redis的單線程特性,同時(shí)還會(huì)產(chǎn)生線程安全問題。

單一消費(fèi)者(一個(gè)通道只有一個(gè)消費(fèi)者)的解決辦法

最簡(jiǎn)單的辦法莫過于為onMessage()方法加鎖,這樣簡(jiǎn)單粗暴卻很有用,不過這種方式無法控制隊(duì)列監(jiān)聽的速率,且無限制的創(chuàng)造線程最終會(huì)導(dǎo)致系統(tǒng)資源被占光。那如何解決這種情況呢?線程池。在將監(jiān)聽器添加到容器的配置的時(shí)候,RedisMessageListenerContainer類中有一個(gè)方法setTaskExecutor(Executor taskExecutor)可以為監(jiān)聽容器配置線程池。配置線程池以后,所有的線程都會(huì)由該線程池產(chǎn)生,由此,我們可以通過調(diào)節(jié)線程池來控制隊(duì)列監(jiān)聽的速率。

多個(gè)消費(fèi)者(一個(gè)通道有多個(gè)消費(fèi)者)的解決辦法

單一消費(fèi)者的問題相比于多個(gè)消費(fèi)者來說還是較為簡(jiǎn)單,因?yàn)镴ava內(nèi)置的鎖都是只能控制自己程序的運(yùn)行,不能干擾其他的程序的運(yùn)行;然而現(xiàn)在很多時(shí)候我們都是在分布式環(huán)境下進(jìn)行開發(fā),這時(shí)處理多個(gè)消費(fèi)者的情況就很有意義了。

那么這種問題如何解決呢?分布式鎖。

下面來簡(jiǎn)要科普一下什么是分布式鎖:

分布式鎖是指在分布式環(huán)境下,同一時(shí)間只有一個(gè)客戶端能夠從某個(gè)共享環(huán)境中(例如redis)獲取到鎖,只有獲取到鎖的客戶端才能執(zhí)行程序。

然后分布式鎖一般要滿足:排他性(即同一時(shí)間只有一個(gè)客戶端能夠獲取到鎖)、避免死鎖(即超時(shí)后自動(dòng)釋放)、高可用(即獲取或釋放鎖的機(jī)制必須高可用且性能佳)

上面講依賴的時(shí)候,我們導(dǎo)入了一個(gè)spring-integration-redis依賴,這個(gè)依賴?yán)锩姘撕芏鄬?shí)用的工具類,而我們接下來要講的分布式鎖就是這個(gè)依賴下面的一個(gè)工具包RedisLockRegistry。

首先講一下如何使用,導(dǎo)入了依賴以后,首先配置一個(gè)Bean

  1. @Bean 
  2.  
  3. public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) { 
  4.  
  5.     return new RedisLockRegistry(factory, "demo-lock",60); 
  6.  

RedisLockRegistry的構(gòu)造函數(shù),第一個(gè)參數(shù)是redis連接池,第二個(gè)參數(shù)是鎖的前綴,即取出的鎖,鍵名為“demo-lock:KEY_NAME”,第三個(gè)參數(shù)為鎖的過期時(shí)間(秒),默認(rèn)為60秒,當(dāng)持有鎖超過該時(shí)間后自動(dòng)過期。

使用鎖的方法,下面是對(duì)監(jiān)聽器的修改

  1. @Component 
  2.  
  3. public class RedisListener implement MessageListener{ 
  4.  
  5.     @Autowrite 
  6.  
  7.     private RedisLockRegistry redisLockRegistry; 
  8.  
  9.     private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class); 
  10.  
  11.     @Override 
  12.  
  13.     public void onMessage(Message message,byte[] pattern){ 
  14.  
  15.         Lock lock=redisLockRegistry.obtain("lock"); 
  16.  
  17.         try{ 
  18.  
  19.             lock.lock(); //上鎖 
  20.  
  21.             LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(pattern)); 
  22.  
  23.             LOGGER.debug("從消息通道={}監(jiān)聽到消息",new String(message.getChannel())); 
  24.  
  25.             LOGGER.debug("元消息={}",new String(message.getBody())); 
  26.  
  27.             // 新建一個(gè)用于反序列化的對(duì)象,注意這里的對(duì)象要和前面配置的一樣 
  28.  
  29.             // 因?yàn)槲仪懊嬖O(shè)置的默認(rèn)序列化方式為GenericJackson2JsonRedisSerializer 
  30.  
  31.             // 所以這里的實(shí)現(xiàn)方式為GenericJackson2JsonRedisSerializer 
  32.  
  33.             RedisSerializer serializer=new GenericJackson2JsonRedisSerializer(); 
  34.  
  35.             LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody())); 
  36.  
  37.         } catch (Exception e) { 
  38.  
  39.             e.printStackTrace(); 
  40.  
  41.         } finally { 
  42.  
  43.             lock.unlock(); //解鎖 
  44.  
  45.         } 
  46.  
  47.     } 
  48.  

上面代碼的代碼比起前面的監(jiān)聽器代碼,只是多了一個(gè)注入的RedisLockRegistry,一個(gè)通過redisLockRegistry.obtain()方法獲取鎖,一個(gè)加鎖一個(gè)解鎖,然后這就完成了分布式鎖的使用。注意這個(gè)獲取鎖的方法redisLockRegistry.obtain(),其返回的是一個(gè)名為RedisLock的鎖,這是一個(gè)私有內(nèi)部類,它實(shí)現(xiàn)了Lock接口,因此我們不能從代碼外部創(chuàng)建一個(gè)他的實(shí)例,只能通過obtian()方法來獲取這個(gè)鎖。

以上就是本文的全部?jī)?nèi)容。本文來自作者投稿,原作者:小胖兒,轉(zhuǎn)載請(qǐng)注明出處及作者。

【本文是51CTO專欄作者Hollis的原創(chuàng)文章,作者微信公眾號(hào)Hollis(ID:hollischuang)】

 

戳這里,看該作者更多好文

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2020-09-02 07:52:03

AOP測(cè)試環(huán)境

2021-09-28 13:42:55

Chrome Devwebsocket網(wǎng)絡(luò)協(xié)議

2024-08-05 01:26:54

2022-10-09 08:38:17

消息隊(duì)列面試官模式

2019-04-15 14:40:46

消息隊(duì)列Java編程

2022-04-08 08:26:03

JavaHTTP請(qǐng)求

2024-01-26 13:16:00

RabbitMQ延遲隊(duì)列docker

2017-03-16 15:27:10

面試官測(cè)試技術(shù)

2021-11-02 09:05:25

Redis

2024-09-03 09:20:45

2020-07-02 07:52:11

RedisHash映射

2024-05-29 14:34:07

2020-08-17 07:40:19

消息隊(duì)列

2022-07-13 17:47:54

布局Flex代碼

2024-05-28 10:14:31

JavaScrip模板引擎

2022-07-06 13:48:24

RedisSentinel機(jī)制

2024-04-09 10:40:04

2024-10-22 16:39:07

2020-05-22 08:11:48

線程池JVM面試

2021-06-09 07:55:19

NodeEventEmitte驅(qū)動(dòng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)