介绍
一个内存级别的异步事件总线服务,实现了简单的生产-消费者模式;
EventBus是Guava框架对观察者模式的一种实现,使用EventBus可以很简洁的实现事件注册监听和消费。
使用场景:
- MPUSH中各事件的发布、订阅;
- Elastic-Job任务执行和任务轨迹记录
new EventBus();
new AsyncEventBus();
- post(new xxxEvent()) 发布事件
- register(this) 注册、订阅事件
@Subscribe
@AllowConcurrentEvents - unregister(this) 取消订阅
第一步:哪个类需要订阅,首先的注册,然后订阅1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 注册
EventBus.register(this);
// 订阅事件
void on(ConnectionCloseEvent event) {
....
}
void on(xxxxEvent event) {
....
}
第二步:发布事件1
EventBus.post(new UserOnlineEvent(message.getConnection(), message.userId));
参考:https://blog.csdn.net/fanhenghui/article/details/51459273
MPUSH源码实现
MPushClient.java 创建EventBus1
2
3
4public MPushClient() {
monitorService = new MonitorService();
EventBus.create(monitorService.getThreadPoolManager().getEventBusExecutor());
}
EventBus.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package com.mpush.tools.event;
import com.google.common.eventbus.AsyncEventBus;
import com.mpush.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executor;
/**
* Created by ohun on 2015/12/29.
*
* @author ohun@live.cn
*/
public class EventBus {
private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class);
private static com.google.common.eventbus.EventBus eventBus;
public static void create(Executor executor) {
eventBus = new AsyncEventBus(executor, (exception, context)
-> LOGGER.error("event bus subscriber ex", exception));
}
public static void post(Event event) {
eventBus.post(event);
}
public static void register(Object bean) {
eventBus.register(bean);
}
public static void unregister(Object bean) {
eventBus.unregister(bean);
}
}
EventConsumer.java 父类中注册,子类中可以订阅事件1
2
3
4
5
6package com.mpush.tools.event;
public abstract class EventConsumer {
public EventConsumer() {
EventBus.register(this);
}
}
1 | public final class LocalRouterManager extends EventConsumer implements RouterManager<LocalRouter> { |
功能组件文章目录: