观察者模式结合Springboot

前言

在学习设计模式之美时,作者提出一个问题 我们现在要实现用户注册功能,用户注册成功后,需要给他发送一个体验金,但是,有时间发送体验金又可能会发生改变。比如说:改成发送短信 + 优惠卷;为了解决这个问题,就设计了观察者模式。

public interface RegObserver {
    void handleRegSuccess(long userId);
}

public class RegNotificationObserver implements RegObserver {
    private NotificationService notificationService;
    @Override
    public void handleRegSuccess(long userId) {
        notificationService.sendInboxMessage(userId, "Welcome ...");
    }
}

public class RegPromotionObserver implements RegObserver{
    private PromotionService promotionService;
    @Override
    public void handleRegSuccess(long userId) {
        promotionService.issuNewUserExperienceCash(userId);
    }
}

public class UserController {
    private UserService userService;
    private List<RegObserver> regObservers = new ArrayList<>();

    // 一次性设置好,之后也不可动态的修改
    public void setRegObservers (List<RegObserver> observers) {
        regObservers.addAll(observers);
    }

    public Long register(String telephone, String password) {
        long userId = userService.register(telephone, password);

        for (RegObserver observer : regObservers) {
            observer.handleRegSuccess(userId);
        }
        return userId;
    }
}
复制代码

同时,有一些接口对时间敏感,需要设计为异步方式。代码如下:

// 第一种实现方式
public class RegPromotionObserver implements RegObserver {
	private PromotionServer promotionService; // 依赖注入
    @Override 
    public void handleRegSuccess(long userId) {
        new Thread(new Runnable() {
        	@Override
            public void run() {
            	promotionService.issueNewUserExperienceCash(userId);
            }
        }).start();
    }
}

// 第二种实现方式,其他类代码不变,就没有再重复罗列
public class UserController {
	private UserService userService; // 依赖注入
    private List<RegObserver> regObservers = new ArrayList<>();
    private Executor executor;
    
    public UserController (Executor executor) {
    	this.executor = executor;
    }
    
	public void setRegObservers(List<RegObserver> observers) {
    	regObservers.addAll(observers);
    }	
	
    public Long register(String telephone, String password) {
    	long userId = userService.register(telephone, password);
        for (RegObserver observer : regObservers) {
        	executor.execute( new Runnable() {
            	@Override
                public void run() {
                	observer.handleRegSuccess(userId);
                }
            });
        }
    
    }
}
复制代码

对于第一种实现方式,频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。第二种实现方式,尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑耦合在了register() 函数中,增加了这部分业务代码的维护成本。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那这样的代码实现也无法做到复用。

为了解决上面的问题,作者使用了谷歌开源EventBus方式。但是,在Java开发中,我们主要是使用spring boot框架,其实在spring boot框架中就有对应的事件发布和监听机制,来帮助我们解决这些问题。针对以上场景,我们也来实现一下:

2. Spring boot 实现

  1. controller 类
@RestController
@RequestMapping("/reg")
@Slf4j
public class DemoController {

    @Autowired
    DemoService service;

    @GetMapping
    public String reg(String username,String password) {
        service.register(username, password);
        return "注册用户!";
    }
}
复制代码
  1. service类 在里面注入事件发布类
@Component
public class DemoService {
    /**
     * 注入 事件发布类
     */
    @Autowired
    ApplicationEventPublisher eventPublisher;

    public void register(String userName, String password) {
        System.out.println("注册用户");
        // 执行成功后,调用监听器 就是执行对应的观察者
        eventPublisher.publishEvent(new SuccessEvent(userName, password));
    }
}
复制代码
  1. 实现事件源 SuccessEvent

Spring中,事件源不强迫继承ApplicationEvent接口的,也就是可以直接发布任意一个对象类。但内部其实是使用PayloadApplicationEvent类进行包装了一层。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SuccessEvent {
    private String username;
    private String password;
}
复制代码
  1. 实现处理逻辑,也就是各Observer要处理的逻辑
@Configuration
@Slf4j
public class EventListenerConfig {

    @EventListener
    public void handleEvent(Object event) {
        //监听所有事件 可以看看 系统各类时间 发布了哪些事件
        //可根据 instanceof 监听想要监听的事件
//        if(event instanceof CustomEvent) {
//
//        }
       // log.info("事件:{}", event);
    }

    @EventListener
    public void handleSuccessEvent(SuccessEvent successEvent) throws Exception{

        long start = System.currentTimeMillis();
        log.info("start:" + start);
        Thread.sleep(1000);
        System.out.println("end:" + System.currentTimeMillis());
    }

    @EventListener
    public void handleSuccessEvent2(SuccessEvent successEvent) throws Exception{
        long start = System.currentTimeMillis();
        log.info("start2:" + start);
        Thread.sleep(1000);
        log.info("end:" + System.currentTimeMillis());
    }


    /**
     * 监听 密码为123456的事件
     */
    @EventListener(condition="#successEvent.password == '123456'")
    public void handleCustomEventByCondition(SuccessEvent successEvent) {
        //监听 CustomEvent事件
        String password = successEvent.getPassword();
        log.info("password {}" , password);
    }
    
}
复制代码

这里有两个注意点:第一个注意点的话,@EventListener 是同步的,也就是说,第一个handleEvent处理完毕后,才会处理到下面的EventLister (测试结果也是符合的)各个EventLister是按照顺序判断执行的。 第二个注意点的话,这里是支持condition来进行判定的。

这时候,如果注册逻辑中将优惠卷改成了别的话,就可以在这里面进行更改;增添也可以在这里进行增添;

以上,实现了同步观察者模式的实现,但是,如果是同步的观察者模式其实实现本身就简单的,那么,如果,需要对接口进行加速,异步的观察者模式应该怎么实现呢?

  1. 在启动类上加入@EnableAsync 使异步调用@Async 注解生效
  2. 在要开启异步调用的方法中 加入@Async注解
// 1. 启动类上加入@EnableAsync 注解
@EnableAsync
@SpringBootApplication
public class DesignModdenDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DesignModdenDemoApplication.class, args);
    }
}
// 2.在要调用的方法上执行对应的@Async异步调用
    @Async()
    @EventListener
    public void handleSuccessEvent(SuccessEvent successEvent) throws Exception
复制代码

这时候,进行测试:

从结果的角度,确实是进行异步调用了,但是它的的线程名为SimpleAsyncTaskExecutor-1

由于默认情况下,未设置TaskExecutor 默认使用的是SimpleAsyncTaskExecutor,这个线程池,但是这个线程池不是真正意义上的线程池,线程不重用,每次调用都会创建新的一个线程,所以我们需要对线程池进行设置;

为了测试方便,我创建核心线程数为1的线程来进行实验

@Configuration
public class Config {
    /**
     * 配置线程池
     */
    @Bean("asyncPoolTaskExecutor")
    public ThreadPoolExecutor getAsyncThreadPoolTaskExecutor() {
        return new ThreadPoolExecutor(1, 20, 20, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));
    }
}
复制代码

然后,在异步调用的时候传入线程池

    @Async("asyncPoolTaskExecutor")
    @EventListener
    public void handleSuccessEvent(SuccessEvent successEvent) throws Exception
复制代码

结果如下:

很显然,线程复用已经实现了;但是,第二个handleSuccessEvent 却没有异步执行;这里原因是,只有一个线程的话,是无法同时执行两个handlerSuccessEvent的,(另一个在排队) 我增加核心线程数,增加到20个时,就会有两个线程去执行这两个handlerSuccessEvent方法了。

这时候也实现了如下功能

  1. 异步调用
  2. 线程的循环利用
  3. 代码复用

成功解决了之前提到的两个问题。

  1. 线程池和register()耦合
  2. 无法复用异步非阻塞观察者