一、前言
昨天我写了第一篇Netty基础的文章,如果看过这篇文章的同学,可能会发现,当我写到事件驱动的时候,我写了这么一个**“占位符”**
我今天趁着加班,悄悄的学习了事件驱动的内容,由于从事金融工作,每一天都很忙,只有晚上的时间才可以抽出一小会的时间学习,废话不说,直接开始内容。
二、什么是事件驱动?
度娘的解释
事件驱动程序的基本结构是由一个事件收集器、一个事件发送器和一个事件处理器组成。事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。事件发送器负责将收集器收集到的事件分发到目标对象中。事件处理器做具体的事件响应工作
根据度娘的解释,我是这样认为的事件驱动主要分为以下的内容:
- 事件源:负责产生事件的对象,比如Button,点击Button,就会产生点击这个事件
- 事件收集器:或者称为事件对象,是事件源和事件监听器之间的信息桥梁,是整个事件模型驱动的核心
- 事件监听器(事件处理器):负责处理事件的对象
举个例子:因为我从从事金融行业,我就用银行来举一个例子
- 事件收集器:叫号机
- 事件源:叫号器
- 事件监听器:客户
早晨开完晨会开始营业,客户来到叫号器取号(也就是将事件监听器注册到事件收集器中),柜员点击叫号器(发布事件),叫号机开始呼叫**“请001号客户到5号柜台办理业务”**,这个时候柜员和客户进行绑定(也就是事件源与事件监听器绑定)
三、简单的例子
实现思路
- 首先,需要实现事件监听器、事件收集器,将事件监听器注册到事件收集器
- 事件发送器发送事件到事件中心,事件中心去查找与处理该事件的监听器
事件发送器 -----(2)----->事件收集器<--------(1)-----事件监听器
复制代码
3.1 实现事件收集器
解释
- 1.ConcurrentHashMap用来存储事件源与事件监听器的绑定关系
private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();
复制代码
第一个参数为数据源的Class对象,第二个为监听器
- 2.Executor
灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,
复制代码
- 3.EventCenter
构造方法,用于实例化Executor对象
public EventCenter(Executor executor) {
this.executor = executor;
}
复制代码
- 4.registry
主要用于实现事件源和事件监听器(事件处理器)之间的绑定管理,第一个参数为事件源的Class对象,第二个为监听器的对象
/**
* 将事件和事件监听器绑定
* @param clazz
* @param eventListener
*/
public void registry(Class clazz,EventListener eventListener){
//通过ConcurrentHashMap查询监听器是否注册
List<EventListener> eventListeners = subscribers.get(clazz);
//如果监听器未注册,初始化List
if(eventListener == null){
eventListeners = new ArrayList<>();
}
//将监听器添加到List中
eventListeners.add(eventListener);
//事件和事件监听器绑定
subscribers.put(clazz,eventListeners);
}
复制代码
- 5.send
用于发布事件使用,其中BanderEvent需要自定义实现,我们会在后面的地方讲解如果实现
/**
* 事件发送
* @param bankerEvent
*/
public void send(BankerEvent bankerEvent) throws EventException {
//查询监听器是否注册
List<EventListener> eventListeners = subscribers.get(bankerEvent.getClass());
if(eventListeners == null || eventListeners.size() == 0){
//如果未注册抛出异常
throw new EventException((short) 500,"事件监听器未注册");
}
//循环遍历监听器,并且发布事件
for(EventListener eventListener : eventListeners){
executor.execute(()->eventListener.trigger(bankerEvent));
}
}
复制代码
完整的结构
**
* 事件收集器/注册中心
*/
public class EventCenter {
/**
* 用于绑定事件和事件监听器
*/
private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();
//任务处理
private final Executor executor;
public EventCenter(Executor executor) {
this.executor = executor;
}
/**
* 将事件和事件监听器绑定
* @param clazz
* @param eventListener
*/
public void registry(Class clazz,EventListener eventListener){
//通过ConcurrentHashMap查询监听器是否注册
List<EventListener> eventListeners = subscribers.get(clazz);
//如果监听器未注册,初始化List
if(eventListener == null){
eventListeners = new ArrayList<>();
}
//将监听器添加到List中
eventListeners.add(eventListener);
//事件和事件监听器绑定
subscribers.put(clazz,eventListeners);
}
/**
* 事件发送
* @param bankerEvent
*/
public void send(BankerEvent bankerEvent) throws EventException {
//查询监听器是否注册
List<EventListener> eventListeners = subscribers.get(bankerEvent.getClass());
if(eventListeners == null || eventListeners.size() == 0){
//如果未注册抛出异常
throw new EventException((short) 500,"事件监听器未注册");
}
//循环遍历监听器,并且发布事件
for(EventListener eventListener : eventListeners){
executor.execute(()->eventListener.trigger(bankerEvent));
}
}
}
复制代码
3.2 实现事件监听器
解释
- 1.ConsumerListener主要实现了EventListener接口
- 2.trigger方法,是为了触发事件时调用
/**
* 事件触发时响应
* @param bankerEvent
*/
@Override
public void trigger(BankerEvent bankerEvent) {
Integer number = bankerEvent.getNumber();
log.info("请{}号客户到5号窗口办理业务",number);
}
复制代码
- 3.registry方法是用于将事件和事件监听器注册到注册中心,并且进行绑定
@Autowired
private EventCenter eventCenter;
/**
* 注册到事件收集中心,也相当于确定需要订阅的事件
*/
@PostConstruct
public void registry(){
eventCenter.registry(BankerCreateEvent.class,this);
}
复制代码
定义事件监听器,首先我们需要定义一个接口
/**
* 事件监听器接口
*/
public interface EventListener {
/**
* 事件触发时调用
* @param bankerEvent
*/
public void trigger(BankerEvent bankerEvent);
}
复制代码
具体操作
实现类,并且注册到EvnetCenter
/**
* 客户监听器
*/
@Component
@Slf4j
public class ConsumerListener implements EventListener {
@Autowired
private EventCenter eventCenter;
/**
* 注册到事件收集中心,也相当于确定需要订阅的事件
*/
@PostConstruct
public void registry(){
eventCenter.registry(BankerCreateEvent.class,this);
}
/**
* 事件触发时响应
* @param bankerEvent
*/
@Override
public void trigger(BankerEvent bankerEvent) {
Integer number = bankerEvent.getNumber();
log.info("请{}号客户到5号窗口办理业务",number);
}
}
复制代码
3.3 实现事件源
创建接口
/**
* 柜员事件
*/
public interface BankerEvent {
public Integer getNumber();
}
复制代码
实现类
public class BankerCreateEvent implements BankerEvent {
Integer number = 100;
@Override
public Integer getNumber() {
return number;
}
}
复制代码
3.4 实现事件发布者
@Component
public class EventSender {
@Autowired
private EventCenter eventCenter;
public void send(BankerEvent bankerEvent){
//发送事件
eventCenter.send(bankerEvent);
}
}
复制代码
3.5 事件注册中心配置类
@Configuration
public class EventCenterConfig {
@Bean
public EventCenter eventCenter(){
ThreadFactory namedThreadFactory = Executors.defaultThreadFactory();
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
// 创建一个线程池
Executor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
return new EventCenter(pool);
}
}
复制代码
3.6 事件调用
@Slf4j
@SpringBootApplication
public class LinkedApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext applicationContext = SpringApplication.run(LinkedApplication.class, args);
//获取EventSender上下文
EventSender eventSender = applicationContext.getBean(EventSender.class);
while(true){
long orderId = ThreadLocalRandom.current().nextLong();
//发送事件
eventSender.send(new BankerCreateEvent());
Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 10000));
}
}
}
复制代码
2020-12-24 00:55:22.335 INFO 5315 --- [pool-1-thread-1] c.y.linked.listener.ConsumerListener : 请100号客户到5号窗口办理业务
github 地址
今天就到此结束吧!因为明天还要继续上班,晚安!🌛




近期评论