ActiveMQ:JMS開源框架入門介紹
介紹基本的JMS概念與開源的JMS框架ActiveMQ應(yīng)用,內(nèi)容涵蓋一下幾點(diǎn):
- 基本的JMS概念
- JMS的消息模式
- 介紹ActiveMQ
- 一個(gè)基于ActiveMQ的JMS例子程序
一:JMS基本概念
1. JMS的目標(biāo)
為企業(yè)級(jí)的應(yīng)用提供一種智能的消息系統(tǒng),JMS定義了一整套的企業(yè)級(jí)的消息概念與工具,盡可能最小化的Java語言概念去構(gòu)建最大化企業(yè)消息應(yīng)用。統(tǒng)一已經(jīng)存在的企業(yè)級(jí)消息系統(tǒng)功能。
2. 提供者
JMS提供者是指那些完全完成JMS功能與管理功能的JMS消息廠商,理論上JMS提供者完成。
JMS消息產(chǎn)品必須是100%的純Java語言實(shí)現(xiàn),可以運(yùn)行在跨平臺(tái)的架構(gòu)與操作系統(tǒng)上,當(dāng)前一些JMS廠商包括IBM,Oracle, JBoss社區(qū) (JBoss Community), Apache 社區(qū)(ApacheCommunity)。
3. JMS應(yīng)用程序, 一個(gè)完整的JMS應(yīng)用應(yīng)該實(shí)現(xiàn)以下功能:
- JMS 客戶端 – Java語言開發(fā)的接受與發(fā)送消息的程序
- 非JMS客戶端 – 基于消息系統(tǒng)的本地API實(shí)現(xiàn)而不是JMS
- 消息 – 應(yīng)用程序用來相互交流信息的載體
- 被管理對(duì)象–預(yù)先配置的JMS對(duì)象,JMS管理員創(chuàng)建,被客戶端運(yùn)用。如鏈接工廠,主題等
- JMS提供者–完成JMS功能與管理功能的消息系統(tǒng)
二:JMS的消息模式
1.點(diǎn)對(duì)點(diǎn)的消息模式(Point to Point Messaging)
下面的JMS對(duì)象在點(diǎn)對(duì)點(diǎn)消息模式中是必須的:
a.隊(duì)列(Queue) – 一個(gè)提供者命名的隊(duì)列對(duì)象,客戶端將會(huì)使用這個(gè)命名的隊(duì)列對(duì)象
b.隊(duì)列鏈接工廠(QueueConnectionFactory) – 客戶端使用隊(duì)列鏈接工廠創(chuàng)建鏈接隊(duì)列
ConnectionQueue來取得與JMS點(diǎn)對(duì)點(diǎn)消息提供者的鏈接。
c. 鏈接隊(duì)列(ConnectionQueue) – 一個(gè)活動(dòng)的鏈接隊(duì)列存在在客戶端與點(diǎn)對(duì)點(diǎn)消息提供者之間,客戶用它創(chuàng)建一個(gè)或者多個(gè)JMS隊(duì)列會(huì)話(QueueSession)
d. 隊(duì)列會(huì)話(QueueSession) – 用來創(chuàng)建隊(duì)列消息的發(fā)送者與接受者(QueueSenderand QueueReceiver)
e.消息發(fā)送者(QueueSender 或者M(jìn)essageProducer)– 發(fā)送消息到已經(jīng)聲明的隊(duì)列
f.消息接受者(QueueReceiver或者M(jìn)essageConsumer) – 接受已經(jīng)被發(fā)送到指定隊(duì)列的消息
2.發(fā)布訂閱模式(publish – subscribe Mode)
a.主題Topic(Destination) – 一個(gè)提供者命名的主題對(duì)象,客戶端將會(huì)使用這個(gè)命名的主題對(duì)象
b.主題鏈接工廠(TopciConnectionFactory) – 客戶端使用主題鏈接工廠創(chuàng)建鏈接主題
ConnectionTopic來取得與JMS消息Pub/Sub提供者的鏈接。
c.鏈接主題(ConnectionTopic) – 一個(gè)活動(dòng)的鏈接主題存在發(fā)布者與訂閱者之間
d.會(huì)話(TopicSession) – 用來創(chuàng)建主題消息的發(fā)布者與訂閱者 (TopicPublisher and TopicSubscribers)
e.消息發(fā)送者M(jìn)essageProducer) – 發(fā)送消息到已經(jīng)聲明的主題
f.消息接受者(MessageConsumer) – 接受已經(jīng)被發(fā)送到指定主題的消息
三:介紹ActiveMQ
ActiveMQ是apache社區(qū)完成的JMS開源消息組件,客戶端支持多種語言調(diào)用,包括Java,C++, C#,
Perl, Python等。支持Spring配置集成等。更多信息訪問這里:
http://activemq.apache.org/index.html
四:基于ActiveMQ的Publish/subscribe模式Demo程序
消息Broker,JMSprovider
- import java.net.URI;
- import java.net.URISyntaxException;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
- import org.apache.activemq.broker.BrokerFactory;
- import org.apache.activemq.broker.BrokerService;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- /**
- * refer to http://activemq.apache.org/jndi-support.html
- * http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
- * @author gloomyfish
- *
- */
- public class PureJMSProducer {
- private static final Log LOG = LogFactory.getLog(PureJMSProducer.class);
- private PureJMSProducer() {
- }
- /**
- * @param args the destination name to send to and optionally, the number of
- * messages to send
- */
- public static void main(String[] args) {
- Context jndiContext = null;
- ConnectionFactory connectionFactory = null;
- Connection connection = null;
- Session session = null;
- Destination destination = null;
- MessageProducer producer = null;
- BrokerService broker = null;
- final int numMsgs = 10;
- /*
- * Create a JNDI API InitialContext object
- */
- try {
- jndiContext = new InitialContext();
- } catch (NamingException e) {
- LOG.info("Could not create JNDI API context: " + e.toString());
- System.exit(1);
- }
- // create external TCP broker
- try {
- broker = BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616"));
- broker.start();
- } catch (URISyntaxException e) {
- LOG.info("Could not create broker: " + e.toString());
- } catch (Exception e) {
- LOG.info("Could not create broker: " + e.toString());
- }
- // try {
- //
- // }
- /*
- * Look up connection factory and destination.
- */
- try {
- connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");
- destination = (Destination)jndiContext.lookup("MyTopic");
- } catch (NamingException e) {
- LOG.info("JNDI API lookup failed: " + e);
- System.exit(1);
- }
- /*
- * Create connection. Create session from connection; false means
- * session is not transacted. Create sender and text message. Send
- * messages, varying text slightly. Send end-of-messages message.
- * Finally, close connection.
- */
- try {
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage();
- Thread.sleep(3000);
- for (int i = 0; i < numMsgs; i++) {
- message.setText("This is message " + (i + 1));
- LOG.info("Sending message: " + message.getText());
- producer.send(message);
- Thread.sleep(3000);
- }
- /*
- * Send a non-text control message indicating end of messages.
- */
- producer.send(session.createMessage());
- } catch (JMSException e) {
- LOG.info("Exception occurred: " + e);
- } catch (InterruptedException e) {
- LOG.info("Exception occurred: " + e);
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (JMSException e) {
- }
- }
- }
- // stop the TCP broker
- try {
- broker.stop();
- } catch (Exception e) {
- LOG.info("stop the broker failed: " + e);
- }
- }
- }
客戶端:
- import java.io.IOException;
- import javax.jms.Connection;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
- import javax.naming.InitialContext;
- import org.apache.activemq.ActiveMQConnectionFactory;
- public class ActiveMQClient {
- public static void main(String[] args) throws IOException {
- // -- http://dlc.sun.com/pdf//816-5904-10/816-5904-10.pdf
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://locahost");
- Connection connection = factory.createConnection();
- connection.start();
- // create message topic
- //Topic topic= new ActiveMQTopic("MyTopic");
- InitialContext jndiContext=new InitialContext();
- Topic topic=(Topic)jndiContext.lookup("MyTopic");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // register message consumer
- MessageConsumer comsumer1 = session.createConsumer(topic);
- comsumer1.setMessageListener(new MessageListener(){
- public void onMessage(Message m) {
- try {
- System.out.println("Consumer get " + ((TextMessage)m).getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- Thread.sleep(30000);
- session.close();
- connection.stop();
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
項(xiàng)目配置,Jar依賴:
依賴的三個(gè)Jar分別為:
- activemq-all.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- xbean-spring.jar