`
xpenxpen
  • 浏览: 704283 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

集成Websphere Application Server 和Active MQ

    博客分类:
  • MQ
JMS 
阅读更多
本篇我们将不采用Tomcat,改用Websphere Application Server6.1, 但MQ仍采用ActiveMQ。
SpringJMS也不用了,收消息改用EJB2.0的MDB,发消息则自己连JNDI。

0. 准备工作:建Queue
0.1.在ActiveMQ里建2个队列ConsumerQueue和ReplyQueue



Q: 如何连接Websphere 和ActiveMQ?
A: Active MQ 是一个 JMS provider, 一般我们通过 Java EE Connector architecture(JCA) 来集成 JMS provider, 我们通过 Active MQ 提供的 resource adapter 来访问 Active MQ 服务器


1.安装 Adapter

1.1. 登录 WAS console
1.2. 访问 Resources->Resource Adapters->Resource adapters



1.3. 点击 Install RAR, 选择某个 node, 在 ACTIVEMQ_HOME\lib\optional 下可以找到 activemq-rar-5.4.1.rar,这是我们需要安装的 adapter:



1.4. 点击 Next,配置参数



1.5. 查看 Custom Properties, 其中 ServerUrl 应配置成指向已经安装的 Active MQ server




2.配置 J2C connection factories
2.1. 访问 Resources->Resource Adapters->J2C connection factories
2.2. 点击New
2.3. 连接工厂的名字输入MessageQueueCF, 类型选择javax.jms.QueueConnectionFactory



2.4.点击OK


3.配置 J2C activation specifications
3.1. 访问 Resources->Resource Adapters->J2C activation specifications
3.2. 点击New
3.3. 名字输入ConsumerQueueActivationSpec



3.4.点击OK
3.5.再次进入刚配置的activationSpec,点击右侧J2C activation specification custom properties



3.6.destination 改为ConsumerQueue ,destinationType 改为javax.jms.Queue



3.7.点击上方的save




4.配置 J2C administered objects
4.1. 访问 Resources->Resource Adapters->J2C administered objects
4.2. 点击New
4.3. 名字输入ConsumerQueue ,Administered object class选择ActiveMQQueue



4.4.点击OK
4.5.再次进入刚配置的Administered object,点击右侧J2C administered objects custom properties
4.6.PhysicalName 改为ConsumerQueue



4.7.点击上方的save

同样的步骤再配一个ReplyQueue


5.建工程
5.1.打开我们的RAD,新建一个EJB Project,工程名字JMSTest,
5.2.再新建一个Enterprise Application Project,工程名字JMSTestEAR,JavaEE Module Dependencies选中刚刚建的工程JMSTest
5.3.在JMSTest工程里新建一个java类,QueueConsumerMDBBean,这个类采用EJB2.0的MDB方式监听ConsumerQueue消息,
收到消息后,我们打印了消息到控制台,然后自己用JNDI查找另外一个ReplyQueue,把此消息转发给那个Queue
package ejbs;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QueueConsumerMDBBean implements MessageDrivenBean, MessageListener {
	
	private static final String ENV_JNDI_REPLY_QUEUE_CON_FACTORY = "java:comp/env/MessageQueueCF";
	private static final String ENV_JNDI_REPLY_QUEUE = "java:comp/env/ReplyQueue";
	
	private MessageDrivenContext messageDrivenCtx;
	
	public MessageDrivenContext getMessageDrivenContext() {
		return messageDrivenCtx;
	}
	public void setMessageDrivenContext(MessageDrivenContext ctx) {
		messageDrivenCtx = ctx;
	}
	public void ejbCreate() {
	}
	public void ejbRemove() {
	}
	
	/**
	 * onMessage
	 */
	public void onMessage(Message msg) {
		if (!(msg instanceof TextMessage)) {
			throw new IllegalArgumentException("Message type is not text message!");
		}
		
		try {
		
			TextMessage textMessage = (TextMessage) msg;
			System.out.println("Receive message:" + textMessage.getText());
		
			InitialContext initContext = new InitialContext();
			QueueConnectionFactory replyQueueConnectionFactory = (QueueConnectionFactory) initContext.lookup(ENV_JNDI_REPLY_QUEUE_CON_FACTORY);
			Queue replyQueue = (Queue) initContext.lookup(ENV_JNDI_REPLY_QUEUE);
			
			QueueConnection queueConnection = replyQueueConnectionFactory.createQueueConnection();
			queueConnection.start();
			QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			QueueSender queueSender = queueSession.createSender(replyQueue);
			TextMessage outMessage = queueSession.createTextMessage(textMessage.getText());
			queueSender.send(outMessage);
			
		} catch (NamingException e) {
			throw new RuntimeException(e);
		} catch (JMSException e) {
			throw new RuntimeException(e);
		}
	}
	
}



6.配置ejb-jar.xml
6.1.双击打开JMSTest工程里的ejb-jar.xml
6.2.选择bean选项卡
6.3.点击Add…
6.4.选择Message-driven Bean, Bean name输入QueueConsumerMDB,



6.5.JMS type选择MessageListener



6.6.Transaction type选择Bean



6.7.完成以后,在右侧Destination type选择Queue,Websphere bindings选择JCA adapter,ActivationSpec JNDI Name输入ConsumerQueueActivationSpec,Destination JNDI name输入ConsumerQueue



6.8.选择References选项卡
6.9.选中QueueConsumerMDB,点击Add…
6.10.选择resource reference



6.11.名字输入MessageQueueCF,Type选择javax.jms.QueueConnectionFactory,Authentication选择Application,Sharing scope选择shareable



6.12.完成以后,在右侧JNDI Name输入MessageQueueCF



6.13.选中QueueConsumerMDB,点击Add…
6.14.选择resource reference
6.15.名字输入ReplyQueue,Type选择javax.jms.Queue,Authentication选择Application,Sharing scope选择shareable



6.16.完成以后,在右侧JNDI Name输入ReplyQueue



6.17.最后不要忘了保存ejb-jar.xml

7.测试

7.1.打开ActiveMQ图形化管理界面,点击ConsumerQueue右边的"Send To",输入消息,点击Send



7.2.可在控制台看到打印的消息,证明MDB已经接收到消息



7.3.并且可以看到ReplyQueue里接受到一条消息,一层层点进去,确认一下果然是那句消息,证明发送消息也成功了。







最后附上RAD工程文件
  • 大小: 60.5 KB
  • 大小: 34.2 KB
  • 大小: 27.8 KB
  • 大小: 47.1 KB
  • 大小: 9.3 KB
  • 大小: 21.1 KB
  • 大小: 13.1 KB
  • 大小: 5.9 KB
  • 大小: 39.3 KB
  • 大小: 18.1 KB
  • 大小: 25.6 KB
  • 大小: 5.9 KB
  • 大小: 45.1 KB
  • 大小: 32.1 KB
  • 大小: 63.8 KB
  • 大小: 43.2 KB
  • 大小: 39.3 KB
  • 大小: 39.9 KB
  • 大小: 29 KB
  • 大小: 38.5 KB
  • 大小: 24.7 KB
  • 大小: 57.2 KB
  • 大小: 25.1 KB
  • 大小: 9.3 KB
  • 大小: 13.1 KB
  • 大小: 2.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics