RocketMQ学习-设计一个简单的消息队列消息处理中心

PS:前面介绍了一下消息队列的功能及其特点,本章参考《分布式消息中间件实践-倪炜》实现一个简单的消息队列。

消息处理中心

再把这张消息队列构成图拿过来,放这里,有个中心思想:

image.png

消息队列的核心是Broker,它是数据的处理中心。所以首先从Broker的实现开始

Broker实现

/**
 * @Package: com.anzhi.simplemq
 * @ClassName: Broker
 * @Author: AZ
 * @CreateTime: 2021/8/2 9:39
 * @Description:
 */
public class Broker {
    //设置消息队列的最大容量
    private final static int MAX_SIZE = 3;

    //保存消息数据的容器-队列 ArrayBlockingQueue: 有界阻塞队列
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

    //生产消息
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("成功向消息处理中心投递消息: " + msg + ", 当前暂存的消息数量是: " + messageQueue.size());
        }else{
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("====================================================");
    }
    
    //消费消息
    public static String consume() {
        String msg = messageQueue.poll();
        if(msg != null){
            //消费条件满足情况,从容器中取出一条消息
            System.out.println("已经消费消息: " + msg + ", 当前暂存的消息数量是: " + messageQueue.size());
        }else{
            System.out.println("消息处理中心内没有消息可供消费! ");
        }
        System.out.println("====================================================");
        
        return msg;
    }
}
复制代码

代码的逻辑非常简单,所以不在赘述,

BrokerServer

有了处理数据的逻辑,我们还需要提供相应的服务将该方法暴露出去,供别人使用,所以需要实现BrokerServer类来对外提供Broker服务。

/**
 * @Package: com.anzhi.simplemq
 * @ClassName: BrokerServer
 * @Author: AZ
 * @CreateTime: 2021/8/2 10:53
 * @Description:
 */
public class BrokerServer implements Runnable{
    public static int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("接收到原始数据: " + str);

                if (str.equals("CONSUME")) {   //CONSUME表示要消费一条消息
                    //从消息队列中消费一条消息
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                } else {
                    //其他情况都表示生产消息放到消息队列中
                    Broker.produce(str);
                }
            }
        }catch (Exception e){
            System.out.println("消息队列服务异常" + e.getMessage());
        }
    }

    public static void main(String[] args) {
        try {
            ServerSocket server = new ServerSocket(SERVICE_PORT);
            while (true) {
                BrokerServer brokerServerv = new BrokerServer(server.accept());
                new Thread(brokerServerv).start();
            }
        }catch (Exception e){
            System.out.println("调用消息服务异常: " + e.getMessage());
        }
    }
}
复制代码

这里采用Socket提供服务,没有使用框架。通过实现Runnable接口,启动线程,使用死循环,让程序不退出。启动main方法后,一个消息队列中心服务就启动了

客户端Consumer, Produce

服务端有了,自然需要与之相应的客户端进行发送和接收消息。

/**
 * @Package: com.anzhi.SimpleMQTest
 * @ClassName: ProduceClinet
 * @Author: AZ
 * @CreateTime: 2021/8/2 14:22
 * @Description: 生产消息
 */
public class ProduceClinet {
    public static void main(String[] args) throws Exception{
        MqClient.produce("Hello MQ !");
    }
}
复制代码

下面是执行结果, 这里遇到的问题就是一直报错:Connection reset。这是因为客户端发送完消息就断开连接了,而服务端正在读写,导致异常。可以去看看TCP的 “三四” 次。
这是执行了多次,队列已经满了。
官网也有对这个错误有说明:Connection reset异常解释

Connected to the target VM, address: '127.0.0.1:9237', transport: 'socket'
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 1
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 2
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 3
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================

复制代码

接着我们开始消费队列中的消息

/**
 * @Package: com.anzhi.SimpleMQTest
 * @ClassName: ConsumerClient
 * @Author: AZ
 * @CreateTime: 2021/8/2 14:58
 * @Description:
 */
public class ConsumerClient {
    public static void main(String[] args) {
        MqClient.consume();
        while (true){}
    }
}
复制代码

消费结果如下

接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 2
====================================================
接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 1
====================================================
接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 0
====================================================
接收到原始数据: CONSUME
消息处理中心内没有消息可供消费! 
====================================================
复制代码

至此一个简单的消息队列实现完成。(PS: 没有考虑其他太复杂的问题,以及业务场景)

小结:上述代码执行顺序大致如下:

image.png