Eclipse+JBoss+EJB3消息驅(qū)動Bean
在前面的文章中給出的SessionBean的例子都是同步調(diào)用SessionBean方法的,也就是說,只有當方法中的代碼都執(zhí)行完,才能返回到客戶端。但在某些情況下,由于SessionBean方法的執(zhí)行時間比較長,這就需要異步地調(diào)用該方法,否則客戶端就需要等待比較長的時間。要實現(xiàn)異步調(diào)用,就需要使用本要講的消息驅(qū)動Bean。消息驅(qū)動Bean的基本原理是客戶端向消息服務(wù)器發(fā)送一條消息后,消息服務(wù)器會將該消息保存在消息隊列中。在這時消息服務(wù)器中的某個消費者(讀取并處理消息的對象)會讀取該消息,并進行處理。發(fā)送消息的客戶端被稱為消息生產(chǎn)者。
本文給出的消息驅(qū)動Bean的例子的基本功能是客戶端向消息服務(wù)器發(fā)送一條消息(該消息實際上是一個實體Bean的對象實例),然后消息消費者讀取這條消息后,將消息中的實體Bean持久化。實現(xiàn)消息驅(qū)動Bean的步驟如下:
一、實現(xiàn)實體Bean
- package entity;
- import java.io.Serializable;
- import java.util.Date;
- import javax.persistence.Column;
- import javax.persistence.Entity;
- import javax.persistence.GeneratedValue;
- import javax.persistence.GenerationType;
- import javax.persistence.Id;
- import javax.persistence.Table;
- @Entity
- @Table(name="t_date")
- public class DateBean implements Serializable
- {
- private int id;
- private Date myDate;
- @Id
- @GeneratedValue(strategy=GenerationType.IDENTITY)
- public int getId()
- {
- return id;
- }
- public void setId(int id)
- {
- this.id = id;
- }
- @Column(name="mydate")
- public Date getMyDate()
- {
- return myDate;
- }
- public void setMyDate(Date myDate)
- {
- this.myDate = myDate;
- }
- }
二、編寫消息驅(qū)動Bean
消息驅(qū)動Bean必須實現(xiàn)MessageListener接口,當該消息驅(qū)動Bean接收到一個消息后,EJB容器就會調(diào)用MessageListener接口的onMessage方法來理該消息。消息驅(qū)動Bean的代碼如下:
- package service;
- import javax.ejb.ActivationConfigProperty;
- import javax.ejb.EJBException;
- import javax.ejb.MessageDriven;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.ObjectMessage;
- import javax.persistence.EntityManager;
- import javax.persistence.PersistenceContext;
- import entity.DateBean;
- @MessageDriven( activationConfig = {
- @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
- @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/MDBQueue")
- })
- public class DateMessageBean implements MessageListener
- {
- @PersistenceContext(unitName = "myentity1")
- private EntityManager em;
- @Override
- public void onMessage(Message message)
- {
- try
- {
- if(message instanceof ObjectMessage)
- {
- ObjectMessage objmsg = (ObjectMessage) message;
- DateBean dateBean = (DateBean) objmsg.getObject();
- em.persist(dateBean);
- System.out.println("成功持久化DateBean對象!");
- }
- else
- {
- System.out.println("消息類型錯誤!");
- }
- }
- catch (Exception e)
- {
- throw new EJBException(e);
- }
- }
- }
消息驅(qū)動Bean需要使用@MessageDriven進行注釋。要注意的是destination屬性的值是queue/MDBQueue。JBoss不會自已建立一個Queue對象,因此,需要手工來配置Queue對象。讀者可以\server\default\deploy目錄中建立一個xxx-service.xml文件,其中xxx可以任意取值,但必須跟“-service”后綴,例如,abc-service.xml。該文件可以放在deploy或其子目錄(可以是多層子目錄)中。該文件的內(nèi)容如下:
- < xml version="1.0" encoding="UTF-8"?>
- < server>
- < mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=MDBQueue">
- < depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManagerdepends>
- < mbean>
- < server>
要注意的是,元素的name屬性值中的name必須是MDBQueue,要與queue/MDBQueue中的/后面的部分一致。如果不進行上面的配置,在啟動JBOSS時就會拋出如下的異常:
javax.naming.NameNotFoundException: MDBQueue not bound
也可以將元素放在deploy目錄中的其他以-service.xml結(jié)尾的文件中。
如果不設(shè)置destination屬性的值,在啟動JBoss是會拋出如下的異常:
- org.jboss.deployers.spi.DeploymentException: Required config property RequiredConfigPropertyMetaData@174098f[name=destination descriptions=[DescriptionMetaData@4ca30b[language=zh]]] for messagingType 'javax.jms.MessageListener' not found in activation config [ActivationConfigProperty(destinationType=javax.jms.Queue), ActivationConfigProperty(connectionFactoryJndiName=MyQueueConnectionFactory), ActivationConfigProperty(destinationName=MyRequestQueue)] ra=jboss.jca:service=RARDeployment,name='jms-ra.rar'
- ... ...
三、編寫調(diào)用消息驅(qū)動Bean的SessionBean
- package service;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import javax.annotation.Resource;
- import javax.ejb.Stateless;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.MessageProducer;
- import javax.jms.ObjectMessage;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.persistence.EntityManager;
- import entity.DateBean;
- import entity.Greeting;
- @Stateless
- public class GreeterBean implements Greeter
- {
- @Resource(mappedName = "ConnectionFactory")
- private ConnectionFactory cf;
- @Resource(mappedName = "queue/MDBQueue")
- private Queue queue;
- @Override
- public String greet(String message)
- {
- try
- {
- DateBean db = new DateBean();
- db.setMyDate(new Date());
- Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer messageProducer = session.createProducer(queue);
- ObjectMessage objectMessage = session.createObjectMessage();
- objectMessage.setObject(db);
- messageProducer.send(objectMessage);
- connection.close();
- System.out.println("成功發(fā)送消息!");
- }
- catch (Exception e)
- {
- System.out.println("發(fā)送消息失??!");
- }
- return "方法成功返回";
- }
- }
在上面的代碼中使用ObjectMessage對象來包裝要向消息服務(wù)器發(fā)送的實體Bean的對象實例。
除了可以在SessionBean中訪問消息驅(qū)動Bean外,還可以在不同的機器上通過jndi來查找并調(diào)用消息驅(qū)動Bean,代碼如下:
- package test;
- import java.util.Date;
- import javax.ejb.EJB;
- import javax.jms.Destination;
- import javax.jms.MessageProducer;
- import javax.jms.ObjectMessage;
- import javax.jms.Queue;
- import javax.jms.QueueConnection;
- import javax.jms.QueueConnectionFactory;
- import javax.jms.QueueSession;
- import javax.jms.TextMessage;
- import javax.naming.InitialContext;
- import entity.DateBean;
- import service.Greeter;
- public class Client
- {
- public static void main(String[] args) throws Exception
- {
- InitialContext ctx = new InitialContext();
- QueueConnection connection = null;
- QueueSession session = null;
- QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
- connection = factory.createQueueConnection();
- session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
- Destination destination = (Queue) ctx.lookup("queue/MDBQueue");
- MessageProducer messageProducer = session.createProducer(destination);
- ObjectMessage objectMessage = session.createObjectMessage();
- DateBean db = new DateBean();
- db.setMyDate(new Date());
- objectMessage.setObject(db);
- messageProducer.send(objectMessage);
- connection.close();
- System.out.println("成功發(fā)送消息!");
- }
- }
【編輯推薦】