线程池
Dubbo里提供的线程池种类有:
- FixedThreadPool:创建一个具有固定个数线程的线程池。
- LimitedThreadPool:创建一个线程池,这个线程池中的线程个数随着需要量动态增加,但是数量不超过配置的阈值。另外,空闲线程不会被回收,会一直存在。
- EagerThreadPool:创建一个线程池,在这个线程池中,当所有核心线程都处于忙碌状态时,将创建新的线程来执行新任务,而不是把任务放入线程池阻塞队列。
- CachedThreadPool:创建一个自适应线程池,当线程空闲1分钟时,线程会被回收;当有新请求到来时,会创建新线程。
ThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//Dubbo SPI 拓展点,默认为 "fixed"
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})//基于 Dubbo SPI Adaptive 机制,加载对应的线程池实现,使用 URL.threadpool 属性
//获得对应的线程池的执行器
Executor getExecutor(URL url);
}
FixedThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.apache.dubbo.common.threadpool.support.fixed;
//固定大小线程池
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
//线程名前缀Dubbo-thread-1
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//核心线程数和最大线程数,默认200
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
//队列默认长度为0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
//如果队列长度为0,使用SynchronousQueue,否则使用LinkedBlockingQueue
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
//自定义拒绝策略
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
CachedThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.apache.dubbo.common.threadpool.support.cached;
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//核心线程数默认为0
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
//最大线程数,默认为Integer最大值
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
//队列默认长度为0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//默认为0
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
//默认为200
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
EagerThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//当核心线程都处于繁忙状态时,创建新线程,而不是将任务放入阻塞队列
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//默认为0
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
//Integer最大值
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
//默认为0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
拒绝策略
AbortPolicyWithReport
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;
private static final String OS_WIN_PREFIX = "win";
private static final String OS_NAME_KEY = "os.name";
private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";
private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";
private static Semaphore guard = new Semaphore(1);
private static final String USER_HOME = System.getProperty("user.home");
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//打警告日志
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
//打印 JStack ,分析线程状态
dumpJStack();
dispatchThreadPoolExhaustedEvent(msg);
//抛出 RejectedExecutionException 异常
throw new RejectedExecutionException(msg);
}
/**
* dispatch ThreadPoolExhaustedEvent
* @param msg
*/
public void dispatchThreadPoolExhaustedEvent(String msg) {
EventDispatcher.getDefaultExtension().dispatch(new ThreadPoolExhaustedEvent(this, msg));
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes每10分钟打印一次
if (now - lastPrintTime < TEN_MINUTES_MILLS) {
return;
}
//只需要1个线程打印日志
if (!guard.tryAcquire()) {
return;
}
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
String dumpPath = getDumpPath();
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
// window system don't support ":" in file name
if (os.contains(OS_WIN_PREFIX)) {
sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
} else {
sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
}
String dateStr = sdf.format(new Date());
//try-with-resources
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
//打印jstack
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
//must shutdown thread pool ,if not will lead to OOM
pool.shutdown();
}
private String getDumpPath() {
final String dumpPath = url.getParameter(DUMP_DIRECTORY);
if (StringUtils.isEmpty(dumpPath)) {
return USER_HOME;
}
final File dumpDirectory = new File(dumpPath);
if (!dumpDirectory.exists()) {
if (dumpDirectory.mkdirs()) {
logger.info(format("Dubbo dump directory[%s] created", dumpDirectory.getAbsolutePath()));
} else {
logger.warn(format("Dubbo dump directory[%s] can't be created, use the 'user.home'[%s]",
dumpDirectory.getAbsolutePath(), USER_HOME));
return USER_HOME;
}
}
return dumpPath;
}
}
JVMUtil#jstack()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class JVMUtil {
public static void jstack(OutputStream stream) throws Exception {
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {
stream.write(getThreadDumpString(threadInfo).getBytes());
}
}
private static String getThreadDumpString(ThreadInfo threadInfo) {
StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" +
" Id=" + threadInfo.getThreadId() + " " +
threadInfo.getThreadState());
if (threadInfo.getLockName() != null) {
sb.append(" on " + threadInfo.getLockName());
}
if (threadInfo.getLockOwnerName() != null) {
sb.append(" owned by \"" + threadInfo.getLockOwnerName() +
"\" Id=" + threadInfo.getLockOwnerId());
}
if (threadInfo.isSuspended()) {
sb.append(" (suspended)");
}
if (threadInfo.isInNative()) {
sb.append(" (in native)");
}
sb.append('\n');
int i = 0;
StackTraceElement[] stackTrace = threadInfo.getStackTrace();
MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
for (; i < stackTrace.length && i < 32; i++) {
StackTraceElement ste = stackTrace[i];
sb.append("\tat " + ste.toString());
sb.append('\n');
if (i == 0 && threadInfo.getLockInfo() != null) {
Thread.State ts = threadInfo.getThreadState();
switch (ts) {
case BLOCKED:
sb.append("\t- blocked on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case TIMED_WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
default:
}
}
for (MonitorInfo mi : lockedMonitors) {
if (mi.getLockedStackDepth() == i) {
sb.append("\t- locked " + mi);
sb.append('\n');
}
}
}
if (i < stackTrace.length) {
sb.append("\t...");
sb.append('\n');
}
LockInfo[] locks = threadInfo.getLockedSynchronizers();
if (locks.length > 0) {
sb.append("\n\tNumber of locked synchronizers = " + locks.length);
sb.append('\n');
for (LockInfo li : locks) {
sb.append("\t- " + li);
sb.append('\n');
}
}
sb.append('\n');
return sb.toString();
}
}