监控

监控服务目前看下来,只在MPushServer服务中启用;

  • 监控服务
  • 总收集器
  • 线程池信息收集
  • ……

监控服务

1、初始化总收集器;
2、打印所有监控信息到控制台;
3、根据系统负载将线程栈(jstack)、堆内存(jmap)信息输出到本地文件;

MonitorService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.mpush.monitor.service;

import com.mpush.api.common.Monitor;
import com.mpush.api.service.BaseService;
import com.mpush.api.service.Listener;
import com.mpush.monitor.data.MonitorResult;
import com.mpush.monitor.data.ResultCollector;
import com.mpush.tools.Utils;
import com.mpush.tools.common.JVMUtil;
import com.mpush.tools.config.CC;
import com.mpush.tools.log.Logs;
import com.mpush.tools.thread.ThreadNames;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * Service that periodically collects monitoring data on a dedicated daemon thread.
 *
 * <p>On every cycle it collects a {@link MonitorResult} from the {@link ResultCollector},
 * optionally logs it as JSON through {@code Logs.MONITOR}, and optionally triggers
 * jstack/jmap dumps once the load reported by the collector exceeds fixed thresholds.
 * The cycle interval is {@code CC.mp.monitor.dump_period}, in seconds.
 */
public class MonitorService extends BaseService implements Monitor, Runnable {

    /** Load thresholds (compared with {@code collector.getJvmInfo().load()}) that trigger a dump. */
    private static final int FIRST_DUMP_JSTACK_LOAD_AVG = 2,
            SECOND_DUMP_JSTACK_LOAD_AVG = 4,
            THIRD_DUMP_JSTACK_LOAD_AVG = 6,
            FIRST_DUMP_JMAP_LOAD_AVG = 4;

    /** Directory handed to the JVMUtil dump helpers. */
    private static final String dumpLogDir = CC.mp.monitor.dump_dir;
    /** Whether load-triggered dumps are enabled. */
    private static final boolean dumpEnabled = CC.mp.monitor.dump_stack;
    /** Whether each collected result is logged. */
    private static final boolean printLog = CC.mp.monitor.print_log;
    /** Pause between two collection cycles, in seconds. */
    private static final long dumpPeriod = CC.mp.monitor.dump_period.getSeconds();

    // One-shot latches: each dump level fires at most once; the flags are never reset in this class.
    private volatile boolean dumpFirstJstack = false;
    private volatile boolean dumpSecondJstack = false;
    private volatile boolean dumpThirdJstack = false;
    private volatile boolean dumpJmap = false;

    private final ResultCollector collector;

    private final ThreadPoolManager threadPoolManager;

    /** Creates the thread pool registry and the collector that reads from it. */
    public MonitorService() {
        threadPoolManager = new ThreadPoolManager();
        collector = new ResultCollector(threadPoolManager);
    }

    /** Monitor thread; only created in {@link #doStart(Listener)} when logging or dumping is enabled. */
    private Thread thread;

    /**
     * Monitor loop: collect, optionally log, optionally dump, then sleep for the configured period.
     * Runs until the service is no longer running.
     */
    @Override
    public void run() {
        while (isRunning()) {
            MonitorResult result = collector.collect();

            if (printLog) {
                Logs.MONITOR.info(result.toJson());
            }

            if (dumpEnabled) {
                dump();
            }

            try {
                TimeUnit.SECONDS.sleep(dumpPeriod);
            } catch (InterruptedException e) {
                // doStop() interrupts this thread; an interrupt while still running stops the service.
                if (isRunning()) stop();
            }
        }
    }

    /**
     * Starts the daemon monitor thread when logging or dumping is enabled, then reports success.
     *
     * @param listener callback notified once start-up has completed
     */
    @Override
    protected void doStart(Listener listener) throws Throwable {
        if (printLog || dumpEnabled) {
            thread = Utils.newThread(ThreadNames.T_MONITOR, this);
            thread.setDaemon(true);
            thread.start();
        }
        listener.onSuccess();
    }

    /**
     * Interrupts the monitor thread if it is alive, then reports success.
     *
     * @param listener callback notified once shutdown has been requested
     */
    @Override
    protected void doStop(Listener listener) throws Throwable {
        if (thread != null && thread.isAlive()) thread.interrupt();
        listener.onSuccess();
    }

    /**
     * Triggers one-shot dumps according to the current load.
     *
     * <p>Thread dumps (jstack) are requested once each when the load first exceeds
     * {@code FIRST_}, {@code SECOND_} and {@code THIRD_DUMP_JSTACK_LOAD_AVG}; a heap dump (jmap)
     * is requested once when it first exceeds {@code FIRST_DUMP_JMAP_LOAD_AVG}.
     * NOTE(review): what exactly JVMUtil writes into {@code dumpLogDir} is defined outside this class.
     */
    private void dump() {
        double load = collector.getJvmInfo().load();
        if (load > FIRST_DUMP_JSTACK_LOAD_AVG) {
            if (!dumpFirstJstack) {
                dumpFirstJstack = true;
                JVMUtil.dumpJstack(dumpLogDir);
            }
        }

        if (load > SECOND_DUMP_JSTACK_LOAD_AVG) {
            if (!dumpSecondJstack) {
                dumpSecondJstack = true;
                // Thread dump, as the flag and threshold names say; heap dumps belong to the jmap branch.
                JVMUtil.dumpJstack(dumpLogDir);
            }
        }

        if (load > THIRD_DUMP_JSTACK_LOAD_AVG) {
            if (!dumpThirdJstack) {
                dumpThirdJstack = true;
                JVMUtil.dumpJstack(dumpLogDir);
            }
        }

        if (load > FIRST_DUMP_JMAP_LOAD_AVG) {
            if (!dumpJmap) {
                dumpJmap = true;
                JVMUtil.dumpJmap(dumpLogDir);
            }
        }
    }

    /** Intentionally empty: this implementation does nothing with individual threads. */
    @Override
    public void monitor(String name, Thread thread) {

    }

    /**
     * Registers an executor with the thread pool manager under the given name.
     *
     * @param name     name the executor is registered under
     * @param executor executor to register
     */
    @Override
    public void monitor(String name, Executor executor) {
        threadPoolManager.register(name, executor);
    }

    /** @return the thread pool manager that holds the registered executors */
    public ThreadPoolManager getThreadPoolManager() {
        return threadPoolManager;
    }
}

总收集器

收集类型:

  • JVMInfo
  • JVMGC 垃圾回收
  • JVMMemory 堆
  • JVMThread 线程
  • JVMThreadPool 线程池

ResultCollector.java 监控信息收集器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.mpush.monitor.data;

import com.mpush.monitor.quota.impl.*;
import com.mpush.monitor.service.ThreadPoolManager;

/**
 * Aggregates the individual JVM quota collectors and assembles their output into one
 * {@link MonitorResult}.
 *
 * <p>Created by yxx on 2016/5/19.
 *
 * @author ohun@live.cn
 */
public class ResultCollector {
    private final JVMInfo jvmInfo = new JVMInfo();
    private final JVMGC jvmgc = new JVMGC();
    private final JVMMemory jvmMemory = new JVMMemory();
    private final JVMThread jvmThread = new JVMThread();
    private final JVMThreadPool jvmThreadPool;

    /**
     * @param threadPoolManager manager passed on to the thread pool quota collector
     */
    public ResultCollector(ThreadPoolManager threadPoolManager) {
        jvmThreadPool = new JVMThreadPool(threadPoolManager);
    }

    /**
     * Queries every quota collector once and stores each answer under its own key
     * ("jvm-info", "jvm-gc", "jvm-memory", "jvm-thread", "jvm-thread-pool").
     *
     * @return a new result holding the five entries
     */
    public MonitorResult collect() {
        final MonitorResult snapshot = new MonitorResult();
        snapshot.addResult("jvm-info", jvmInfo.monitor());
        snapshot.addResult("jvm-gc", jvmgc.monitor());
        snapshot.addResult("jvm-memory", jvmMemory.monitor());
        snapshot.addResult("jvm-thread", jvmThread.monitor());
        snapshot.addResult("jvm-thread-pool", jvmThreadPool.monitor());
        return snapshot;
    }

    /** @return the JVM info collector */
    public JVMInfo getJvmInfo() {
        return this.jvmInfo;
    }

    /** @return the GC collector */
    public JVMGC getJvmgc() {
        return this.jvmgc;
    }

    /** @return the memory collector */
    public JVMMemory getJvmMemory() {
        return this.jvmMemory;
    }

    /** @return the thread collector */
    public JVMThread getJvmThread() {
        return this.jvmThread;
    }

    /** @return the thread pool collector */
    public JVMThreadPool getJvmThreadPool() {
        return this.jvmThreadPool;
    }
}

线程池信息收集

获取各个线程池的信息,如corePoolSize、maxPoolSize、activeCount(workingThread)、poolSize(workThread)、queueSize(blockedTask)

组装为 map.put("event-bus", poolInfo) 的形式,即以线程池名称为 key、线程池信息为 value;

JVMThreadPool.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.mpush.monitor.quota.impl;

import com.mpush.monitor.quota.ThreadPoolQuota;
import com.mpush.monitor.service.ThreadPoolManager;
import io.netty.channel.EventLoopGroup;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import static com.mpush.tools.Utils.getPoolInfo;


/**
 * Thread pool quota: reports pool information for the executors known to a
 * {@link ThreadPoolManager}.
 */
public class JVMThreadPool implements ThreadPoolQuota {
    private final ThreadPoolManager threadPoolManager;

    /**
     * @param threadPoolManager source of the active pools to report on
     */
    public JVMThreadPool(ThreadPoolManager threadPoolManager) {
        this.threadPoolManager = threadPoolManager;
    }

    /**
     * Builds a map from pool name to the value returned by {@code getPoolInfo} for that pool.
     * Only {@link ThreadPoolExecutor} and {@link EventLoopGroup} instances are reported;
     * any other executor type is left out of the map.
     *
     * @param args not used by this implementation
     * @return a new map keyed by pool name
     */
    @Override
    public Object monitor(Object... args) {
        final Map<String, Object> poolInfos = new HashMap<>();
        final Map<String, Executor> activePools = threadPoolManager.getActivePools();
        for (Map.Entry<String, Executor> pool : activePools.entrySet()) {
            final String poolName = pool.getKey();
            final Executor candidate = pool.getValue();
            if (candidate instanceof ThreadPoolExecutor) {
                poolInfos.put(poolName, getPoolInfo((ThreadPoolExecutor) candidate));
                continue;
            }
            if (candidate instanceof EventLoopGroup) {
                poolInfos.put(poolName, getPoolInfo((EventLoopGroup) candidate));
            }
        }
        return poolInfos;
    }
}



功能组件文章目录:

------ 本文结束 感谢您的阅读 ------