初识activeMQ

项目需要用到消息队列,就想先了解一下activeMQ这个东西

关于activeMQ

ActiveMQ是Apache出品的,非常流行的消息中间件。

安装

上官网下载对应系统的版本。http://activemq.apache.org/.

启动

windows 直接运行bin目录下activemq.bat
linux 在bin目录下执行./activemq start

目录介绍

  • conf里面是配置文件,重点关注的是activemq.xml(链接端口灯)、jetty.xml(登录地址,端口信息)、jetty-realm.properties(Web控制台需要用户名、密码信息)。。

  • data目录下是ActiveMQ进行消息持久化存放的地方,默认采用的是kahadb,当然我们可以采用leveldb,或者采用JDBC存储到MySQL,或者干脆不使用持久化机制。

  • webapps,注意ActiveMQ自带Jetty提供Web管控台

使用activeMQ

配置maven

根据activeMQ版本增加对应依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>

消息队列P2P模式

创建生产者队列并发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package active_mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TestActiveMQ {

ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}

void testMQ() throws JMSException {
//创建Connection
Connection conn = getConnectionFactory().createConnection();
conn.start();
//创建Session
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建Destination 目标队列
Destination destination = session.createQueue("firstQueue");
//创建MessageProducer 生产者
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//定义消息对象
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
//发送
messageProducer.send(textMessage);
//关闭链接
if (conn != null) {
conn.close();
}
}

public static void main(String[] args) {
TestActiveMQ activeMQ = new TestActiveMQ();
try {
activeMQ.testMQ();
activeMQ.testMQ();
activeMQ.testMQ();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

发送了三条消息

创建消费者并接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package active_mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TestActiveMQConsumer {

ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}

void testMQ(String name) throws JMSException {
Connection conn = getConnectionFactory().createConnection();
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("firstQueue");
MessageConsumer messageConsumer = session.createConsumer(destination);
//设置listener,封装好了消息轮询
messageConsumer.setMessageListener(message -> {
try {
System.out.println("消息来啦:"+name);
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}

public static void main(String[] args) {
TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
try {
activeMQ.testMQ("xiaohong");
activeMQ.testMQ("xiaolv");
} catch (JMSException e) {
e.printStackTrace();
}
}
}

运行结果:结果每个人可能会不一样。

1
2
3
4
5
6
消息来啦:xiaohong
Hello,ActiveMQ1529400563732
消息来啦:xiaolv
Hello,ActiveMQ1529400563763
消息来啦:xiaohong
Hello,ActiveMQ1529400563788

订阅模式Pub/Sub

一对多通信,发送一条消息,所有订阅了该目标的消费者都会收到消息。
P2P、Pub/Sub在代码上的区别点仅仅在于,目标类型的创建是createQueue or createTopic,其他一切照旧

创建生产者队列并发送消息

代码跟队列基本没差别。只是创建的消息对象时topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package active_mq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TestActiveMQ {

ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}

void testMQ() throws JMSException {
Connection conn = getConnectionFactory().createConnection();
conn.setClientID("clientFirst");
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//创建消息订阅对象
Destination destination = session.createTopic("topic");
MessageProducer messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello,ActiveMQ"+System.currentTimeMillis());
messageProducer.send(textMessage);
if (conn != null) {
conn.close();
}
}

public static void main(String[] args) {
TestActiveMQ activeMQ = new TestActiveMQ();
try {
activeMQ.testMQ();
activeMQ.testMQ();
activeMQ.testMQ();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

订阅消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package active_mq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TestActiveMQConsumer {

ConnectionFactory getConnectionFactory(){
return new ActiveMQConnectionFactory("user","uesr","tcp://localhost:61616");
}

void testMQ(String name) throws JMSException {
Connection conn = getConnectionFactory().createConnection();
//设置id后,不在线也不会丢失订阅消息,下次上线的时候就可以获取到消息
conn.setClientID(name);
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("topic");
//订阅方式的差别在这里
MessageConsumer messageConsumer = session.createDurableSubscriber((Topic)destination,name);
messageConsumer.setMessageListener(message -> {
try {
System.out.println("消息来啦:"+name);
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}

public static void main(String[] args) {
TestActiveMQConsumer activeMQ = new TestActiveMQConsumer();
try {
activeMQ.testMQ("xiaohong");
activeMQ.testMQ("xiaolv");
} catch (JMSException e) {
e.printStackTrace();
}
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
消息来啦:xiaolv
Hello,ActiveMQ1529403218938
消息来啦:xiaohong
Hello,ActiveMQ1529403218938
消息来啦:xiaohong
Hello,ActiveMQ1529403218975
消息来啦:xiaolv
Hello,ActiveMQ1529403218975
消息来啦:xiaolv
Hello,ActiveMQ1529403219001
消息来啦:xiaohong
Hello,ActiveMQ1529403219001