Java消息服務(wù)JMS詳解
Java消息服務(wù)(JMS Java Message Services)提供了點(diǎn)對(duì)點(diǎn)模式(Point-to-Point Queue)和發(fā)布-訂閱模式(Publish-Subscribe Topics).
Queue僅允許一個(gè)消息傳送給一個(gè)客戶(一對(duì)一):
Java消息服務(wù)JMS的接收者和發(fā)送者之間不存在時(shí)間上的依賴關(guān)系。不論發(fā)送者發(fā)送消息時(shí)接收者是否在運(yùn)行,接收者都可以提取信息。接收者對(duì)于成功處理的消息給出回執(zhí)。
Topics可以有多個(gè)客戶端(一對(duì)多,多對(duì)多):
向某個(gè)話題訂閱的客戶程序只能收到那些在它訂閱之后發(fā)布的消息。為了接收到消息,訂閱者必須保持活動(dòng)狀態(tài)。因此,發(fā)布者和訂閱者之間存在時(shí)間上的依賴關(guān)系。
點(diǎn)對(duì)點(diǎn)消息模式通過(guò)一個(gè)消息隊(duì)列(Queue)實(shí)現(xiàn),消息的生產(chǎn)者向隊(duì)列寫入消息,消息的訂閱者從隊(duì)列提取消息。發(fā)布-訂閱消息模式通過(guò)一個(gè)話題(Topic)節(jié)點(diǎn)構(gòu)成的層次結(jié)構(gòu)實(shí)現(xiàn),消息的生產(chǎn)者向這個(gè)層次結(jié)構(gòu)發(fā)布消息,消息的訂閱者向這個(gè)結(jié)構(gòu)訂閱消息。
消息驅(qū)動(dòng)的Bean只有一個(gè)Bean類。從某些方面看,JMS消息驅(qū)動(dòng)的Bean類似于無(wú)狀態(tài)會(huì)話Bean:消息驅(qū)動(dòng)的Bean不為特定的客戶保留數(shù)據(jù)或?qū)υ挔顟B(tài)。
- @MessageDriven(activationConfig={
- @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
- @ActivationConfigProperty(propertyName="destination",propertyValue="queue/jms")
- })
@MessageDriven注釋指明這是一個(gè)消息驅(qū)動(dòng)Bean,并使用@ActivationConfigProperty注釋配置消息的各種屬性,其 中destinationType屬性指定消息的類型,消息有兩種類型topics 和queues,下面是這兩種消息類型的介紹:
Topics 可以有多個(gè)客戶端。用topic發(fā)布允許一對(duì)多,或多對(duì)多通訊通道。消息的產(chǎn)生者被叫做publisher, Java消息服務(wù)接受者叫做subscriber。destinationType屬性對(duì)應(yīng)值:javax.jms.Topic
Queue 僅僅允許一個(gè)消息傳送給一個(gè)客戶。一個(gè)發(fā)送者將消息放入消息隊(duì)列,接受者從隊(duì)列中抽取并得到消息,消息就會(huì)在隊(duì)列中消失。第一個(gè)接受者抽取并得到消息后,其他人就不能再得到它。destinationType屬性對(duì)應(yīng)值:javax.jms.Queue destination屬性用作指定消息路徑,消息驅(qū)動(dòng)Bean在發(fā)布時(shí),如果路徑不存在,容器會(huì)自動(dòng)創(chuàng)建該路徑,當(dāng)容器關(guān)閉時(shí)該路徑會(huì)自動(dòng)被刪除
當(dāng)一個(gè)消息到達(dá)queue/jms隊(duì)列時(shí),就會(huì)觸發(fā)onMessage方法,消息作為一個(gè)參數(shù)傳入.
- package com.julycn.jms;
- import javax.ejb.ActivationConfigProperty;
- import javax.ejb.MessageDriven;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- @MessageDriven(activationConfig = {
- @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
- @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/jms") })
- public class MessageQueue implements MessageListener {
- public MessageQueue() {
- }
- public void onMessage(Message message) {
- TextMessage tmsg = (TextMessage) message;
- try {
- System.out.println(tmsg.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- package com.julycn.client;
- import javax.jms.JMSException;
- import javax.jms.Queue;
- import javax.jms.QueueConnection;
- import javax.jms.QueueConnectionFactory;
- import javax.jms.QueueSender;
- import javax.jms.QueueSession;
- import javax.jms.TextMessage;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
- public class MessageQueueClient {
- public static void main(String[] args) {
- QueueConnection conn;
- QueueSession session;
- Queue queue;
- QueueSender sender;
- TextMessage msg;
- try {
- InitialContext ctx = new InitialContext();
- QueueConnectionFactory qcf = (QueueConnectionFactory) ctx
- .lookup("ConnectionFactory");
- conn = qcf.createQueueConnection();
- session = conn.createQueueSession(false,
- QueueSession.AUTO_ACKNOWLEDGE);
- queue = (Queue) ctx.lookup("queue/jms");
- msg = session.createTextMessage("你好,好久不見(jiàn)!");
- sender = session.createSender(queue);
- sender.send(msg);
- sender.close();
- } catch (NamingException e) {
- e.printStackTrace();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
(1) 得到一個(gè)JNDI初始化上下文(Context);
例子對(duì)應(yīng)代碼:
Properties props = new Properties();
props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
props.setProperty("java.naming.provider.url", "localhost:1099");
props.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming");
InitialContext ctx = new InitialContext(props);
注意:可以寫在代碼中,也可以寫在jndi.properties文件中.
(2) 根據(jù)上下文來(lái)查找一個(gè)連接工廠TopicConnectFactory/ QueueConnectionFactory (有兩種連接工廠,根據(jù)是topic/queue來(lái)使用相應(yīng)的類型);
例子對(duì)應(yīng)代碼:
QueueConnectionFactory qcf =(QueueConnectionFactory) ctx.lookup("ConnectionFactory");
(3) 從連接工廠得到一個(gè)連接(Connect 有兩種[TopicConnection/ QueueConnection]);
例子對(duì)應(yīng)代碼:conn = qcf.createQueueConnection();
(4) 通過(guò)連接來(lái)建立一個(gè)會(huì)話(Session);
例子對(duì)應(yīng)代碼:session= conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
這句代碼意思是:建立不需要事務(wù)的并且能自動(dòng)接收J(rèn)ava消息服務(wù)收條的會(huì)話,在非事務(wù)Session 中,JMS消息傳遞的方式有三種:
Session.AUTO_ACKNOWLEDGE :當(dāng)客戶機(jī)調(diào)用的receive方法成功返回,或當(dāng)MessageListenser 成功處理了消息,session將會(huì)自動(dòng)接收消息的收條。
Session.CLIENT_ACKNOWLEDGE :客戶機(jī)通過(guò)調(diào)用消息的acknowledge方法來(lái)接收消息。接收發(fā)生在session層。接收到一個(gè)被消費(fèi)的消息時(shí),將自動(dòng)接收該session已經(jīng) 消費(fèi)的所有消息。例如:如果消息的消費(fèi)者消費(fèi)了10條消息,然后接收15 個(gè)被傳遞的消息,則前面的10 個(gè)消息的收據(jù)都會(huì)在這15 個(gè)消息中被接收。
Session.DUPS_ACKNOWLEDGE :指示session緩慢接收消息。
(5) 查找目的地(Topic/ Queue);
例子對(duì)應(yīng)代碼:queue =(Queue) ctx.lookup("queue/jms");
(6) 根據(jù)會(huì)話以及目的地來(lái)建立消息制造者(TopicPublisher/QueueSender)和消費(fèi)者(TopicSubscriber/QueueReceiver).
例子對(duì)應(yīng)代碼:
msg = session.createTextMessage("你好,好久不見(jiàn)!");
sender = session.createSender(queue);
sender.send(msg);
備注:如果運(yùn)行時(shí)出現(xiàn)javax.naming.NameNotFoundException: jms not bound , 是因?yàn)镴Boss不會(huì)自已建立一個(gè)Queue對(duì)象,因此,需要手工來(lái)配置Queue對(duì)象。可以<JBoss5.x安裝目錄>\server\default\deploy目錄中建立一個(gè)xxx-service.xml文件,其中xxx可以任意取值,但必須跟“-service”后綴,例如,abc-service.xml。該文件可以放在deploy或其子目錄(可以是多層子目錄)中。該文件的內(nèi)容如下:
- <?xml version="1.0" encoding="UTF-8"?>
- <server>
- <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=jms">
- <depends optional-attribute-name="DestinationManager">
- jboss.mq:service=DestinationManager</depends>
- </mbean>
- </server>
<mbean>元素的name屬性值中的name必須是jms,要與queue/jms中的/后面的部分一致.
【編輯推薦】