Hystrix-06丨Hystrix降级机制

Posted by jiefang on December 27, 2020

Hystrix降级机制

以下四种情况将触发getFallback()调用:

  • run()方法抛出非HystrixBadRequestException异常;
  • run()方法调用超时;
  • 熔断器开启,拦截调用;
  • 线程池/队列/信号量已满,拒绝调用。

熔断器开启

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    /**
     * Runs the command behind two gates: the circuit breaker and the execution semaphore.
     *
     * <ul>
     *   <li>If {@code circuitBreaker.allowRequest()} is false, the result of
     *       {@code handleShortCircuitViaFallback()} is returned.</li>
     *   <li>If the execution semaphore cannot be acquired, the result of
     *       {@code handleSemaphoreRejectionViaFallback()} is returned.</li>
     *   <li>Otherwise the command is executed via {@code executeCommandAndObserve(_cmd)}.</li>
     * </ul>
     *
     * @param _cmd the command being executed; handed to the execution hook and to
     *             {@code executeCommandAndObserve}
     * @return the observable for the execution, or for whichever rejection path was taken
     */
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);

        // Circuit breaker does not allow the request: degrade through the short-circuit fallback.
        if (!circuitBreaker.allowRequest()) {
            return handleShortCircuitViaFallback();
        }

        final TryableSemaphore semaphore = getExecutionSemaphore();
        final AtomicBoolean released = new AtomicBoolean(false);

        // Registered for both terminate and unsubscribe below; the flag guarantees
        // the permit is released at most once.
        final Action0 releaseOnce = new Action0() {
            @Override
            public void call() {
                if (!released.compareAndSet(false, true)) {
                    return;
                }
                semaphore.release();
            }
        };

        // Reports an EXCEPTION_THROWN event for this command key when the execution errors.
        final Action1<Throwable> notifyExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        // No permit available: degrade through the semaphore-rejection fallback.
        if (!semaphore.tryAcquire()) {
            return handleSemaphoreRejectionViaFallback();
        }

        try {
            /* used to track userThreadExecutionTime */
            final long startedAt = System.currentTimeMillis();
            executionResult = executionResult.setInvocationStartTime(startedAt);

            final Observable<R> execution = executeCommandAndObserve(_cmd);
            return execution
                    .doOnError(notifyExceptionThrown)
                    .doOnTerminate(releaseOnce)
                    .doOnUnsubscribe(releaseOnce);
        } catch (RuntimeException e) {
            return Observable.error(e);
        }
    }

AbstractCommand#executeCommandAndObserve#handleFallback

此方法内部定义的handleFallback函数,用于在Command执行出现异常时,根据异常类型选择对应的fallback降级处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Defines {@code handleFallback}, the function that maps an execution error to the
 * matching fallback / error-emitting handler.
 *
 * NOTE(review): this is an abridged excerpt. The rest of the method (where
 * {@code handleFallback} is presumably attached to the execution observable and the
 * result is returned) is not shown here.
 *
 * @param _cmd the command being executed (not referenced in the visible part of the method)
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            // Convert the throwable to an Exception and record it on the execution result.
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            // Rejected execution (thread pool full): checked on the converted exception e
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            // Timeout: checked on the raw throwable t
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            // Bad request: checked on the raw throwable t; handled by emitting the error
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                // Any other exception.
                // The converted exception can still be a HystrixBadRequestException even
                // though t was not; in that case mark BAD_REQUEST and emit the error as-is.
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                // Everything else is treated as an execution failure.
                return handleFailureViaFallback(e);
            }
        }
    };
}

getFallbackOrThrowException()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
 * Records the failure event on {@code executionResult} and then either emits an error
 * or runs the fallback.
 *
 * Paths visible in this method:
 * <ul>
 *   <li>{@code shouldNotBeWrapped(originalException)}: the exception returned by
 *       {@code wrapWithOnErrorHook} is emitted directly.</li>
 *   <li>{@code isUnrecoverable(originalException)}: logged at error level and a
 *       {@code HystrixRuntimeException} is emitted; no fallback is attempted.</li>
 *   <li>Fallback disabled: delegates to {@code handleFallbackDisabledByEmittingError}.</li>
 *   <li>Fallback semaphore not acquired: delegates to
 *       {@code handleFallbackRejectionByEmittingError}.</li>
 *   <li>Otherwise: the fallback observable is returned with hooks, event marking,
 *       error translation and semaphore release attached.</li>
 * </ul>
 *
 * @param _cmd              the command; passed to the fallback hook operators and used for the
 *                          class reported in fallback-error {@code HystrixRuntimeException}s
 * @param eventType         event recorded on the execution result together with the latency
 * @param failureType       failure type passed to the error hook and to {@code HystrixRuntimeException}
 * @param message           text embedded in the {@code HystrixRuntimeException} messages
 * @param originalException the exception that triggered this fallback attempt
 * @return an observable emitting the fallback values, or an error as described above
 */
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // Captured now; handed to setRequestContextIfNeeded on every fallback notification below.
    final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    // record the executionResult
    executionResult = executionResult.addEvent((int) latency, eventType);

    if (shouldNotBeWrapped(originalException)){
        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(e);
    } else if (isUnrecoverable(originalException)) {
        logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        if (isRecoverableError(originalException)) {
            logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
        }
        // Is fallback enabled? (enabled by default per the Hystrix configuration docs)
        if (properties.fallbackEnabled().get()) {
            /* fallback behavior is permitted so attempt */
            // Applied via doOnEach: runs for every notification of the fallback observable.
            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(requestContext);
                }
            };

            // Applied via doOnNext: marks FALLBACK_EMIT per emitted value, only when
            // shouldOutputOnNextEvents() is true.
            final Action1<R> markFallbackEmit = new Action1<R>() {
                @Override
                public void call(R r) {
                    if (shouldOutputOnNextEvents()) {
                        executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                    }
                }
            };

            // Applied via doOnCompleted: marks FALLBACK_SUCCESS with the latency measured
            // from the execution result's start timestamp.
            final Action0 markFallbackCompleted = new Action0() {
                @Override
                public void call() {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
                }
            };

            // Applied via onErrorResumeNext: turns a fallback error into a HystrixRuntimeException
            // carrying both the original exception (e) and the fallback exception (fe).
            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = originalException;
                    Exception fe = getExceptionFromThrowable(t);

                    // UnsupportedOperationException is reported as FALLBACK_MISSING
                    // ("no fallback available"); anything else as FALLBACK_FAILURE.
                    if (fe instanceof UnsupportedOperationException) {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                    } else {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                    }
                }
            };
            // Semaphore guarding fallback execution (10 permits by default per the Hystrix configuration docs)
            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // Registered for both terminate and unsubscribe; the flag ensures a single release.
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        fallbackSemaphore.release();
                    }
                }
            };

            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                // Event marking is attached before onErrorResumeNext(handleFallbackError);
                // the semaphore release is attached last.
                return fallbackExecutionChain
                    .doOnEach(setRequestContext)
                    .lift(new FallbackHookApplication(_cmd))
                    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                    .doOnNext(markFallbackEmit)
                    .doOnCompleted(markFallbackCompleted)
                    .onErrorResumeNext(handleFallbackError)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
                // No fallback permit available: the fallback itself is rejected.
                return handleFallbackRejectionByEmittingError();
            }
        } else {
            // Fallback is disabled for this command.
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}