干貨|SpringBoot JMS(ActiveMQ)API實(shí)踐應(yīng)用詳解
前言
Active是一種開(kāi)源的,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用程序提供高效的、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信。AC-tiveMQ使用Apache提供的授權(quán),任何人都可以對(duì)其實(shí)現(xiàn)代碼進(jìn)行修改。
ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語(yǔ)言和多系統(tǒng)的應(yīng)用集成消息通信中間件。ActiveMQ實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)并提供了很多附加的特性。本文將帶大家詳細(xì)介紹ActiveMQ的API的使用。
1. JMS的概念?
「什么是JMS呢:」
- JMS---------JAVA Message Service
- JAVA的消息服務(wù),是sun公司提供的接口,只是一個(gè)規(guī)范,這個(gè)規(guī)范就類(lèi)似于JDBC是一樣的,使用的時(shí)候是需要當(dāng)前規(guī)范的實(shí)現(xiàn)產(chǎn)品的。
「JMS能干什么呢:」
- 能夠?qū)⑿畔l(fā)布到目的地
- 可以從目的地來(lái)消費(fèi)這個(gè)消息
2、兩種通信模型
「隊(duì)列的通信概念:」
- 特點(diǎn):當(dāng)我們同一個(gè)隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,多個(gè)消費(fèi)者的數(shù)據(jù)之和才是原來(lái)隊(duì)列中的所有數(shù)據(jù)
- 隊(duì)列的通信模型最大的適用場(chǎng)景:流量的消峰,高并發(fā)的處理
「主題的通信模型:」
- 特點(diǎn):當(dāng)我們隊(duì)列有多個(gè)消費(fèi)者的時(shí)候,那么這多個(gè)消費(fèi)者消費(fèi)到的數(shù)據(jù)是一樣的
- 主題消費(fèi)者通信模型的適用場(chǎng)景:微服務(wù)下服務(wù)之間的異步通信
3. MQ的實(shí)現(xiàn)產(chǎn)品
「實(shí)現(xiàn)產(chǎn)品:」
- ActiveMQ
- RabbitMQ
- RockerMQ
- Kafka(這個(gè)設(shè)計(jì)的初衷是做分布式的日志的,后來(lái)因?yàn)槿罩居袊?yán)格的順序問(wèn)題,這個(gè)時(shí)候人們就用Kafka來(lái)做消息隊(duì)列了)
4、JMS中常見(jiàn)的名詞
「常見(jiàn)的名詞:」
- ActiveMQConnectionFactory:這個(gè)是創(chuàng)建連接的工廠
- ConnectionFactory:連接的工廠
- Connection:連接JAVA對(duì)MQ的一個(gè)連接
- Destination:目的地
- 生產(chǎn)者(Producer)
- 消費(fèi)者(Consumer)
- Session:會(huì)話(huà)(每一次對(duì)MQ的操作都稱(chēng)為一次會(huì)話(huà))
- Queue:隊(duì)列
- Topic:主題
5、什么是消息隊(duì)列
「消息隊(duì)列簡(jiǎn)單的說(shuō)就是用來(lái)存放臨時(shí)數(shù)據(jù)的地方:」
- 生產(chǎn)者----------->存儲(chǔ)介質(zhì)上
- 消費(fèi)者----------->存儲(chǔ)介質(zhì)上
「消息隊(duì)列類(lèi)似于快遞公司:」
- 你可以將東西交給快遞公司
- 目標(biāo)人也可以從快遞公司去取東西
6. ActiveMQ是什么
「含義:」
- ActiveMQ就是一個(gè)JMS的實(shí)現(xiàn)產(chǎn)品,它能夠?qū)崿F(xiàn)JMS下的所有功能
7、ActiveMQ能干什么
「主要作用:」
- 流量消峰處理
- 微服務(wù)下模塊的異步通信
- 處理高并發(fā)下的訂單
- 處理第三方平臺(tái)的高并發(fā)
- 協(xié)助消息表可以完成分布式事務(wù)的最終一致性
8、ActiveMQ的安裝
「ActiveMQ的安裝和配置:」
- 1、官網(wǎng)下載Linux版的ActiveMQ(最新版本為5.13.4)
- https://activemq.apache.org/download.html
- 2、解壓安裝
- tar -zxvf apache-activemq-5.13.4-bin.tar.gz
- 3、配置(這里采用默認(rèn)配置,無(wú)需修改)
- vim /usr/lical/activemq-1/conf/activemq.xml
- 4、啟動(dòng)
- cd /usr/local/activemq-1/bin
- ./activemq start
- 5、打開(kāi)管理界面(管理界面可以查看并管理所有隊(duì)列及消息)
- http://192.168.1.100:8161
- 啟動(dòng)成功后,可以瀏覽 http://localhost:8161/admin/
- 默認(rèn)用戶(hù)名、密碼:admin/admin
- 管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段:
- <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
- <!-- the default port number for the web console -->
- <property name="host" value="0.0.0.0"/>
- <property name="port" value="8161"/>
- </bean>
- 用戶(hù)名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個(gè)管理員jimmy/123456,可參考下面修改:
- 1
- 2
- 3admin: admin, admin
- jimmy: 123456, admin
- user: user, user
- 注:管理界面有一個(gè)小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點(diǎn)問(wèn)題,如果使用jdk1.8,管理界面進(jìn)入Queues標(biāo)簽頁(yè)時(shí),偶爾會(huì)報(bào)錯(cuò),但是并不影響消息正常收發(fā),只是無(wú)法從界面上查看隊(duì)列情況,如果出現(xiàn)該問(wèn)題,可將jdk版本降至1.7,同時(shí)最好清空data目錄下的所有數(shù)據(jù),再重啟activemq即可。
9. ActiveMQ的API的使用
「AcatveMQ的API使用:」
- 隊(duì)列的使用(生產(chǎn)者)
- package com.qy.mq.queue;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.Message;
- import javax.jms.*;
- /**
- * @Auther: qianyu
- * @Date: 2020/11/04 14:12
- * @Description:生產(chǎn)者
- */
- public class Producer {
- //準(zhǔn)備發(fā)布的這個(gè)地址
- private static final String PATH="tcp://10.7.182.87:61616";
- //ActiveMQ下的用戶(hù)名
- private static final String userName="admin";
- //ActiveMQ下的密碼
- private static final String password="admin";
- public static void main(String[] args) throws JMSException {
- //第一步:創(chuàng)建連接的工廠
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
- //通過(guò)這個(gè)工廠獲取連接
- Connection connection = activeMQConnectionFactory.createConnection();
- //第三步:打開(kāi)這個(gè)連接
- connection.start();
- //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà)
- /**
- * 第一個(gè)參數(shù):是否使用事務(wù)
- * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式
- * 第一種:自動(dòng)應(yīng)答
- * 第二種:客戶(hù)端手動(dòng)應(yīng)答
- */
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //需要發(fā)送消息的目的地(queue操作的是隊(duì)列)
- Destination destination=session.createQueue("wqqq");
- //生產(chǎn)者來(lái)生產(chǎn)這個(gè)消息
- //要有生產(chǎn)者
- MessageProducer messageProducer = session.createProducer(destination);
- //發(fā)送很多的消息到消息隊(duì)列中去
- // for (int i=0;i<100;i++){
- //需要準(zhǔn)備發(fā)送的消息
- // TextMessage textMessage = session.createTextMessage("我是淺羽:"+i);
- //研究消息的類(lèi)型
- /* BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.setByteProperty("www",new Byte("123"));
- //接下來(lái)就可以發(fā)送消息了
- messageProducer.send(bytesMessage);*/
- //創(chuàng)建map類(lèi)型的message
- /*MapMessage mapMessage = session.createMapMessage();
- mapMessage.setInt("www1",123);
- messageProducer.send(mapMessage);*/
- ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123"));
- messageProducer.send(objectMessage);
- // }
- }
- }
- 隊(duì)列的使用(消費(fèi)者)
- package com.qy.mq.queue;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- import java.io.Serializable;
- /**
- * @Auther: qianyu
- * @Date: 2020/11/04 14:13
- * @Description:消費(fèi)者
- */
- public class Consumer {
- //準(zhǔn)備發(fā)布的這個(gè)地址
- private static final String PATH="tcp://10.7.182.87:61616";
- //ActiveMQ下的用戶(hù)名
- private static final String userName="admin";
- //ActiveMQ下的密碼
- private static final String password="admin";
- public static void main(String[] args) throws JMSException {
- //第一步:創(chuàng)建連接的工廠
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
- //通過(guò)這個(gè)工廠獲取連接
- Connection connection = activeMQConnectionFactory.createConnection();
- //第三步:打開(kāi)這個(gè)連接
- connection.start();
- //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà)
- /**
- * 第一個(gè)參數(shù):是否使用事務(wù)
- * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式
- * 第一種:自動(dòng)應(yīng)答
- * 第二種:客戶(hù)端手動(dòng)應(yīng)答
- */
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- //需要發(fā)送消息的目的地(queue操作的是隊(duì)列)
- Destination destination=session.createQueue("wqqq");
- //創(chuàng)建我們的消費(fèi)者了
- MessageConsumer messageConsumer = session.createConsumer(destination);
- //接下來(lái)就可以接收我們的消息了
- //Message message = messageConsumer.receive();
- //接收消息并指定這個(gè)超時(shí)的時(shí)間
- // Message message = messageConsumer.receive(5000);
- //接收消息沒(méi)有就不等待了 直接over了 不接收了
- // Message message = messageConsumer.receiveNoWait();
- //給定當(dāng)前的路徑設(shè)置監(jiān)聽(tīng)器
- messageConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- /* BytesMessage bytesMessage= (BytesMessage) message;
- try {
- System.out.println("獲取到的數(shù)據(jù)是:"+bytesMessage.getByteProperty("www"));
- } catch (JMSException e) {
- e.printStackTrace();
- }*/
- /*MapMessage mapMessage= (MapMessage) message;
- try {
- System.out.println("獲取到的數(shù)據(jù)是:"+mapMessage.getInt("www1"));
- } catch (JMSException e) {
- e.printStackTrace();
- }*/
- //測(cè)試對(duì)象類(lèi)型的消息的發(fā)送和接收
- ObjectMessage objectMessage= (ObjectMessage) message;
- try {
- User user = (User) objectMessage.getObject();
- System.out.println("接收到的數(shù)據(jù)是:"+user);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- /* //我們知道是一個(gè)字符串類(lèi)型的消息
- TextMessage textMessage= (TextMessage) message;
- //接下來(lái)就可以打印這個(gè)消息了
- try {
- System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }*/
- try {
- //這句話(huà)就表示的是客戶(hù)端來(lái)手動(dòng)的進(jìn)行應(yīng)答
- message.acknowledge();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- 主題模型的生產(chǎn)者
- package com.qy.mq.topic;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * @Auther: qianyu
- * @Date: 2020/11/04 15:17
- * @Description:
- */
- public class Producer {
- //準(zhǔn)備發(fā)布的這個(gè)地址
- private static final String PATH="tcp://10.7.182.87:61616";
- //ActiveMQ下的用戶(hù)名
- private static final String userName="admin";
- //ActiveMQ下的密碼
- private static final String password="admin";
- public static void main(String[] args) throws JMSException {
- //第一步:創(chuàng)建連接的工廠
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
- //通過(guò)這個(gè)工廠獲取連接
- Connection connection = activeMQConnectionFactory.createConnection();
- //第三步:打開(kāi)這個(gè)連接
- connection.start();
- //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà)
- /**
- * 第一個(gè)參數(shù):是否使用事務(wù)
- * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式
- * 第一種:自動(dòng)應(yīng)答
- * 第二種:客戶(hù)端手動(dòng)應(yīng)答
- */
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- //需要發(fā)送消息的目的地(下面創(chuàng)建的就應(yīng)該是主題模型的地址)
- Destination destination=session.createTopic("topic222");
- //生產(chǎn)者來(lái)生產(chǎn)這個(gè)消息
- //要有生產(chǎn)者
- MessageProducer messageProducer = session.createProducer(destination);
- //發(fā)送很多的消息到消息隊(duì)列中去
- for (int i=0;i<100;i++){
- //需要準(zhǔn)備發(fā)送的消息
- TextMessage textMessage = session.createTextMessage("我是淺羽:"+i);
- //接下來(lái)就可以發(fā)送消息了
- messageProducer.send(textMessage);
- }
- }
- }
- 主題模型的消費(fèi)者
- package com.qy.mq.topic;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- /**
- * @Auther: qianyu
- * @Date: 2020/11/04 15:19
- * @Description:
- */
- public class Consumer {
- //準(zhǔn)備發(fā)布的這個(gè)地址
- private static final String PATH="tcp://10.7.182.87:61616";
- //ActiveMQ下的用戶(hù)名
- private static final String userName="admin";
- //ActiveMQ下的密碼
- private static final String password="admin";
- public static void main(String[] args) throws JMSException {
- //第一步:創(chuàng)建連接的工廠
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH);
- //通過(guò)這個(gè)工廠獲取連接
- Connection connection = activeMQConnectionFactory.createConnection();
- //第三步:打開(kāi)這個(gè)連接
- connection.start();
- //第四步:創(chuàng)建操作MQ的這個(gè)會(huì)話(huà)
- /**
- * 第一個(gè)參數(shù):是否使用事務(wù)
- * 第二個(gè)參數(shù):客戶(hù)端的應(yīng)答模式
- * 第一種:自動(dòng)應(yīng)答
- * 第二種:客戶(hù)端手動(dòng)應(yīng)答
- */
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- //需要發(fā)送消息的目的地(queue操作的是隊(duì)列)
- Destination destination=session.createTopic("topic222");
- //創(chuàng)建我們的消費(fèi)者了
- MessageConsumer messageConsumer = session.createConsumer(destination);
- //接下來(lái)就可以接收我們的消息了
- //給定當(dāng)前的路徑設(shè)置監(jiān)聽(tīng)器
- messageConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- //我們知道是一個(gè)字符串類(lèi)型的消息
- TextMessage textMessage= (TextMessage) message;
- //接下來(lái)就可以打印這個(gè)消息了
- try {
- System.out.println("消費(fèi)者1---接收到的消息是:"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- try {
- message.acknowledge();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
本篇關(guān)于ActiveMQ的介紹就先到這里結(jié)束了,后續(xù)會(huì)出更多關(guān)于ActiveMQ系列更多文章,謝謝大家支持!