Hystrix-05丨Hystrix超时中断机制

Posted by jiefang on December 26, 2020

Hystrix超时中断机制

HystrixCommand超时中断机制

HystrixObservableTimeoutOperator#call()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Rx operator that places a timeout around the subscription it wraps.
 * <p>
 * A {@link TimerListener} and the wrapped source race on
 * {@code originalCommand.isCommandTimedOut}: whichever side first moves the status away from
 * {@code NOT_EXECUTED} (to {@code TIMED_OUT} from the timer, or to {@code COMPLETED} from the
 * source) decides what the downstream {@code child} subscriber gets to see.
 */
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    /**
     * Wraps {@code child} in a subscriber that forwards notifications only while the command has
     * not been marked as timed out, and registers the timer listener that performs the timeout.
     *
     * @param child the downstream subscriber that receives either the source's notifications or
     *              a {@code HystrixTimeoutException}
     * @return the subscriber to be subscribed to the upstream source
     */
    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        // if the child unsubscribes we unsubscribe our parent as well
        child.add(s);

        // Action executed on timeout: deliver a HystrixTimeoutException to the child.
        // NOTE(review): the HystrixContextRunnable wrapper presumably runs the action with the
        // Hystrix request context in place — confirm against HystrixContextRunnable.
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {

            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });
        // Listener handed to HystrixTimer; its tick() is the timeout code path.
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                // otherwise it means we lost a race and the run() execution completed or did not start
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // report timeout failure
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                    // shut down the original request
                    s.unsubscribe();
                    // signal the timeout exception to the child
                    timeoutRunnable.run();
                    //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    // NOTE(review): no code follows the comment above in this block — confirm whether it is stale.
                }
            }
            // The tick interval is the command's configured execution timeout.
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
        // Register the listener with the HystrixTimer so tick() gets scheduled.
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

        // set externally so execute/queue can see this
        originalCommand.timeoutTimer.set(tl);
        /**
         * If this subscriber receives values it means the parent succeeded/completed
         */
        Subscriber<R> parent = new Subscriber<R>() {
            // Source completed: if no timeout has been recorded, clear the timer reference and forward.
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onCompleted();
                }
            }
            // Source failed: if no timeout has been recorded, clear the timer reference and forward the error.
            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onError(e);
                }
            }

            // Values are forwarded only while the command is not timed out; the timer is left running.
            @Override
            public void onNext(R v) {
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }
            // Returns true when the command has not been marked as timed out.
            private boolean isNotTimedOut() {
                // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                // i.e. the status is already COMPLETED, or it can be moved from NOT_EXECUTED to COMPLETED now
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                    originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        // if s is unsubscribed we want to unsubscribe the parent
        s.add(parent);
        return parent;
    }
}

HystrixTimer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class HystrixTimer {
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        //初始化延时周期线程池
        startThreadIfNeeded();
        // add the listener
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    //执行tick()方法
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
}

HystrixTimer#startThreadIfNeeded()

1
2
3
4
5
6
7
8
9
    /**
     * Loops until {@code executor} holds an instance that reports itself as initialized,
     * installing and initializing a new {@code ScheduledExecutor} when none is set.
     */
    protected void startThreadIfNeeded() {
        while (true) {
            // done as soon as an executor is present and initialized
            if (executor.get() != null && executor.get().isInitialized()) {
                return;
            }
            // try to install a fresh executor; only the caller whose CAS succeeds initializes it
            if (executor.compareAndSet(null, new ScheduledExecutor())) {
                executor.get().initialize();
            }
        }
    }

HystrixTimer.ScheduledExecutor#initialize()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /**
     * Creates the scheduled thread pool used for timer ticks and marks this instance initialized.
     * <p>
     * The pool's core size is read from the timer thread pool properties. On the App Engine
     * standard environment the thread factory comes from {@code PlatformSpecific}; otherwise
     * threads are daemon threads named {@code HystrixTimer-<n>}.
     */
    public void initialize() {
        final HystrixPropertiesStrategy strategy = HystrixPlugins.getInstance().getPropertiesStrategy();
        final int corePoolSize = strategy.getTimerThreadPoolProperties().getCorePoolSize().get();

        final ThreadFactory timerThreadFactory;
        if (PlatformSpecific.isAppEngineStandardEnvironment()) {
            timerThreadFactory = PlatformSpecific.getAppEngineThreadFactory();
        } else {
            timerThreadFactory = new ThreadFactory() {
                // running number appended to each thread name
                final AtomicInteger sequence = new AtomicInteger();

                @Override
                public Thread newThread(Runnable r) {
                    final String threadName = "HystrixTimer-" + sequence.incrementAndGet();
                    final Thread timerThread = new Thread(r, threadName);
                    timerThread.setDaemon(true);
                    return timerThread;
                }
            };
        }

        executor = new ScheduledThreadPoolExecutor(corePoolSize, timerThreadFactory);
        initialized = true;
    }

TimerReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Soft reference to a {@link TimerListener} that also carries the listener's scheduled future,
 * so that clearing the reference cancels the schedule as well.
 */
private static class TimerReference extends SoftReference<TimerListener> {
    /** Handle to the scheduled task that belongs to the referenced listener. */
    private final ScheduledFuture<?> scheduledTick;

    TimerReference(TimerListener referent, ScheduledFuture<?> f) {
        super(referent);
        scheduledTick = f;
    }

    /**
     * Clears the reference and then cancels the scheduled task, passing {@code false} so an
     * execution that is already in progress is not interrupted.
     */
    @Override
    public void clear() {
        super.clear();
        // no further executions of this ScheduledFuture after this point
        scheduledTick.cancel(false);
    }
}