PS:前面介绍了一下消息队列的功能及其特点,本章参考《分布式消息中间件实践-倪炜》实现一个简单的消息队列。
消息处理中心
再把这张消息队列构成图拿过来,放这里,有个中心思想:
消息队列的核心是Broker,它是数据的处理中心。所以首先从Broker的实现开始
Broker实现
/**
* @Package: com.anzhi.simplemq
* @ClassName: Broker
* @Author: AZ
* @CreateTime: 2021/8/2 9:39
* @Description:
*/
public class Broker {
//设置消息队列的最大容量
private final static int MAX_SIZE = 3;
//保存消息数据的容器-队列 ArrayBlockingQueue: 有界阻塞队列
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
//生产消息
public static void produce(String msg){
if(messageQueue.offer(msg)){
System.out.println("成功向消息处理中心投递消息: " + msg + ", 当前暂存的消息数量是: " + messageQueue.size());
}else{
System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
System.out.println("====================================================");
}
//消费消息
public static String consume() {
String msg = messageQueue.poll();
if(msg != null){
//消费条件满足情况,从容器中取出一条消息
System.out.println("已经消费消息: " + msg + ", 当前暂存的消息数量是: " + messageQueue.size());
}else{
System.out.println("消息处理中心内没有消息可供消费! ");
}
System.out.println("====================================================");
return msg;
}
}
复制代码
代码的逻辑非常简单,所以不在赘述,
BrokerServer
有了处理数据的逻辑,我们还需要提供相应的服务将该方法暴露出去,供别人使用,所以需要实现BrokerServer类来对外提供Broker服务。
/**
* @Package: com.anzhi.simplemq
* @ClassName: BrokerServer
* @Author: AZ
* @CreateTime: 2021/8/2 10:53
* @Description:
*/
public class BrokerServer implements Runnable{
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
while (true) {
String str = in.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始数据: " + str);
if (str.equals("CONSUME")) { //CONSUME表示要消费一条消息
//从消息队列中消费一条消息
String message = Broker.consume();
out.println(message);
out.flush();
} else {
//其他情况都表示生产消息放到消息队列中
Broker.produce(str);
}
}
}catch (Exception e){
System.out.println("消息队列服务异常" + e.getMessage());
}
}
public static void main(String[] args) {
try {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true) {
BrokerServer brokerServerv = new BrokerServer(server.accept());
new Thread(brokerServerv).start();
}
}catch (Exception e){
System.out.println("调用消息服务异常: " + e.getMessage());
}
}
}
复制代码
这里采用Socket提供服务,没有使用框架。通过实现Runnable接口,启动线程,使用死循环,让程序不退出。启动main方法后,一个消息队列中心服务就启动了
客户端Consumer, Produce
服务端有了,自然需要与之相应的客户端进行发送和接收消息。
/**
* @Package: com.anzhi.SimpleMQTest
* @ClassName: ProduceClinet
* @Author: AZ
* @CreateTime: 2021/8/2 14:22
* @Description: 生产消息
*/
public class ProduceClinet {
public static void main(String[] args) throws Exception{
MqClient.produce("Hello MQ !");
}
}
复制代码
下面是执行结果, 这里遇到的问题就是一直报错:Connection reset。这是因为客户端发送完消息就断开连接了,而服务端正在读写,导致异常。可以去看看TCP的 “三四” 次。
这是执行了多次,队列已经满了。
官网也有对这个错误有说明:Connection reset异常解释
Connected to the target VM, address: '127.0.0.1:9237', transport: 'socket'
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 1
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 2
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
成功向消息处理中心投递消息: Hello MQ !, 当前暂存的消息数量是: 3
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
消息队列服务异常Connection reset
接收到原始数据: Hello MQ !
消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
====================================================
复制代码
接着我们开始消费队列中的消息
/**
* @Package: com.anzhi.SimpleMQTest
* @ClassName: ConsumerClient
* @Author: AZ
* @CreateTime: 2021/8/2 14:58
* @Description:
*/
public class ConsumerClient {
public static void main(String[] args) {
MqClient.consume();
while (true){}
}
}
复制代码
消费结果如下
接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 2
====================================================
接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 1
====================================================
接收到原始数据: CONSUME
已经消费消息: Hello MQ !, 当前暂存的消息数量是: 0
====================================================
接收到原始数据: CONSUME
消息处理中心内没有消息可供消费!
====================================================
复制代码
至此一个简单的消息队列实现完成。(PS: 没有考虑其他太复杂的问题,以及业务场景)
小结:上述代码执行顺序大致如下:




近期评论