RocketMQ学习-消息协议(三)基于JMS1.1实现点

这是我参与8月更文挑战的第7天,活动详情查看:8月更文挑战

基于JMS1.1实现点对点

上一章介绍了JMS 1.1 和 2.0版本。本章通过代码实例对两个版本的不同之处进行区分,进一步加深理解。

编程接口

  1. ConnectionFactory接口(连接工厂):ConnectionFactory是创建Connection对象的工厂,根据不同的消息类型用户可选择用队列连接工厂或者主题连接工厂,分别对应QueueConnectionFactory 和 TopicConnectionFactory。

2.Destination接口(目的地):Destination是包装了消息 目的地 标识符的受管对象消息 目的地 是指消息发布和接收的地点,消息目的地要么是队列,要么是主题。所以Desttination实际上就是两种类型的对象:Queue、Topic;

3.Connection接口(连接):表示在客户端和JMS系统之间建立的连接(实际上是对TCP/IP Socket的包装)。Connection也可以生产多个session。Connection也有两种类型:QueueConnection 和 TopicConnection;

  1. Session接口(会话):Session是实际操作消息的接口,表示一个单线程的上下文,用于发送消息和接收消息。因为是单线程,所以消息是按照顺序接收的。可以通过Session创建生产者、消费者、消息等。在规范中,Session还提供了事物功能。Session也同样分为两种类型:QueueSession 和 TopicSession

5.MessageProducer接口(消息生产者):消息生产者由Session创建用于将消息发送到Destination。消息生产者有两种类型:QueueSender 和 TopicPublisher;

6.MessageConsumer接口(消息消费者):消息消费者由Session创建,用于接收被发送到Destination的消息。消息消费者由两种类型:QueueReceiver 和 TopicSubscriber;

7.Message接口(消息):消息是在消费者和生产者之间传送的对象,即不同应用程序之间的通信。

8.MessageListener(消息监听器):如果注册了消息监听器,那么当消息到达时,将会自动调用监听器的onMessage方法;

代码示例 JMS1.1

生产者:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Package: PACKAGE_NAME
 * @ClassName: QueueProducer
 * @Author: AZ
 * @CreateTime: 2021/8/5 10:18
 * @Description: 消息生产者
 */
public class QueueProducer {
    //默认用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    //默认密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //默认连接地址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    //创建工厂
    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try{
            //创建连接
            Connection connection = connectionFactory.createConnection();

            //启动连接
            connection.start();

            //创建会话: 第一个参数:是否开启事物,第二个参数:消息确认模式
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            //创建队列,需要指定队列名称,消息生产者和消息消费者将根据它来发送、接收对应的消息-》消息容器
            Queue myTestQueue = session.createQueue("activemq-quwuw-test1");

            //创建消息生产者
            MessageProducer producer = session.createProducer(myTestQueue);

            //创建一个消息对象
            TextMessage message = session.createTextMessage("测试点对点的一条消息");

            //发送一条消息

            producer.send(message);

            //提交事务
            session.commit();

            //关闭资源
            session.close();
            connection.close();

        }catch (JMSException e){
            System.out.println("消息生产异常-》"+e.getMessage());
        }
    }
}
复制代码

消费者:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Package: PACKAGE_NAME
 * @ClassName: QueueConsumer
 * @Author: AZ
 * @CreateTime: 2021/8/5 11:48
 * @Description: 消费者
 */
public class QueueConsumer {

    //默认用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    //默认密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //默认连接地址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try{
            //创建连接
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建队列,消息容器
            Queue myTestQueue = session.createQueue("activemq-quwuw-test1");
            //创建消息消费者
            MessageConsumer consumer = session.createConsumer(myTestQueue);
            //消费者实现监听接口消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("消费者消费的消息为-》" + textMessage.getText());
                    }catch (JMSException e){
                        System.out.println("消费异常-》"+e.getMessage());
                    }
                    try{
                        //提交事务
                        session.commit();
                    }catch (JMSException e){
                        System.out.println("消费异常-》"+e.getMessage());
                    }
                }
            });

            //让主线程休眠100秒,使消费者对象能继续存活一段时间,从而能监听到消息
            try {
                Thread.sleep(100 * 1000);
            }catch (InterruptedException e){
                System.out.println("中断异常");
            }
            //关闭资源
            session.close();
            connection.close();

        }catch (JMSException e){
            System.out.println("消费者服务异常"+e.getMessage());
        }

    }

}
复制代码

测试结果:
消费者打印信息:

消费者启动
 INFO | Successfully connected to tcp://localhost:61616
消费者消费的消息为-》测试点对点的一条消息
复制代码

生产者打印信息

生产者启动
 INFO | Successfully connected to tcp://localhost:61616
生产者生产消息-》测试点对点的一条消息
复制代码

PS:注意要电脑本地要先启动activemq服务,不要会连接失败,无法启动程序。下载地址:ActiveMQ下载

启动解压文件下bin目录下win64(本人环境是windows系统)文件下的activemq.bat脚本,运行即可。

JMS2.0

在经典API中对发消息的代码进行了简化,API的操作更易使用。

在经典API中,消息的发送上面的消费者代码已经呈现,这里不做叙述,主要看一下2.0的版本:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Package: PACKAGE_NAME
 * @ClassName: QueueProducer
 * @Author: AZ
 * @CreateTime: 2021/8/5 10:18
 * @Description: 消息生产者 JMS 2.0
 */
public class QueueProducerJMS20 {
    //默认用户名
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    //默认密码
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //默认连接地址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    //创建工厂
    public static void main(String[] args) {
        System.out.println("生产者启动JMS2.0");
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);

        try{
            JMSContext context = connectionFactory.createContext();
            Queue queue = context.createQueue("activemq-quwuw-test1");
            //开始发送
            String text = "发送第一条JMS2.0消息";
            context.createProducer().send(queue, text);
            System.out.println("发送的消息为-》"+ text);
        }catch (JMSRuntimeException e){

        }
    }
}
复制代码

但是这个项目启动报错,如下:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
生产者启动JMS2.0
Exception in thread "main" java.lang.AbstractMethodError: org.apache.activemq.ActiveMQConnectionFactory.createContext()Ljavax/jms/JMSContext;
	at QueueProducerJMS20.main(QueueProducerJMS20.java:30)

Process finished with exit code 1
复制代码

网上查了一下:大概是官方这个Bug并没有关闭。网址如下:JMS2.0不支持ActiveMQ

大致介绍一下区别:
从第二段代码可以看到:编写的代码量少了。包括:

1.只需创建一个JMSContext对象,省去了创建Connection、和Session对象;

2.JMS2不需要自己关闭资源,而是在处理完逻辑后自动调用close方法,无需在代码中显示调用;

3.在JMS1.1中创建session对象需要传入参数,指定会话性质以及消息确认模式,而在JMS2.0中,这些都是默认设置的,如果希望指定其他会话模式(本地事物、CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE),只需要传入一个参数即可,而不是两个参数。

4.无需创建一个TextMessage对象并将其设置为指定字符串,只需要将字符串传入send方法即可。JMS提供商自动创建一个TextMessage对象并设置为所提供的字符串;

5.在JSM1.1中,几乎所有方法都会抛出JMSException,由于该异常是已检查异常,必须要捕获并抛出异常。在JMS2.0抛出的异常是JMSRuntimeException,该异常是运行时异常,所以无须调用方法来显式捕获它,也不必在其throws子句中声明。

PS:如果有解决了这个问题的,麻烦告知一波。附上我的pom文件依赖

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.geronimo.specs</groupId>
                <artifactId>geronimo-jms_1.1_spec</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>
复制代码