自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

干貨|SpringBoot JMS(ActiveMQ)API實(shí)踐應(yīng)用詳解

開(kāi)發(fā) 前端
Active是一種開(kāi)源的,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用程序提供高效的、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信。AC-tiveMQ使用Apache提供的授權(quán),任何人都可以對(duì)其實(shí)現(xiàn)代碼進(jìn)行修改。

[[350573]]

 前言

Active是一種開(kāi)源的,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用程序提供高效的、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信。AC-tiveMQ使用Apache提供的授權(quán),任何人都可以對(duì)其實(shí)現(xiàn)代碼進(jìn)行修改。

ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語(yǔ)言和多系統(tǒng)的應(yīng)用集成消息通信中間件。ActiveMQ實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)并提供了很多附加的特性。本文將帶大家詳細(xì)介紹ActiveMQ的API的使用。

1. JMS的概念?

「什么是JMS呢:」

  • JMS---------JAVA Message Service
  • JAVA的消息服務(wù),是sun公司提供的接口,只是一個(gè)規(guī)范,這個(gè)規(guī)范就類(lèi)似于JDBC是一樣的,使用的時(shí)候是需要當(dāng)前規(guī)范的實(shí)現(xiàn)產(chǎn)品的。

「JMS能干什么呢:」

  • 能夠?qū)⑿畔l(fā)布到目的地
  • 可以從目的地來(lái)消費(fèi)這個(gè)消息

2、兩種通信模型

「隊(duì)列的通信概念:」

  • 特點(diǎn):當(dāng)我們同一個(gè)隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,多個(gè)消費(fèi)者的數(shù)據(jù)之和才是原來(lái)隊(duì)列中的所有數(shù)據(jù)
  • 隊(duì)列的通信模型最大的適用場(chǎng)景:流量的消峰,高并發(fā)的處理

「主題的通信模型:」

  • 特點(diǎn):當(dāng)我們隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,那么這多個(gè)消費(fèi)者消費(fèi)到的數(shù)據(jù)是一樣的
  • 主題消費(fèi)者通信模型的適用場(chǎng)景:微服務(wù)下服務(wù)之間的異步通信

3. MQ的實(shí)現(xiàn)產(chǎn)品

「實(shí)現(xiàn)產(chǎn)品:」

  • ActiveMQ
  • RabbitMQ
  • RockerMQ
  • Kafka(這個(gè)設(shè)計(jì)的初衷是做分布式的日志的,后來(lái)因?yàn)槿罩居袊?yán)格的順序問(wèn)題,這個(gè)時(shí)候人們就用Kafka來(lái)做消息隊(duì)列了)

4、JMS中常見(jiàn)的名詞

「常見(jiàn)的名詞:」

  • ActiveMQConnectionFactory:這個(gè)是創(chuàng)建連接的工廠
  • ConnectionFactory:連接的工廠
  • Connection:連接JAVA對(duì)MQ的一個(gè)連接
  • Destination:目的地
  • 生產(chǎn)者(Producer)
  • 消費(fèi)者(Consumer)
  • Session:會(huì)話(huà)(每一次對(duì)MQ的操作都稱(chēng)為一次會(huì)話(huà))
  • Queue:隊(duì)列
  • Topic:主題

5、什么是消息隊(duì)列

「消息隊(duì)列簡(jiǎn)單的說(shuō)就是用來(lái)存放臨時(shí)數(shù)據(jù)的地方:」

  • 生產(chǎn)者----------->存儲(chǔ)介質(zhì)上
  • 消費(fèi)者----------->存儲(chǔ)介質(zhì)上

「消息隊(duì)列類(lèi)似于快遞公司:」

  • 你可以將東西交給快遞公司
  • 目標(biāo)人也可以從快遞公司去取東西

6. ActiveMQ是什么

「含義:」

  • ActiveMQ就是一個(gè)JMS的實(shí)現(xiàn)產(chǎn)品,它能夠?qū)崿F(xiàn)JMS下的所有功能

7、ActiveMQ能干什么

「主要作用:」

  • 流量消峰處理
  • 微服務(wù)下模塊的異步通信
  • 處理高并發(fā)下的訂單
  • 處理第三方平臺(tái)的高并發(fā)
  • 協(xié)助消息表可以完成分布式事務(wù)的最終一致性

8、ActiveMQ的安裝

「ActiveMQ的安裝和配置:」

  1.  1、官網(wǎng)下載Linux版的ActiveMQ(最新版本為5.13.4) 
  2.        https://activemq.apache.org/download.html 
  3.  
  4.        2、解壓安裝 
  5.       tar -zxvf apache-activemq-5.13.4-bin.tar.gz 
  6.  
  7.        3、配置(這里采用默認(rèn)配置,無(wú)需修改) 
  8.       vim /usr/lical/activemq-1/conf/activemq.xml 
  9.  
  10.        4、啟動(dòng) 
  11.       cd /usr/local/activemq-1/bin 
  12. ./activemq start 
  13.  
  14.       5、打開(kāi)管理界面(管理界面可以查看并管理所有隊(duì)列及消息) 
  15.          http://192.168.1.100:8161 
  16.  
  17.        啟動(dòng)成功后,可以瀏覽 http://localhost:8161/admin/ 
  18.        默認(rèn)用戶(hù)名、密碼:admin/admin 
  19.        管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段: 
  20.       <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"
  21.     <!-- the default port       number for the web console --> 
  22.     <property name="host" value="0.0.0.0"/> 
  23.     <property name="port" value="8161"/> 
  24.      </bean> 
  25.        用戶(hù)名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個(gè)管理員jimmy/123456,可參考下面修改: 
  26.        1 
  27. 3admin: admin, admin 
  28. jimmy: 123456, admin 
  29. useruseruser 
  30.        注:管理界面有一個(gè)小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點(diǎn)問(wèn)題,如果使用jdk1.8,管理界面進(jìn)入Queues標(biāo)簽頁(yè)時(shí),偶爾會(huì)報(bào)錯(cuò),但是并不影響消息正常收發(fā),只是無(wú)法從界面上查看隊(duì)列情況,如果出現(xiàn)該問(wèn)題,可將jdk版本降至1.7,同時(shí)最好清空data目錄下的所有數(shù)據(jù),再重啟activemq即可。 

9. ActiveMQ的API的使用

「AcatveMQ的API使用:」

 

  • 隊(duì)列的使用(生產(chǎn)者)
  1. package com.qy.mq.queue; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4. import org.apache.activemq.Message; 
  5.  
  6. import javax.jms.*; 
  7.  
  8. /** 
  9.  * @Auther: qianyu 
  10.  * @Date: 2020/11/04 14:12 
  11.  * @Description:生產(chǎn)者 
  12.  */ 
  13. public class Producer { 
  14.  
  15.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  16.     private  static final String PATH="tcp://10.7.182.87:61616"
  17.     //ActiveMQ下的用戶(hù)名 
  18.     private static final String userName="admin"
  19.     //ActiveMQ下的密碼 
  20.     private static final String password="admin"
  21.  
  22.     public static void main(String[] args) throws JMSException { 
  23.         //第一步:創(chuàng)建連接的工廠 
  24.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  25.         //通過(guò)這個(gè)工廠獲取連接 
  26.         Connection connection = activeMQConnectionFactory.createConnection(); 
  27.         //第三步:打開(kāi)這個(gè)連接 
  28.         connection.start(); 
  29.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà) 
  30.         /** 
  31.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  32.          * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式 
  33.          *     第一種:自動(dòng)應(yīng)答 
  34.          *     第二種:客戶(hù)端手動(dòng)應(yīng)答 
  35.          */ 
  36.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
  37.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  38.         Destination destination=session.createQueue("wqqq"); 
  39.         //生產(chǎn)者來(lái)生產(chǎn)這個(gè)消息 
  40.         //要有生產(chǎn)者 
  41.         MessageProducer messageProducer = session.createProducer(destination); 
  42.  
  43.         //發(fā)送很多的消息到消息隊(duì)列中去 
  44. //        for (int i=0;i<100;i++){ 
  45.             //需要準(zhǔn)備發(fā)送的消息 
  46. //            TextMessage textMessage = session.createTextMessage("我是淺羽:"+i); 
  47.             //研究消息的類(lèi)型 
  48.  
  49.           /*  BytesMessage bytesMessage = session.createBytesMessage(); 
  50.             bytesMessage.setByteProperty("www",new Byte("123")); 
  51.  
  52.             //接下來(lái)就可以發(fā)送消息了 
  53.             messageProducer.send(bytesMessage);*/ 
  54.  
  55.         //創(chuàng)建map類(lèi)型的message 
  56.         /*MapMessage mapMessage = session.createMapMessage(); 
  57.         mapMessage.setInt("www1",123); 
  58.  
  59.         messageProducer.send(mapMessage);*/ 
  60.  
  61.         ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu""123")); 
  62.         messageProducer.send(objectMessage); 
  63.  
  64. //        } 
  65.     } 
  66.  
  • 隊(duì)列的使用(消費(fèi)者)
  1. package com.qy.mq.queue; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6. import java.io.Serializable
  7.  
  8. /** 
  9.  * @Auther: qianyu 
  10.  * @Date: 2020/11/04 14:13 
  11.  * @Description:消費(fèi)者 
  12.  */ 
  13. public class Consumer { 
  14.  
  15.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  16.     private  static final String PATH="tcp://10.7.182.87:61616"
  17.     //ActiveMQ下的用戶(hù)名 
  18.     private static final String userName="admin"
  19.     //ActiveMQ下的密碼 
  20.     private static final String password="admin"
  21.  
  22.     public static void main(String[] args) throws JMSException { 
  23.         //第一步:創(chuàng)建連接的工廠 
  24.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  25.         //通過(guò)這個(gè)工廠獲取連接 
  26.         Connection connection = activeMQConnectionFactory.createConnection(); 
  27.         //第三步:打開(kāi)這個(gè)連接 
  28.         connection.start(); 
  29.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà) 
  30.         /** 
  31.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  32.          * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式 
  33.          *     第一種:自動(dòng)應(yīng)答 
  34.          *     第二種:客戶(hù)端手動(dòng)應(yīng)答 
  35.          */ 
  36.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  37.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  38.         Destination destination=session.createQueue("wqqq"); 
  39.         //創(chuàng)建我們的消費(fèi)者了 
  40.         MessageConsumer messageConsumer = session.createConsumer(destination); 
  41.         //接下來(lái)就可以接收我們的消息了 
  42.         //Message message = messageConsumer.receive(); 
  43.  
  44.         //接收消息并指定這個(gè)超時(shí)的時(shí)間 
  45. //      Message message = messageConsumer.receive(5000); 
  46.  
  47.         //接收消息沒(méi)有就不等待了 直接over了  不接收了 
  48. //      Message message = messageConsumer.receiveNoWait(); 
  49.  
  50.         //給定當(dāng)前的路徑設(shè)置監(jiān)聽(tīng)器 
  51.         messageConsumer.setMessageListener(new MessageListener() { 
  52.             @Override 
  53.             public void onMessage(Message message) { 
  54.  
  55.                /* BytesMessage bytesMessage= (BytesMessage) message; 
  56.  
  57.                 try { 
  58.                     System.out.println("獲取到的數(shù)據(jù)是:"+bytesMessage.getByteProperty("www")); 
  59.                 } catch (JMSException e) { 
  60.                     e.printStackTrace(); 
  61.                 }*/ 
  62.  
  63.  
  64.                /*MapMessage mapMessage= (MapMessage) message; 
  65.  
  66.                 try { 
  67.                     System.out.println("獲取到的數(shù)據(jù)是:"+mapMessage.getInt("www1")); 
  68.                 } catch (JMSException e) { 
  69.                     e.printStackTrace(); 
  70.                 }*/ 
  71.  
  72.                //測(cè)試對(duì)象類(lèi)型的消息的發(fā)送和接收 
  73.                ObjectMessage objectMessage= (ObjectMessage) message; 
  74.                 try { 
  75.                     User user = (User) objectMessage.getObject(); 
  76.  
  77.                     System.out.println("接收到的數(shù)據(jù)是:"+user); 
  78.                 } catch (JMSException e) { 
  79.                     e.printStackTrace(); 
  80.                 } 
  81.  
  82.  
  83.               /*  //我們知道是一個(gè)字符串類(lèi)型的消息 
  84.                 TextMessage textMessage= (TextMessage) message; 
  85.                 //接下來(lái)就可以打印這個(gè)消息了 
  86.                 try { 
  87.                     System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText()); 
  88.                 } catch (JMSException e) { 
  89.                     e.printStackTrace(); 
  90.                 }*/ 
  91.  
  92.                 try { 
  93.                     //這句話(huà)就表示的是客戶(hù)端來(lái)手動(dòng)的進(jìn)行應(yīng)答 
  94.                     message.acknowledge(); 
  95.                 } catch (JMSException e) { 
  96.                     e.printStackTrace(); 
  97.                 } 
  98.  
  99.             } 
  100.         }); 
  101.     } 
  • 主題模型的生產(chǎn)者
  1. package com.qy.mq.topic; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6.  
  7. /** 
  8.  * @Auther: qianyu 
  9.  * @Date: 2020/11/04 15:17 
  10.  * @Description: 
  11.  */ 
  12. public class Producer { 
  13.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  14.     private  static final String PATH="tcp://10.7.182.87:61616"
  15.     //ActiveMQ下的用戶(hù)名 
  16.     private static final String userName="admin"
  17.     //ActiveMQ下的密碼 
  18.     private static final String password="admin"
  19.  
  20.     public static void main(String[] args) throws JMSException { 
  21.         //第一步:創(chuàng)建連接的工廠 
  22.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  23.         //通過(guò)這個(gè)工廠獲取連接 
  24.         Connection connection = activeMQConnectionFactory.createConnection(); 
  25.         //第三步:打開(kāi)這個(gè)連接 
  26.         connection.start(); 
  27.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà) 
  28.         /** 
  29.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  30.          * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式 
  31.          *     第一種:自動(dòng)應(yīng)答 
  32.          *     第二種:客戶(hù)端手動(dòng)應(yīng)答 
  33.          */ 
  34.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  35.         //需要發(fā)送消息的目的地(下面創(chuàng)建的就應(yīng)該是主題模型的地址) 
  36.         Destination destination=session.createTopic("topic222"); 
  37.         //生產(chǎn)者來(lái)生產(chǎn)這個(gè)消息 
  38.         //要有生產(chǎn)者 
  39.         MessageProducer messageProducer = session.createProducer(destination); 
  40.  
  41.         //發(fā)送很多的消息到消息隊(duì)列中去 
  42.         for (int i=0;i<100;i++){ 
  43.             //需要準(zhǔn)備發(fā)送的消息 
  44.             TextMessage textMessage = session.createTextMessage("我是淺羽:"+i); 
  45.             //接下來(lái)就可以發(fā)送消息了 
  46.             messageProducer.send(textMessage); 
  47.         } 
  48.     } 
  • 主題模型的消費(fèi)者
  1. package com.qy.mq.topic; 
  2.  
  3. import org.apache.activemq.ActiveMQConnectionFactory; 
  4.  
  5. import javax.jms.*; 
  6.  
  7. /** 
  8.  * @Auther: qianyu 
  9.  * @Date: 2020/11/04 15:19 
  10.  * @Description: 
  11.  */ 
  12. public class Consumer { 
  13.  
  14.     //準(zhǔn)備發(fā)布的這個(gè)地址 
  15.     private  static final String PATH="tcp://10.7.182.87:61616"
  16.     //ActiveMQ下的用戶(hù)名 
  17.     private static final String userName="admin"
  18.     //ActiveMQ下的密碼 
  19.     private static final String password="admin"
  20.  
  21.     public static void main(String[] args) throws JMSException { 
  22.         //第一步:創(chuàng)建連接的工廠 
  23.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); 
  24.         //通過(guò)這個(gè)工廠獲取連接 
  25.         Connection connection = activeMQConnectionFactory.createConnection(); 
  26.         //第三步:打開(kāi)這個(gè)連接 
  27.         connection.start(); 
  28.         //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà) 
  29.         /** 
  30.          * 第一個(gè)參數(shù):是否使用事務(wù) 
  31.          * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式 
  32.          *     第一種:自動(dòng)應(yīng)答 
  33.          *     第二種:客戶(hù)端手動(dòng)應(yīng)答 
  34.          */ 
  35.         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
  36.         //需要發(fā)送消息的目的地(queue操作的是隊(duì)列) 
  37.         Destination destination=session.createTopic("topic222"); 
  38.         //創(chuàng)建我們的消費(fèi)者了 
  39.         MessageConsumer messageConsumer = session.createConsumer(destination); 
  40.         //接下來(lái)就可以接收我們的消息了 
  41.  
  42.         //給定當(dāng)前的路徑設(shè)置監(jiān)聽(tīng)器 
  43.         messageConsumer.setMessageListener(new MessageListener() { 
  44.             @Override 
  45.             public void onMessage(Message message) { 
  46.  
  47.                 //我們知道是一個(gè)字符串類(lèi)型的消息 
  48.                 TextMessage textMessage= (TextMessage) message; 
  49.                 //接下來(lái)就可以打印這個(gè)消息了 
  50.                 try { 
  51.                     System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText()); 
  52.                 } catch (JMSException e) { 
  53.                     e.printStackTrace(); 
  54.                 } 
  55.  
  56.                 try { 
  57.                     message.acknowledge(); 
  58.                 } catch (JMSException e) { 
  59.                     e.printStackTrace(); 
  60.                 } 
  61.  
  62.             } 
  63.         }); 
  64.     } 

本篇關(guān)于ActiveMQ的介紹就先到這里結(jié)束了,后續(xù)會(huì)出更多關(guān)于ActiveMQ系列更多文章,謝謝大家支持!

責(zé)任編輯:姜華 來(lái)源: 淺羽的IT小屋
相關(guān)推薦

2024-04-07 07:53:12

SpringWeb技術(shù)WebSocket

2012-05-25 15:35:43

JMSJava

2009-06-25 15:33:13

Java消息服務(wù)JMS

2024-03-19 08:45:45

WebSocketSpring應(yīng)用開(kāi)發(fā)

2023-03-23 09:33:22

Android移動(dòng)開(kāi)發(fā)

2024-08-26 15:35:40

2022-03-22 22:05:39

區(qū)塊鏈支付模式

2022-05-12 11:41:16

開(kāi)發(fā)框架程序

2025-02-28 09:20:00

Future開(kāi)發(fā)代碼

2019-11-11 17:34:16

前端開(kāi)發(fā)技術(shù)

2019-01-15 09:10:17

邊緣計(jì)算數(shù)據(jù)中心IT

2023-09-08 23:30:19

2019-01-21 14:20:26

Java開(kāi)發(fā)代碼

2013-06-13 09:21:31

RESTful APIRESTfulAPI

2021-03-05 11:52:50

LinuxSPI驅(qū)動(dòng)詳解

2024-04-08 07:28:27

PiniaVue3狀態(tài)管理庫(kù)

2020-03-30 20:14:53

ActiveMQ設(shè)計(jì)實(shí)踐

2017-06-13 09:17:17

手機(jī)QQQzone

2020-10-27 13:19:33

架構(gòu)APICloud多端

2016-12-27 08:49:55

API設(shè)計(jì)策略
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)