Hystrix-07丨Hystrix熔断器机制

Posted by jiefang on December 27, 2020

Hystrix熔断器机制

Hystrix熔断器机制

熔断器初始化

1
// Resolve this command's circuit breaker: the current circuitBreakerEnabled property value, the
// breaker passed in by the caller, and the command's group key, command key, properties and
// metrics are handed to initCircuitBreaker, whose result is stored on the command.
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

AbstractCommand#initCircuitBreaker()

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Chooses the circuit breaker a command will use.
 *
 * @param enabled         whether circuit breaking is enabled for the command
 * @param fromConstructor breaker supplied by the caller; may be {@code null}
 * @param groupKey        command group key, forwarded to the factory
 * @param commandKey      command key, forwarded to the factory
 * @param properties      command properties, forwarded to the factory
 * @param metrics         command metrics, forwarded to the factory
 * @return a new {@code NoOpCircuitBreaker} when {@code enabled} is false; otherwise
 *         {@code fromConstructor} when it is non-null, else the instance returned by
 *         {@code HystrixCircuitBreaker.Factory.getInstance}
 */
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // Circuit breaking switched off: hand back the no-op implementation.
    if (!enabled) {
        return new NoOpCircuitBreaker();
    }
    // An explicitly supplied breaker wins over the default one.
    if (fromConstructor != null) {
        return fromConstructor;
    }
    // Otherwise fall back to the default implementation obtained from the factory.
    return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
}

HystrixCircuitBreaker#getInstance()

熔断器基于commandKey创建,所以每个Http接口都会创建一个熔断器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Returns the circuit breaker registered under {@code key.name()}, creating and registering a
 * {@code HystrixCircuitBreakerImpl} when none is registered yet.
 *
 * @param key        command key whose name is used as the lookup key
 * @param group      command group key, passed to a newly created breaker
 * @param properties command properties, passed to a newly created breaker
 * @param metrics    command metrics, passed to a newly created breaker
 * @return the breaker stored in {@code circuitBreakersByCommand} for this key
 */
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // Fast path: expected to hit on every call except the first one for a given key.
    HystrixCircuitBreaker breaker = circuitBreakersByCommand.get(key.name());
    if (breaker == null) {
        // Build a candidate and try to publish it; putIfAbsent yields the already-present value, if any.
        HystrixCircuitBreaker winner = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (winner != null) {
            // Another thread registered a breaker first: use that one.
            breaker = winner;
        } else {
            // Our candidate was stored: read it back from the map.
            breaker = circuitBreakersByCommand.get(key.name());
        }
    }
    return breaker;
}

熔断器实现

HystrixCircuitBreakerImpl

熔断器实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Circuit breaker implementation backed by a command's properties and metrics.
 *
 * <p>State is kept in two atomics: an open/closed flag and a timestamp of when the circuit was
 * opened or last allowed a single test request. Both references are assigned once and never
 * replaced, so they are declared {@code final}.
 */
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);

    /* when the circuit was marked open or was last allowed to try a 'singleTest' */
    private final AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

    /**
     * Creates a breaker bound to the given properties and metrics.
     *
     * @param key          command key (accepted but not stored by this constructor)
     * @param commandGroup command group key (accepted but not stored by this constructor)
     * @param properties   command properties kept for later use
     * @param metrics      command metrics kept for later use
     */
    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;
    }
}

markSuccess()

1
2
3
4
5
6
7
8
9
/**
 * Records a successful execution on the circuit breaker.
 *
 * <p>If the circuit is currently flagged open, it is flipped to closed; the thread that wins
 * that flip also calls {@code metrics.resetStream()}.
 */
public void markSuccess() {
    // Cheap read first; only the thread whose CAS succeeds resets the metrics stream.
    if (circuitOpen.get() && circuitOpen.compareAndSet(true, false)) {
        metrics.resetStream();
    }
}

allowRequest()

是否允许请求通过熔断器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Decides whether a request may pass through the circuit breaker.
 *
 * @return {@code false} when the force-open property is set; {@code true} when the
 *         force-closed property is set; otherwise {@code true} if the circuit is not open
 *         or a single test request is currently allowed
 */
@Override
public boolean allowRequest() {
    // Force-open takes precedence: no request is let through.
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }

    // Force-closed: still run isOpen() so its calculations happen as usual, but ignore
    // the outcome and let all traffic through.
    if (properties.circuitBreakerForceClosed().get()) {
        isOpen();
        return true;
    }

    // Normal operation: a circuit that is not open allows everything ...
    if (!isOpen()) {
        return true;
    }
    // ... while an open circuit only lets an occasional single test request through.
    return allowSingleTest();
}

allowSingleTest()

熔断器打开后,经过circuitBreakerSleepWindowInMilliseconds睡眠时间窗口,会放行一个请求进行尝试:若该请求成功则关闭熔断器;否则熔断器继续保持打开,等待下一个睡眠时间窗口后再次尝试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Decides whether one trial request may pass while the circuit is open.
 *
 * <p>A trial is allowed only when the circuit is flagged open and the current time is past
 * the recorded opened/last-tested time plus the configured sleep window. The recorded time
 * is then advanced to "now" with a compare-and-set, so that when several threads race on
 * the same snapshot only the one whose update succeeds gets {@code true}.
 *
 * @return {@code true} if this caller may send the single test request, {@code false} otherwise
 */
public boolean allowSingleTest() {
    // Snapshot of when the circuit was opened or last allowed a single test.
    final long lastMark = circuitOpenedOrLastTestedTime.get();

    // A circuit that is not open never needs a trial request.
    if (!circuitOpen.get()) {
        return false;
    }

    // The sleep window since the last mark has not elapsed yet.
    if (System.currentTimeMillis() <= lastMark + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        return false;
    }

    // Move the mark forward to now. Success means this thread owns the single test;
    // failure means another thread raced us and was allowed the test first.
    return circuitOpenedOrLastTestedTime.compareAndSet(lastMark, System.currentTimeMillis());
}

isOpen()

判断熔断器现在是否打开状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    /**
     * Reports whether the circuit is open, tripping it when the health counts call for it.
     *
     * <p>An already-open circuit is reported as open without further checks; closing is not
     * attempted here. For a closed circuit the health counts are read from the metrics: the
     * circuit stays closed while the total request count is below the request volume
     * threshold or the error percentage is below the error threshold. Otherwise the circuit
     * is flipped open, and the thread that wins the flip records the current time.
     *
     * @return {@code true} if the circuit is (now) open, {@code false} if it remains closed
     */
    @Override
    public boolean isOpen() {
        // Already open: report it right away.
        if (circuitOpen.get()) {
            return true;
        }

        // Closed so far; consult the health counts to see whether errors should trip it.
        HealthCounts health = metrics.getHealthCounts();

        // Not enough requests to judge: stay closed without looking at the error rate.
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            return false;
        }

        // Error rate still under the threshold: stay closed.
        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        }

        // Both thresholds reached: trip the circuit. Only the thread that performs the
        // false -> true transition stamps the open time; a losing thread just reports open.
        if (circuitOpen.compareAndSet(false, true)) {
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
        }
        return true;
    }
}

熔断器使用

applyHystrixSemantics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Applies the circuit-breaker and semaphore checks before executing the command.
 *
 * <p>After notifying the execution hook, the circuit breaker is asked whether the request is
 * allowed; if not, the short-circuit fallback is returned. Otherwise an execution semaphore
 * permit is requested; if none is available, the semaphore-rejection fallback is returned.
 * With a permit held, the invocation start time is recorded and the command is executed and
 * observed, with callbacks that mark thrown exceptions and release the permit.
 *
 * @param _cmd the command being executed
 * @return the observable for the execution, or for the applicable fallback
 */
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);

    // Circuit breaker refuses the request: degrade via the short-circuit fallback.
    if (!circuitBreaker.allowRequest()) {
        return handleShortCircuitViaFallback();
    }

    final TryableSemaphore permit = getExecutionSemaphore();
    final AtomicBoolean released = new AtomicBoolean(false);

    // Releases the permit; the flag makes a second invocation a no-op, since this action
    // is registered for both terminate and unsubscribe below.
    final Action0 releaseOnce = new Action0() {
        @Override
        public void call() {
            if (released.compareAndSet(false, true)) {
                permit.release();
            }
        }
    };

    // Emits an EXCEPTION_THROWN event for this command key when the execution errors.
    final Action1<Throwable> recordException = new Action1<Throwable>() {
        @Override
        public void call(Throwable t) {
            eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
        }
    };

    // No permit available: degrade via the semaphore-rejection fallback.
    if (!permit.tryAcquire()) {
        return handleSemaphoreRejectionViaFallback();
    }

    try {
        /* used to track userThreadExecutionTime */
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
                .doOnError(recordException)
                .doOnTerminate(releaseOnce)
                .doOnUnsubscribe(releaseOnce);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
}

executeCommandAndObserve()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Sets up the callbacks used while observing a command's execution.
 *
 * <p>NOTE(review): this is an abridged excerpt. It only defines the on-next and on-completed
 * callbacks that report success to the circuit breaker; the code that attaches them to the
 * execution observable and returns it is not shown here.
 *
 * @param _cmd the command being executed
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // Captured here but not used within the lines shown — presumably consumed by the omitted remainder.
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    // Callback for each emitted value.
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            // Record an EMIT event when on-next events are to be output.
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            // For a scalar command, success is recorded here, on the emitted value.
            if (commandIsScalar()) {
                // Elapsed time since the start timestamp held by executionResult.
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                // Report the success to the circuit breaker.
                circuitBreaker.markSuccess();
            }
        }
    };

    // Callback for completion.
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            // For a non-scalar command, success is recorded here, on completion, with the same steps as above.
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                // Report the success to the circuit breaker.
                circuitBreaker.markSuccess();
            }
        }
    };
}