Hystrix超时中断机制
HystrixObservableTimeoutOperator#call()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Rx operator that enforces the command execution timeout.
 * <p>
 * {@link #call} wraps the downstream ({@code child}) subscriber in a {@code parent} subscriber and
 * registers a {@code TimerListener} with {@code HystrixTimer}. The timer tick and the parent's
 * callbacks race on {@code originalCommand.isCommandTimedOut}: whichever side first moves it away
 * from {@code NOT_EXECUTED} wins, and the losing side becomes a no-op.
 * <p>
 * NOTE(review): {@code originalCommand} is used throughout but is not declared in this excerpt;
 * presumably it is a field set by a constructor that was omitted here - confirm against the full source.
 */
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
/**
 * Builds the timeout-aware subscriber and schedules the timeout timer.
 *
 * @param child the downstream subscriber that receives values, completion, or the timeout error
 * @return the subscriber to attach upstream; it forwards events to {@code child} only while the
 *         command has not been marked as timed out
 */
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
// if the child unsubscribes we unsubscribe our parent as well
child.add(s);
// Runnable that delivers the timeout error to the child; it is wrapped in a
// HystrixContextRunnable built with the command's concurrency strategy
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
// TimerListener handed to the HystrixTimer below; its tick() runs the timeout codepath
TimerListener listener = new TimerListener() {
@Override
public void tick() {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
// deliver the HystrixTimeoutException to the child subscriber
timeoutRunnable.run();
//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
}
}
// the timer interval is the command's configured execution timeout, read at call time
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
// register the listener with the HystrixTimer so that tick() gets scheduled
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);
/**
 * If this subscriber receives values it means the parent succeeded/completed
 */
Subscriber<R> parent = new Subscriber<R>() {
// completed without timing out: cancel the timer and notify the child
@Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}
// failed without timing out: cancel the timer and pass the error to the child
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}
// values are forwarded only while not timed out; the first call to isNotTimedOut() that
// succeeds moves the status to COMPLETED, after which tick() can no longer win the race
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
// returns true unless the timer has already marked the command as TIMED_OUT
private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
// i.e. the status is already COMPLETED, or it can be moved from NOT_EXECUTED to COMPLETED now
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
// if s is unsubscribed we want to unsubscribe the parent
s.add(parent);
return parent;
}
}
HystrixTimer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Timer that runs {@code TimerListener} callbacks on a shared scheduled thread pool.
 * <p>
 * This is an excerpt: members referenced below ({@code executor}, {@code logger},
 * {@code startThreadIfNeeded()}, {@code TimerReference}) are declared outside the lines shown here.
 */
public class HystrixTimer {

    /**
     * Schedules {@code listener.tick()} to run repeatedly at a fixed rate.
     * <p>
     * The value of {@code listener.getIntervalTimeInMilliseconds()} is used both as the initial delay
     * and as the period. Any {@link Exception} thrown by {@code tick()} is logged and not rethrown.
     *
     * @param listener the callback to schedule
     * @return a reference wrapping the listener together with the scheduled future
     */
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        // make sure the scheduled executor exists before anything is submitted to it
        startThreadIfNeeded();

        // wrap the listener so that a failing tick() is logged instead of propagating
        final Runnable tickTask = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        // first run after one interval, then again every interval
        final ScheduledFuture<?> scheduledTick = executor.get().getThreadPool().scheduleAtFixedRate(
                tickTask,
                listener.getIntervalTimeInMilliseconds(),
                listener.getIntervalTimeInMilliseconds(),
                TimeUnit.MILLISECONDS);

        return new TimerReference(listener, scheduledTick);
    }
}
HystrixTimer#startThreadIfNeeded()
1
2
3
4
5
6
7
8
9
/**
 * Loops until {@code executor} holds a {@code ScheduledExecutor} that reports itself initialized.
 * <p>
 * On each pass a fresh {@code ScheduledExecutor} is offered via compare-and-set against
 * {@code null}; only when that succeeds is {@code initialize()} called on the stored instance.
 */
protected void startThreadIfNeeded() {
    while (true) {
        // done as soon as an initialized executor is visible
        if (executor.get() != null && executor.get().isInitialized()) {
            return;
        }
        if (executor.compareAndSet(null, new ScheduledExecutor())) {
            // initialize the executor that we 'won' setting
            executor.get().initialize();
        }
    }
}
HystrixTimer.ScheduledExecutor#initialize()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Creates the scheduled thread pool and marks this instance as initialized.
 * <p>
 * The core pool size is read from the timer thread-pool properties of the configured
 * {@code HystrixPropertiesStrategy}. Outside the App Engine standard environment the pool uses
 * daemon threads named {@code HystrixTimer-N}; inside it, the thread factory supplied by
 * {@code PlatformSpecific} is used instead.
 */
public void initialize() {
    final int coreSize = HystrixPlugins.getInstance()
            .getPropertiesStrategy()
            .getTimerThreadPoolProperties()
            .getCorePoolSize()
            .get();

    final ThreadFactory threadFactory;
    if (PlatformSpecific.isAppEngineStandardEnvironment()) {
        threadFactory = PlatformSpecific.getAppEngineThreadFactory();
    } else {
        threadFactory = new ThreadFactory() {
            // numbers the timer threads, starting at 1
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        };
    }

    executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
    initialized = true;
}
TimerReference
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Soft reference to a {@code TimerListener} that also holds the {@link ScheduledFuture} of its
 * scheduled task, so that clearing the reference cancels the task as well.
 */
private static class TimerReference extends SoftReference<TimerListener> {

    /** Handle to the scheduled task created for the listener. */
    private final ScheduledFuture<?> scheduledTick;

    /**
     * @param listener      the listener to reference
     * @param scheduledTick the future returned when the listener's task was scheduled
     */
    TimerReference(TimerListener listener, ScheduledFuture<?> scheduledTick) {
        super(listener);
        this.scheduledTick = scheduledTick;
    }

    /**
     * Clears the reference, then cancels the scheduled task without interrupting a run that is
     * already in progress ({@code cancel(false)}).
     */
    @Override
    public void clear() {
        super.clear();
        // stop this ScheduledFuture from any further executions
        scheduledTick.cancel(false);
    }
}