Smalldok's Tech Blog

穷则变、变则通、通则达


  • 首页

  • 分类

  • 关于

  • 标签

  • 归档

  • 公益 404

  • 搜索

服务启动监听

| 分类于 源码学习 , MPush , 源码分析 , 功能组件 | 评论数: | 阅读次数:

主要用于服务的启动和停止,子类只需要继承BaseService类,然后调用其start、stop方法;

服务启动监听

启动服务:

1
2
3
mPushServer.getHttpClient().syncStart();
//或者
mPushServer.getHttpClient().start();

停止服务:

1
2
3
mPushServer.getHttpClient().syncStop();
//或者
mPushServer.getHttpClient().stop();

子类中的启动、停止方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected void doStart(Listener listener) throws Throwable {
//....实现相关的启动业务

//通知监听器,表示该操作已经完成
listener.onSuccess();
}
@Override
protected void doStop(Listener listener) throws Throwable {
//....实现相关的停止业务

//通知监听器,表示该操作已经完成
listener.onSuccess();
}

BaseService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.mpush.api.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by yxx on 2016/5/19.
*
* @author ohun@live.cn
*/
public abstract class BaseService implements Service {

protected final AtomicBoolean started = new AtomicBoolean();

@Override
public void init() {
}

@Override
public boolean isRunning() {
return started.get();
}

protected void tryStart(Listener l, FunctionEx function) {
FutureListener listener = wrap(l);
if (started.compareAndSet(false, true)) {
try {
init();
function.apply(listener);
listener.monitor(this);//主要用于异步,否则应该放置在function.apply(listener)之前
} catch (Throwable e) {
listener.onFailure(e);
throw new ServiceException(e);
}
} else {
if (throwIfStarted()) {
listener.onFailure(new ServiceException("service already started."));
} else {
listener.onSuccess();
}
}
}

protected void tryStop(Listener l, FunctionEx function) {
FutureListener listener = wrap(l);
if (started.compareAndSet(true, false)) {
try {
function.apply(listener);
listener.monitor(this);//主要用于异步,否则应该放置在function.apply(listener)之前
} catch (Throwable e) {
listener.onFailure(e);
throw new ServiceException(e);
}
} else {
if (throwIfStopped()) {
listener.onFailure(new ServiceException("service already stopped."));
} else {
listener.onSuccess();
}
}
}

public final CompletableFuture<Boolean> start() {
FutureListener listener = new FutureListener(started);
start(listener);
return listener;
}

public final CompletableFuture<Boolean> stop() {
FutureListener listener = new FutureListener(started);
stop(listener);
return listener;
}

@Override
public final boolean syncStart() {
return start().join();
}

@Override
public final boolean syncStop() {
return stop().join();
}

@Override
public void start(Listener listener) {
tryStart(listener, this::doStart);
}

@Override
public void stop(Listener listener) {
tryStop(listener, this::doStop);
}

protected void doStart(Listener listener) throws Throwable {
listener.onSuccess();
}

protected void doStop(Listener listener) throws Throwable {
listener.onSuccess();
}

/**
* 控制当服务已经启动后,重复调用start方法,是否抛出服务已经启动异常
* 默认是true
*
* @return true:抛出异常
*/
protected boolean throwIfStarted() {
return true;
}

/**
* 控制当服务已经停止后,重复调用stop方法,是否抛出服务已经停止异常
* 默认是true
*
* @return true:抛出异常
*/
protected boolean throwIfStopped() {
return true;
}

/**
* 服务启动停止,超时时间, 默认是10s
*
* @return 超时时间
*/
protected int timeoutMillis() {
return 1000 * 10;
}

protected interface FunctionEx {
void apply(Listener l) throws Throwable;
}

/**
* 防止Listener被重复执行
*
* @param l listener
* @return FutureListener
*/
public FutureListener wrap(Listener l) {
if (l == null) return new FutureListener(started);
if (l instanceof FutureListener) return (FutureListener) l;
return new FutureListener(l, started);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.mpush.api.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureListener extends CompletableFuture<Boolean> implements Listener {
private final Listener listener;
private final AtomicBoolean started;

public FutureListener(AtomicBoolean started) {
this.started = started;
this.listener = null;
}

public FutureListener(Listener listener, AtomicBoolean started) {
this.listener = listener;
this.started = started;
}

@Override
public void onSuccess(Object... args) {
if (isDone()) return;// 防止Listener被重复执行
complete(started.get());
if (listener != null) listener.onSuccess(args);
}

@Override
public void onFailure(Throwable cause) {
if (isDone()) return;// 防止Listener被重复执行
completeExceptionally(cause);
if (listener != null) listener.onFailure(cause);
throw cause instanceof ServiceException
? (ServiceException) cause
: new ServiceException(cause);
}

/**
* 防止服务长时间卡在某个地方,增加超时监控
*
* @param service 服务
*/
public void monitor(BaseService service) {
if (isDone()) return;// 防止Listener被重复执行
runAsync(() -> {
try {
this.get(service.timeoutMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
this.onFailure(new ServiceException(String.format("service %s monitor timeout", service.getClass().getSimpleName())));
}
});
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException();
}

}



功能组件文章目录:

  • CC-配置中心
  • EventBus-事件总线
  • FlowControl-流控
  • JVMUtil
  • Logs
  • Profiler-统计方法或者线程执行时间
  • Profiler入门
  • SPI机制
  • TimeLine-时间线
  • 服务启动监听
  • 监控
  • 通信模型与超时控制
  • 线程池
  • 状态判断-位运算
1…454647…100
smalldok

smalldok

JAVA服务端架构,目前方向是微服务落地、基础设施、中间件、DevOps

100 日志
27 分类
17 标签
Links
  • sofa-bolt
  • sofa-rpc
  • dubbo
  • sofa-tracer
  • sofa-boot
  • Sentinel
  • COLA
  • nacos
  • xxl-job
  • apollo
  • sharding-sphere
  • fescar
  • ByteTCC
  • tcc-transaction
  • rocketmq
  • canal
  • arthas
  • jvm-sandbox
  • sofa-lookout
  • disruptor
  • mpush
© 2007 – 2019 smalldok
由 Hexo 强力驱动 v3.8.0
|
主题 – NexT.Mist v6.7.0