ActiveMQ是Apache下的一个支持JMS的开源框架。
一:特性及优势
1、实现JMS1.1规范,支持J2EE1.4以上,支持两种消息传递模型:点对点(PTP)、发布-订阅(PUB/SUB)模型,支持持久消息。2、可运行于任何jvm和大部分web容器(ActiveMQworks great in any JVM)
3、支持多种语言客户端(java, C, C++,AJAX, ACTIONSCRIPT等等)4、支持多种协议(stomp,openwire,REST)
5、良好的spring支持(ActiveMQ hasgreat Spring Support)6、速度很快,JBossMQ的十倍(ActiveMQ isvery fast; often 10x faster than JBossMQ.)7、与OpenJMS、JbossMQ等开源jmsprovider相比,ActiveMQ有Apache的支持,持续发展的优势明显。二:下载安装
1:到下载最新的ActiveMQ。同时下载apache-tomcat-7.0.12-windows-x86.zip。2:直接解压至任意目录,如C:\JMS\apache-activemq-5.5.0。
3:启动ActiveMQ服务器,直接运行apache-activemq-5.5.0\bin\activemq.bat)4:ActiveMQ消息管理后台系统:,打开主页后,可以看到ActiveMQ中自带的一些实例。)
三:详细实例
1:准备jar包将ActiveMQlib目录下的5个jar包复制到Tomcatlib目录下:
activemq-core-5.5.0.jar
activemq-web-5.5.0.jargeronimo-j2ee-management_1.1_spec-1.0.1.jargeronimo-jms_1.1_spec-1.1.1.jargeronimo-jta_1.0.1B_spec-1.0.1.jar或者只加activemq-all-5.5.0.jar。还要加入slf4j-nop-1.5.8.jar。
2:修改配置文件。
1修改Tomcat下的conf/context.xml文件:在<context></context>节点中添加以下内容:配置说明:以JNDI的方式定义了ActiveMQ的broker连接url、Topic和Queue。 此处需加以注意的是Listener端的borkerURL使用了failover传输方式: failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5
客户端使用普通传输方式:tcp://localhost:616162
failovertransport是一种重新连接机制,用于建立可靠的传输。此处配置的是一旦ActiveMQbroker中断,Listener端将每隔100ms自动尝试连接,直至成功连接或重试5次连接失败为止。
failover还支持多个borker同时提供服务,实现负载均衡的同时可增加系统容错性,格式:failover:(uri1,...,uriN)?transportOptions
在eclipse中新建一个名为ActiveMQ_Tomcat的Dynamic WebProject工程,新建一个activemq库,里面的jar包有activemq-core-5.5.0.jar和geronimo-jms_1.1_spec-1.1.1.jar。或者直接加入activemq-all-5.5.0.jar。还要加入slf4j-api-1.5.8.jar,slf4j-nop-1.5.8.jar和javax.servlet-5.1.12.jar。在src下,新建一个名为com.jms.servlet的包,然后再修改web.xml,在里面加上
修改activemq.xml文件。jms-listener com.jms.servlet.JMSListener 1
为了支持持久化消息,需要修改ActiveMQ的配置文件如下,使用默认的AMQ Message Store方式(索引文件方式)存储消息,据官网介绍是快速、稳定的。数据库存储方式可参照官网相关文档。
(备注:2.3中红色部分直接加到后面就行,不用修改)
broker的属性有:<!--persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用--><!--dataDirectory默认的存储持久化数据的目录-->
<!--brokerName设置broker的name,注意在网络上必须是唯一的-->
在com.jms.servlet包中新建一个监听器类,类名是JMSListener.java。
JMSListener.java代码:package com.jms.servlet;import javax.servlet.*;import javax.servlet.http.*;import javax.naming.*;import javax.jms.*;public class JMSListener extends HttpServlet implementsMessageListener {privatestatic final long serialVersionUID = 5088494289145588596L;public voidinit(ServletConfig config) throws ServletException {InitialContext initialContext = new InitialContext();Contextcontext = (Context) initialContext.lookup("java:comp/env");ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("jms/FailoverConnectionFactory");Connectionconnection = connectionFactory.createConnection();connection.setClientID("MyClient");connection.start();Sessionsession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//普通消息订阅者,无法接收持久消息Destination destination = (Destination) context.lookup("jms/topic/MyTopic");MessageConsumer consumer =session.createConsumer(destination); //基于Topic创建持久的消息订阅者,前提:Connection必须指定一个唯一的clientId,当前为MyClientTopic topic= (Topic) context.lookup("jms/topic/MyTopic");TopicSubscriber consumer =session.createDurableSubscriber(topic, "MySub");} consumer.setMessageListener(this);} catch(NamingException e) {e.printStackTrace();{} catch(JMSException e) {e.printStackTrace();}public voidonMessage(Message message) {if(checkText(message, "RefreshArticleId") != null) {StringarticleId = checkText(message, "RefreshArticleId");System.out.println("refresh article, ID=" + articleId);} else if(checkText(message, "RefreshTopicId") != null) {StringtopicId = checkText(message, "RefreshTopicId");System.out.println("refresh topic, ID=" + topicId);} else{System.out.println("it's normal message, no need to care");}}private static String checkText(Message m, String s) {try {returnm.getStringProperty(s);} catch(JMSException e) {e.printStackTrace(System.out);return null;}}}建立发布端,类名为MyPublish.java,代码如下:
package com.jms.servlet;import java.io.IOException;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import javax.naming.Context;importjavax.naming.InitialContext;importjavax.naming.NamingException;importjavax.servlet.ServletException;importjavax.servlet.http.HttpServlet;importjavax.servlet.http.HttpServletRequest;importjavax.servlet.http.HttpServletResponse;public class MyPublish extends HttpServlet {private static final long serialVersionUID = 8861449351626383534L;private InitialContext initialContext;private Contextcontext;privateConnectionFactoryconnectionFactory;private Connection connection;private Session session;private Destination destination;private MessageProducer messageProducer;public void init() throwsServletException{try {initialContext = new InitialContext();context = (Context)initialContext.lookup("java:comp/env");connectionFactory = (ConnectionFactory) context.lookup("jms/NormalConnectionFactory");connection = connectionFactory.createConnection();session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);destination = (Destination) context.lookup("jms/topic/MyTopic");messageProducer = session.createProducer(destination);} catch(NamingExceptione) {}}