这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战
基于JMS1.1实现点对点
上一章介绍了JMS 1.1 和 2.0版本。本章通过代码实例对两个版本的不同之处进行区分,进一步加深理解。
编程接口
- ConnectionFactory接口(连接工厂):ConnectionFactory是创建Connection对象的工厂,根据不同的消息类型用户可选择用队列连接工厂或者主题连接工厂,分别对应QueueConnectionFactory 和 TopicConnectionFactory。
2.Destination接口(目的地):Destination是包装了消息 目的地 标识符的受管对象消息 目的地 是指消息发布和接收的地点,消息目的地要么是队列,要么是主题。所以Desttination实际上就是两种类型的对象:Queue、Topic;
3.Connection接口(连接):表示在客户端和JMS系统之间建立的连接(实际上是对TCP/IP Socket的包装)。Connection也可以生产多个session。Connection也有两种类型:QueueConnection 和 TopicConnection;
- Session接口(会话):Session是实际操作消息的接口,表示一个单线程的上下文,用于发送消息和接收消息。因为是单线程,所以消息是按照顺序接收的。可以通过Session创建生产者、消费者、消息等。在规范中,Session还提供了事物功能。Session也同样分为两种类型:QueueSession 和 TopicSession
5.MessageProducer接口(消息生产者):消息生产者由Session创建用于将消息发送到Destination。消息生产者有两种类型:QueueSender 和 TopicPublisher;
6.MessageConsumer接口(消息消费者):消息消费者由Session创建,用于接收被发送到Destination的消息。消息消费者由两种类型:QueueReceiver 和 TopicSubscriber;
7.Message接口(消息):消息是在消费者和生产者之间传送的对象,即不同应用程序之间的通信。
8.MessageListener(消息监听器):如果注册了消息监听器,那么当消息到达时,将会自动调用监听器的onMessage方法;
代码示例 JMS1.1
生产者:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Package: PACKAGE_NAME
* @ClassName: QueueProducer
* @Author: AZ
* @CreateTime: 2021/8/5 10:18
* @Description: 消息生产者
*/
public class QueueProducer {
//默认用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
//创建工厂
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try{
//创建连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建会话: 第一个参数:是否开启事物,第二个参数:消息确认模式
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建队列,需要指定队列名称,消息生产者和消息消费者将根据它来发送、接收对应的消息-》消息容器
Queue myTestQueue = session.createQueue("activemq-quwuw-test1");
//创建消息生产者
MessageProducer producer = session.createProducer(myTestQueue);
//创建一个消息对象
TextMessage message = session.createTextMessage("测试点对点的一条消息");
//发送一条消息
producer.send(message);
//提交事务
session.commit();
//关闭资源
session.close();
connection.close();
}catch (JMSException e){
System.out.println("消息生产异常-》"+e.getMessage());
}
}
}
复制代码
消费者:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Package: PACKAGE_NAME
* @ClassName: QueueConsumer
* @Author: AZ
* @CreateTime: 2021/8/5 11:48
* @Description: 消费者
*/
public class QueueConsumer {
//默认用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try{
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建队列,消息容器
Queue myTestQueue = session.createQueue("activemq-quwuw-test1");
//创建消息消费者
MessageConsumer consumer = session.createConsumer(myTestQueue);
//消费者实现监听接口消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者消费的消息为-》" + textMessage.getText());
}catch (JMSException e){
System.out.println("消费异常-》"+e.getMessage());
}
try{
//提交事务
session.commit();
}catch (JMSException e){
System.out.println("消费异常-》"+e.getMessage());
}
}
});
//让主线程休眠100秒,使消费者对象能继续存活一段时间,从而能监听到消息
try {
Thread.sleep(100 * 1000);
}catch (InterruptedException e){
System.out.println("中断异常");
}
//关闭资源
session.close();
connection.close();
}catch (JMSException e){
System.out.println("消费者服务异常"+e.getMessage());
}
}
}
复制代码
测试结果:
消费者打印信息:
消费者启动
INFO | Successfully connected to tcp://localhost:61616
消费者消费的消息为-》测试点对点的一条消息
复制代码
生产者打印信息
生产者启动
INFO | Successfully connected to tcp://localhost:61616
生产者生产消息-》测试点对点的一条消息
复制代码
PS:注意要电脑本地要先启动activemq服务,不要会连接失败,无法启动程序。下载地址:ActiveMQ下载
启动解压文件下bin目录下win64(本人环境是windows系统)文件下的activemq.bat脚本,运行即可。
JMS2.0
在经典API中对发消息的代码进行了简化,API的操作更易使用。
在经典API中,消息的发送上面的消费者代码已经呈现,这里不做叙述,主要看一下2.0的版本:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Package: PACKAGE_NAME
* @ClassName: QueueProducer
* @Author: AZ
* @CreateTime: 2021/8/5 10:18
* @Description: 消息生产者 JMS 2.0
*/
public class QueueProducerJMS20 {
//默认用户名
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认密码
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
//创建工厂
public static void main(String[] args) {
System.out.println("生产者启动JMS2.0");
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);
try{
JMSContext context = connectionFactory.createContext();
Queue queue = context.createQueue("activemq-quwuw-test1");
//开始发送
String text = "发送第一条JMS2.0消息";
context.createProducer().send(queue, text);
System.out.println("发送的消息为-》"+ text);
}catch (JMSRuntimeException e){
}
}
}
复制代码
但是这个项目启动报错,如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
生产者启动JMS2.0
Exception in thread "main" java.lang.AbstractMethodError: org.apache.activemq.ActiveMQConnectionFactory.createContext()Ljavax/jms/JMSContext;
at QueueProducerJMS20.main(QueueProducerJMS20.java:30)
Process finished with exit code 1
复制代码
网上查了一下:大概是官方这个Bug并没有关闭。网址如下:JMS2.0不支持ActiveMQ
大致介绍一下区别:
从第二段代码可以看到:编写的代码量少了。包括:
1.只需创建一个JMSContext对象,省去了创建Connection、和Session对象;
2.JMS2不需要自己关闭资源,而是在处理完逻辑后自动调用close方法,无需在代码中显示调用;
3.在JMS1.1中创建session对象需要传入参数,指定会话性质以及消息确认模式,而在JMS2.0中,这些都是默认设置的,如果希望指定其他会话模式(本地事物、CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE),只需要传入一个参数即可,而不是两个参数。
4.无需创建一个TextMessage对象并将其设置为指定字符串,只需要将字符串传入send方法即可。JMS提供商自动创建一个TextMessage对象并设置为所提供的字符串;
5.在JSM1.1中,几乎所有方法都会抛出JMSException,由于该异常是已检查异常,必须要捕获并抛出异常。在JMS2.0抛出的异常是JMSRuntimeException,该异常是运行时异常,所以无须调用方法来显式捕获它,也不必在其throws子句中声明。
PS:如果有解决了这个问题的,麻烦告知一波。附上我的pom文件依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
复制代码
近期评论