事件驱动,我是这样理解的!

一、前言

昨天我写了第一篇Netty基础的文章,如果看过这篇文章的同学,可能会发现,当我写到事件驱动的时候,我写了这么一个**“占位符”**

我今天趁着加班,悄悄的学习了事件驱动的内容,由于从事金融工作,每一天都很忙,只有晚上的时间才可以抽出一小会的时间学习,废话不说,直接开始内容。

二、什么是事件驱动?

度娘的解释

事件驱动程序的基本结构是由一个事件收集器、一个事件发送器和一个事件处理器组成。事件收集器专门负责收集所有事件,包括来自用户的(如鼠标、键盘事件等)、来自硬件的(如时钟事件等)和来自软件的(如操作系统、应用程序本身等)。事件发送器负责将收集器收集到的事件分发到目标对象中。事件处理器做具体的事件响应工作

根据度娘的解释,我是这样认为的事件驱动主要分为以下的内容:

  • 事件源:负责产生事件的对象,比如Button,点击Button,就会产生点击这个事件
  • 事件收集器:或者称为事件对象,是事件源和事件监听器之间的信息桥梁,是整个事件模型驱动的核心
  • 事件监听器(事件处理器):负责处理事件的对象

举个例子:因为我从从事金融行业,我就用银行来举一个例子

  • 事件收集器:叫号机
  • 事件源:叫号器
  • 事件监听器:客户

早晨开完晨会开始营业,客户来到叫号器取号(也就是将事件监听器注册到事件收集器中),柜员点击叫号器(发布事件),叫号机开始呼叫**“请001号客户到5号柜台办理业务”**,这个时候柜员和客户进行绑定(也就是事件源与事件监听器绑定)

三、简单的例子

实现思路

  • 首先,需要实现事件监听器、事件收集器,将事件监听器注册到事件收集器
  • 事件发送器发送事件到事件中心,事件中心去查找与处理该事件的监听器
事件发送器  -----(2)----->事件收集器<--------(1)-----事件监听器
复制代码

3.1 实现事件收集器

解释

  • 1.ConcurrentHashMap用来存储事件源与事件监听器的绑定关系
private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();
复制代码

第一个参数为数据源的Class对象,第二个为监听器

  • 2.Executor
灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,
复制代码
  • 3.EventCenter

构造方法,用于实例化Executor对象

public EventCenter(Executor executor) {
    this.executor = executor;
}
复制代码
  • 4.registry

主要用于实现事件源和事件监听器(事件处理器)之间的绑定管理,第一个参数为事件源的Class对象,第二个为监听器的对象

/**
 * 将事件和事件监听器绑定
 * @param clazz
 * @param eventListener
 */
public void registry(Class clazz,EventListener eventListener){
    //通过ConcurrentHashMap查询监听器是否注册
    List<EventListener> eventListeners = subscribers.get(clazz);
    //如果监听器未注册,初始化List
    if(eventListener == null){
        eventListeners = new ArrayList<>();
    }
    //将监听器添加到List中
    eventListeners.add(eventListener);
    //事件和事件监听器绑定
    subscribers.put(clazz,eventListeners);
}
复制代码
  • 5.send

用于发布事件使用,其中BanderEvent需要自定义实现,我们会在后面的地方讲解如果实现

/**
 * 事件发送
 * @param bankerEvent
 */
public void send(BankerEvent bankerEvent) throws EventException {
    //查询监听器是否注册
    List<EventListener> eventListeners = subscribers.get(bankerEvent.getClass());
    if(eventListeners == null || eventListeners.size() == 0){
        //如果未注册抛出异常
        throw new EventException((short) 500,"事件监听器未注册");
    }
    //循环遍历监听器,并且发布事件
    for(EventListener eventListener : eventListeners){
        executor.execute(()->eventListener.trigger(bankerEvent));
    }
}
复制代码

完整的结构

**
 * 事件收集器/注册中心
 */
public class EventCenter {

    /**
     * 用于绑定事件和事件监听器
     */
    private final ConcurrentHashMap<Class<?>, List<EventListener>> subscribers = new ConcurrentHashMap<>();

   //任务处理
    private final Executor executor;

    public EventCenter(Executor executor) {
        this.executor = executor;
    }

    /**
     * 将事件和事件监听器绑定
     * @param clazz
     * @param eventListener
     */
    public void registry(Class clazz,EventListener eventListener){
        //通过ConcurrentHashMap查询监听器是否注册
        List<EventListener> eventListeners = subscribers.get(clazz);
        //如果监听器未注册,初始化List
        if(eventListener == null){
            eventListeners = new ArrayList<>();
        }
        //将监听器添加到List中
        eventListeners.add(eventListener);
        //事件和事件监听器绑定
        subscribers.put(clazz,eventListeners);
    }

    /**
     * 事件发送
     * @param bankerEvent
     */
    public void send(BankerEvent bankerEvent) throws EventException {
        //查询监听器是否注册
        List<EventListener> eventListeners = subscribers.get(bankerEvent.getClass());
        if(eventListeners == null || eventListeners.size() == 0){
            //如果未注册抛出异常
            throw new EventException((short) 500,"事件监听器未注册");
        }
        //循环遍历监听器,并且发布事件
        for(EventListener eventListener : eventListeners){
            executor.execute(()->eventListener.trigger(bankerEvent));
        }
    }

}
复制代码

3.2 实现事件监听器

解释

  • 1.ConsumerListener主要实现了EventListener接口
  • 2.trigger方法,是为了触发事件时调用
/**
 * 事件触发时响应
 * @param bankerEvent
 */
@Override
public void trigger(BankerEvent bankerEvent) {
    Integer number = bankerEvent.getNumber();
    log.info("请{}号客户到5号窗口办理业务",number);
}
复制代码
  • 3.registry方法是用于将事件和事件监听器注册到注册中心,并且进行绑定
@Autowired
private EventCenter eventCenter;

/**
 * 注册到事件收集中心,也相当于确定需要订阅的事件
 */
@PostConstruct
public void registry(){
    eventCenter.registry(BankerCreateEvent.class,this);
}
复制代码

定义事件监听器,首先我们需要定义一个接口

/**
 * 事件监听器接口
 */
public interface EventListener {
    /**
     * 事件触发时调用
     * @param bankerEvent
     */
    public void trigger(BankerEvent bankerEvent);
}
复制代码

具体操作

实现类,并且注册到EvnetCenter

/**
 * 客户监听器
 */
@Component
@Slf4j
public class ConsumerListener implements EventListener {

    @Autowired
    private EventCenter eventCenter;

    /**
     * 注册到事件收集中心,也相当于确定需要订阅的事件
     */
    @PostConstruct
    public void registry(){
        eventCenter.registry(BankerCreateEvent.class,this);
    }

    /**
     * 事件触发时响应
     * @param bankerEvent
     */
    @Override
    public void trigger(BankerEvent bankerEvent) {
        Integer number = bankerEvent.getNumber();
        log.info("请{}号客户到5号窗口办理业务",number);
    }

}
复制代码

3.3 实现事件源

创建接口

/**
 * 柜员事件
 */
public interface BankerEvent {
    public Integer getNumber();
}
复制代码

实现类

public class BankerCreateEvent implements BankerEvent {
    Integer number = 100;

    @Override
    public Integer getNumber() {
        return number;
    }
}
复制代码

3.4 实现事件发布者

@Component
public class EventSender {
    @Autowired
    private EventCenter eventCenter;

    public void send(BankerEvent bankerEvent){
        //发送事件
        eventCenter.send(bankerEvent);
    }
}
复制代码

3.5 事件注册中心配置类

@Configuration
public class EventCenterConfig {

    @Bean
    public EventCenter eventCenter(){
        ThreadFactory namedThreadFactory = Executors.defaultThreadFactory();
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        int maximumPoolSize = corePoolSize * 2;
        // 创建一个线程池
        Executor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
                namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        return new EventCenter(pool);
    }

}
复制代码

3.6 事件调用

@Slf4j
@SpringBootApplication
public class LinkedApplication {

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext applicationContext = SpringApplication.run(LinkedApplication.class, args);
        //获取EventSender上下文
        EventSender eventSender = applicationContext.getBean(EventSender.class);
        while(true){
            long orderId = ThreadLocalRandom.current().nextLong();
            //发送事件
            eventSender.send(new BankerCreateEvent());
            Thread.sleep(ThreadLocalRandom.current().nextLong(1000, 10000));
        }
    }

}
复制代码

2020-12-24 00:55:22.335 INFO 5315 --- [pool-1-thread-1] c.y.linked.listener.ConsumerListener : 请100号客户到5号窗口办理业务

github 地址

github.com/13150702172…

今天就到此结束吧!因为明天还要继续上班,晚安!🌛