- 线程池使用
- 线程池监控
线程池监控
监控总收集器
ResultCollector.java 监控信息收集器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package com.mpush.monitor.data;
import com.mpush.monitor.quota.impl.*;
import com.mpush.monitor.service.ThreadPoolManager;
/**
* Created by yxx on 2016/5/19.
*
* @author ohun@live.cn
*/
public class ResultCollector {
private final JVMInfo jvmInfo;
private final JVMGC jvmgc;
private final JVMMemory jvmMemory;
private final JVMThread jvmThread;
private final JVMThreadPool jvmThreadPool;
public ResultCollector(ThreadPoolManager threadPoolManager) {
this.jvmInfo = new JVMInfo();
this.jvmgc = new JVMGC();
this.jvmMemory = new JVMMemory();
this.jvmThread = new JVMThread();
this.jvmThreadPool = new JVMThreadPool(threadPoolManager);
}
public MonitorResult collect() {
MonitorResult result = new MonitorResult();
result.addResult("jvm-info", jvmInfo.monitor());
result.addResult("jvm-gc", jvmgc.monitor());
result.addResult("jvm-memory", jvmMemory.monitor());
result.addResult("jvm-thread", jvmThread.monitor());
result.addResult("jvm-thread-pool", jvmThreadPool.monitor());
return result;
}
public JVMInfo getJvmInfo() {
return jvmInfo;
}
public JVMGC getJvmgc() {
return jvmgc;
}
public JVMMemory getJvmMemory() {
return jvmMemory;
}
public JVMThread getJvmThread() {
return jvmThread;
}
public JVMThreadPool getJvmThreadPool() {
return jvmThreadPool;
}
}
线程池信息收集
获取各个线程池的信息,如corePoolSize、maxPoolSize、activeCount(workingThread)、poolSize(workThread)、queueSize(blockedTask)
组装为map.put(“event-bus”,poolInfo);
JVMThreadPool.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37package com.mpush.monitor.quota.impl;
import com.mpush.monitor.quota.ThreadPoolQuota;
import com.mpush.monitor.service.ThreadPoolManager;
import io.netty.channel.EventLoopGroup;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import static com.mpush.tools.Utils.getPoolInfo;
public class JVMThreadPool implements ThreadPoolQuota {
private final ThreadPoolManager threadPoolManager;
public JVMThreadPool(ThreadPoolManager threadPoolManager) {
this.threadPoolManager = threadPoolManager;
}
public Object monitor(Object... args) {
Map<String, Object> result = new HashMap<>();
Map<String, Executor> pools = threadPoolManager.getActivePools();
for (Map.Entry<String, Executor> entry : pools.entrySet()) {
String serviceName = entry.getKey();
Executor executor = entry.getValue();
if (executor instanceof ThreadPoolExecutor) {
result.put(serviceName, getPoolInfo((ThreadPoolExecutor) executor));
} else if (executor instanceof EventLoopGroup) {
result.put(serviceName, getPoolInfo((EventLoopGroup) executor));
}
}
return result;
}
}
线程池使用
ThreadPoolManager.java 线程池管理者,有5种线程池,包括MQ/event-bus/push-client-timer/push-task-timer/ack-timer1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57package com.mpush.monitor.service;
import com.mpush.api.spi.common.ExecutorFactory;
import com.mpush.tools.thread.NamedThreadFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ThreadProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
public final class ThreadPoolManager {
private final ExecutorFactory executorFactory = ExecutorFactory.create();
private final Map<String, Executor> pools = new ConcurrentHashMap<>();
public Executor getRedisExecutor() {
return pools.computeIfAbsent("mq", s -> executorFactory.get(ExecutorFactory.MQ));
}
public Executor getEventBusExecutor() {
return pools.computeIfAbsent("event-bus", s -> executorFactory.get(ExecutorFactory.EVENT_BUS));
}
public ScheduledExecutorService getPushClientTimer() {
return (ScheduledExecutorService) pools.computeIfAbsent("push-client-timer"
, s -> executorFactory.get(ExecutorFactory.PUSH_CLIENT));
}
public ScheduledExecutorService getPushTaskTimer() {
return (ScheduledExecutorService) pools.computeIfAbsent("push-task-timer"
, s -> executorFactory.get(ExecutorFactory.PUSH_TASK));
}
public ScheduledExecutorService getAckTimer() {
return (ScheduledExecutorService) pools.computeIfAbsent("ack-timer"
, s -> executorFactory.get(ExecutorFactory.ACK_TIMER));
}
public void register(String name, Executor executor) {
Objects.requireNonNull(name);
Objects.requireNonNull(executor);
pools.put(name, executor);
}
public Map<String, Executor> getActivePools() {
return pools;
}
}
ClientExecutorFactory.java 客户端线程池工厂
mpush-client/resources/META-INF/services/com.mpush.api.spi.common.ExecutorFactory1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44package com.mpush.client;
import com.mpush.api.spi.Spi;
import com.mpush.common.CommonExecutorFactory;
import com.mpush.tools.log.Logs;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static com.mpush.tools.config.CC.mp.thread.pool.ack_timer;
import static com.mpush.tools.config.CC.mp.thread.pool.push_client;
import static com.mpush.tools.thread.ThreadNames.T_ARK_REQ_TIMER;
import static com.mpush.tools.thread.ThreadNames.T_PUSH_CLIENT_TIMER;
/**
* 此线程池可伸缩,线程空闲一定时间后回收,新请求重新创建线程
*/
1) (order =
public final class ClientExecutorFactory extends CommonExecutorFactory {
public Executor get(String name) {
switch (name) {
case PUSH_CLIENT: {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(push_client
, new NamedPoolThreadFactory(T_PUSH_CLIENT_TIMER), (r, e) -> r.run() // run caller thread
);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
case ACK_TIMER: {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(ack_timer,
new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
(r, e) -> Logs.PUSH.error("one ack context was rejected, context=" + r)
);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
default:
return super.get(name);
}
}
}
ServerExecutorFactory 服务端线程池工厂
mpush-core/resources/META-INF/services/com.mpush.api.spi.common.ExecutorFactory1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40/**
* 此线程池可伸缩,线程空闲一定时间后回收,新请求重新创建线程
*/
1) (order =
public final class ServerExecutorFactory extends CommonExecutorFactory {
public Executor get(String name) {
final ThreadPoolConfig config;
switch (name) {
case MQ:
config = ThreadPoolConfig
.build(T_MQ)
.setCorePoolSize(CC.mp.thread.pool.mq.min)
.setMaxPoolSize(CC.mp.thread.pool.mq.max)
.setKeepAliveSeconds(TimeUnit.SECONDS.toSeconds(10))
.setQueueCapacity(CC.mp.thread.pool.mq.queue_size)
.setRejectedPolicy(ThreadPoolConfig.REJECTED_POLICY_CALLER_RUNS);
break;
case PUSH_TASK:
return new ScheduledThreadPoolExecutor(push_task, new NamedPoolThreadFactory(T_PUSH_CENTER_TIMER),
(r, e) -> {
throw new PushException("one push task was rejected. task=" + r);
}
);
case ACK_TIMER: {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(ack_timer,
new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
(r, e) -> Logs.PUSH.error("one ack context was rejected, context=" + r)
);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
default:
return super.get(name);
}
return get(config);
}
}
ExecutorFactory.java 线程池工厂接口(SPI)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24package com.mpush.api.spi.common;
import com.mpush.api.spi.SpiLoader;
import java.util.concurrent.Executor;
/**
* Created by yxx on 2016/5/20.
*
* @author ohun@live.cn
*/
public interface ExecutorFactory {
String PUSH_CLIENT = "push-client";
String PUSH_TASK = "push-task";
String ACK_TIMER = "ack-timer";
String EVENT_BUS = "event-bus";
String MQ = "mq";
Executor get(String name);
static ExecutorFactory create() {
return SpiLoader.load(ExecutorFactory.class);
}
}
CommonExecutorFactory.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78package com.mpush.common;
import com.mpush.api.spi.common.ExecutorFactory;
import com.mpush.tools.config.CC;
import com.mpush.tools.log.Logs;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.pool.DefaultExecutor;
import com.mpush.tools.thread.pool.DumpThreadRejectedHandler;
import com.mpush.tools.thread.pool.ThreadPoolConfig;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.mpush.tools.config.CC.mp.thread.pool.ack_timer;
import static com.mpush.tools.config.CC.mp.thread.pool.push_client;
import static com.mpush.tools.thread.ThreadNames.T_ARK_REQ_TIMER;
import static com.mpush.tools.thread.ThreadNames.T_EVENT_BUS;
import static com.mpush.tools.thread.ThreadNames.T_PUSH_CLIENT_TIMER;
/**
* Created by ohun on 2017/7/15.
*
* @author ohun@live.cn (夜色)
*/
public class CommonExecutorFactory implements ExecutorFactory {
protected Executor get(ThreadPoolConfig config) {
String name = config.getName();
int corePoolSize = config.getCorePoolSize();
int maxPoolSize = config.getMaxPoolSize();
int keepAliveSeconds = config.getKeepAliveSeconds();
BlockingQueue<Runnable> queue = config.getQueue();
return new DefaultExecutor(corePoolSize
, maxPoolSize
, keepAliveSeconds
, TimeUnit.SECONDS
, queue
, new NamedPoolThreadFactory(name)
, new DumpThreadRejectedHandler(config));
}
public Executor get(String name) {
final ThreadPoolConfig config;
switch (name) {
case EVENT_BUS:
config = ThreadPoolConfig
.build(T_EVENT_BUS)
.setCorePoolSize(CC.mp.thread.pool.event_bus.min)
.setMaxPoolSize(CC.mp.thread.pool.event_bus.max)
.setKeepAliveSeconds(TimeUnit.SECONDS.toSeconds(10))
.setQueueCapacity(CC.mp.thread.pool.event_bus.queue_size)
.setRejectedPolicy(ThreadPoolConfig.REJECTED_POLICY_CALLER_RUNS);
break;
case PUSH_CLIENT: {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(push_client
, new NamedPoolThreadFactory(T_PUSH_CLIENT_TIMER), (r, e) -> r.run() // run caller thread
);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
case ACK_TIMER: {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(ack_timer,
new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
(r, e) -> Logs.PUSH.error("one ack context was rejected, context=" + r)
);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
default:
throw new IllegalArgumentException("no executor for " + name);
}
return get(config);
}
}
ThreadPoolConfig.java 线程池配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/
package com.mpush.tools.thread.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
public final class ThreadPoolConfig {
public static final int REJECTED_POLICY_ABORT = 0;
public static final int REJECTED_POLICY_DISCARD = 1;
public static final int REJECTED_POLICY_CALLER_RUNS = 2;
private String name;//名字
private int corePoolSize; //最小线程大小
private int maxPoolSize; //最大线程大小
private int queueCapacity; // 允许缓冲在队列中的任务数 (0:不缓冲、负数:无限大、正数:缓冲的任务数)
private int keepAliveSeconds;// 存活时间
private int rejectedPolicy = REJECTED_POLICY_ABORT;
public ThreadPoolConfig(String name) {
this.name = name;
}
public String getName() {
return name;
}
public ThreadPoolConfig setName(String name) {
this.name = name;
return this;
}
public int getCorePoolSize() {
return corePoolSize;
}
public ThreadPoolConfig setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public ThreadPoolConfig setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
return this;
}
public int getQueueCapacity() {
return queueCapacity;
}
public ThreadPoolConfig setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
return this;
}
public int getKeepAliveSeconds() {
return keepAliveSeconds;
}
public ThreadPoolConfig setKeepAliveSeconds(long keepAliveSeconds) {
this.keepAliveSeconds = (int) keepAliveSeconds;
return this;
}
public int getRejectedPolicy() {
return rejectedPolicy;
}
public ThreadPoolConfig setRejectedPolicy(int rejectedPolicy) {
this.rejectedPolicy = rejectedPolicy;
return this;
}
public static ThreadPoolConfig buildFixed(String name, int threads, int queueCapacity) {
return new ThreadPoolConfig(name)
.setCorePoolSize(threads)
.setMaxPoolSize(threads)
.setQueueCapacity(queueCapacity)
.setKeepAliveSeconds(0);
}
public static ThreadPoolConfig buildCached(String name) {
return new ThreadPoolConfig(name)
.setKeepAliveSeconds(0);
}
public static ThreadPoolConfig build(String name) {
return new ThreadPoolConfig(name);
}
public BlockingQueue<Runnable> getQueue() {
BlockingQueue<Runnable> blockingQueue;
if (queueCapacity == 0) {
blockingQueue = new SynchronousQueue<>();
} else if (queueCapacity < 0) {
blockingQueue = new LinkedBlockingQueue<>();
} else {
blockingQueue = new LinkedBlockingQueue<>(queueCapacity);
}
return blockingQueue;
}
public String toString() {
return "ThreadPoolConfig{" +
"name='" + name + '\'' +
", corePoolSize=" + corePoolSize +
", maxPoolSize=" + maxPoolSize +
", queueCapacity=" + queueCapacity +
", keepAliveSeconds=" + keepAliveSeconds +
'}';
}
}
功能组件文章目录: