什么是ActiveMQ
ActiveMQ是Apache出品,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
JMS介绍
JMS的全称是Java Message Service,即Java消息服务。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个字节的数据流
JMS应用程序接口
ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。
Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。
ActiveMQ 安装
ActiveMQ 依赖JDK版本
MQ版本号 Build-Jdk 依赖JDK
apache-activemq-5.0.0 1.5.0_12 1.5+
...
apache-activemq-5.4.0 1.5.0_19 1.5+
apache-activemq-5.5.0 1.6.0_23 1.6+
...
apache-activemq-5.9.0 1.6.0_51 1.6+
apache-activemq-5.10.0 1.7.0_12-ea 1.7+
...
apache-activemq-5.14.0 1.7.0_80 1.7+
apache-activemq-5.15.0 1.8.0_112 1.8+
下载解压,启动activemq.bat
ActiveMQ 默认用户名和密码
用户名:admin
密码:admin
可以在/conf/users.properties中寻找。
默认登录地址:
Producer
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
注:8161是后台管理系统,61616是给java用的tcp端口
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。第二步:使用ConnectionFactory对象创建一个Connection对象。第三步:开启连接,调用Connection对象的start方法。第四步:使用Connection对象创建一个Session对象。第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。第六步:使用Session对象创建一个Producer对象。第七步:创建一个Message对象,创建一个TextMessage对象。第八步:使用Producer对象发送消息。第九步:关闭资源。 * */public class ActiveMQProduder { public static void main(String[] args) { //第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用Connection对象的start方法。 connection.start(); //第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:队列的名称。 // Destination :消息的目的地消息发送给谁. Destination destination = session.createQueue("test-queue"); //Destination 是Queue父类 //Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(destination); //创建方式等价的 //session.createProducer(queue); // 第七步:创建一个Message对象,创建一个TextMessage对象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); System.out.println("producer 创建的消息:"+textMessage.getText()); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
下面看下生产一条消息控制台的变化
Queues
Name ↑ | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued | Views | Operations |
---|---|---|---|---|---|---|
test-queue | 1 | 0 | 1 | 0 | Browse Active Consumers |
Number Of Pending Messages 生产了没有消费的message 为1
Message Enqueued 进入消息队列的message 为1
Consumer
消费者有两种消费方法:
1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。import java.io.IOException;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * * 消费者有两种消费方法::1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。 * */public class ActiveMQConsumer{ public static void main(String[] args) { //第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用Connection对象的start方法。 connection.start(); //第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:队列的名称。 // Destination :消息的目的地消息发送给谁. Destination destination = session.createQueue("test-queue"); //第五步:创建一个消费者 MessageConsumer consumer = session.createConsumer(destination); /** while(true) { //第六步:同步接收消息 设置接收消息的时间100s Message message = consumer.receive(100000); if(message != null){ System.out.println("consumer接收的消息:"+message); }else{ break; } }*/ //第七步: 设置消息监听,异步接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage=(TextMessage) message; try { System.out.println("异步接收consumer消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //方便看到消息 System.in.read(); // 第八步:关闭资源。 consumer.close(); session.close(); connection.close(); }catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }catch(IOException e1){ e1.printStackTrace(); } } }
Queues
Name ↑ | Number Of Pending Messages | Number Of Consumers | Messages Enqueued | Messages Dequeued | Views | Operations |
---|---|---|---|---|---|---|
test-queue | 0 | 1 | 1 | 1 | Browse Active Consumers |
消费一条消息的变化
Number Of Pending Messages 生产了没有消费的message 为0
Number Of Pending Messages 消费的消息的message 为1
Message Enqueued 进入消息队列的message 为1
Message Dequeued 进入消息队列的message 为1
**Topic **
队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:
1、点对点(point-to-point,简称PTP)Queue消息传递模型: 通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。 2、发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。
具体区别对比如下:
类型 | Topic | Queue |
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point 点对点 |
有无状态 | topic数据默认不落地,是无状态的。 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
** TopicConsumer**
import java.io.IOException;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicConsumer { public static void main(String[] args) { //第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用Connection对象的start方法。 connection.start(); //第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:话题的名称。 Topic topic = session.createTopic("test-topic"); //第五步:创建一个消费者 MessageConsumer consumer = session.createConsumer(topic); System.out.println("consumer消息者3"); //第六步: 设置消息监听,异步接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage=(TextMessage) message; try { System.out.println("异步接收consumer消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //方便看到消息 System.in.read(); // 第七步:关闭资源。 consumer.close(); session.close(); connection.close(); }catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }catch(IOException e1){ e1.printStackTrace(); } }}
Topics
Name ↑ | Number Of Consumers | Messages Enqueued | Messages Dequeued | Operations |
---|---|---|---|---|
test-topic | 3 | 0 | 0 | Send To Active Subscribers |
TopicProduder
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicProduder { public static void main(String[] args) { //第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //第二步:使用ConnectionFactory对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); //第三步:开启连接,调用Connection对象的start方法。 connection.start(); //第四步:使用Connection对象创建一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。 //参数:话题的名称。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 第七步:创建一个Message对象,创建一个TextMessage对象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test."); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); System.out.println("producer 创建的消息:"+textMessage.getText()); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
注:由于发布订阅关系,消息不会落地,要先启动多个消费者监听订阅的消息,生产者(消息发布者)发送消息后监听订阅消息的消费者都会接到消息
看下控制台的情况
Topics
Name ↑ | Number Of Consumers | Messages Enqueued | Messages Dequeued | Operations |
---|---|---|---|---|
test-topic | 3 | 1 | 3 | Send To Active Subscribers |
Active Non-Durable Topic Subscribers
Destination | Selector | Pending Queue Size | Dispatched Queue Size | Dispatched Counter | Enqueue Counter | Dequeue Counter | Operations | |||
---|---|---|---|---|---|---|---|---|---|---|
test-topic | 0 | 0 | 1 | 1 | 1 | |||||
test-topic | 0 | 0 | 1 | 1 | 1 | |||||
test-topic | 0 | 0 | 1 | 1 | 1 |
这样的特性就可以应用分布式中应用解构,异步消息队列,流量削锋比较重要的中间件
特别介绍几种代表的应用场景
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口
假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
订单系统与库存系统耦合
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
- 可以控制活动的人数;
- 可以缓解短时间内高流量压垮应用;
- 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
- 秒杀业务根据消息队列中的请求信息,再做后续处理
再来简单看看Linux上部署ActiveMQ
安装activemq
1、gz文件拷贝到/usr/local/src目录
2、解压启动
tar -zxvf apache-activemq-5.14.0-bin.tar.gz
cd apache-activemq-5.14.0
cd bin
./activemq start
netstat -anp|grep 61616
3.开通防火墙8161(web管理页面端口)、61616(activemq服务监控端口) 两个端口
/sbin/iptables -I INPUT -p tcp --dport 8161 -j ACCEPT&&/etc/init.d/iptables save&&service iptables restart&&/etc/init.d/iptables status
/sbin/iptables -I INPUT -p tcp --dport 61616 -j ACCEPT&&/etc/init.d/iptables save&&service iptables restart&&/etc/init.d/iptables status
常用命令
- activemq-admin stop
- activemq-admin list
- activemq-admin query
- activemq-admin bstat
- activemq-admin browse
activemq官方命令行手册:http://activemq.apache.org/activemq-command-line-tools-reference.html