Ribbon-05丨RestTemplate实际调用与IPing

Posted by jiefang on December 11, 2020

RestTemplate实际调用与IPing

RestTemplate实际调用过程

LoadBalancerInterceptor#intercept()

1
2
3
4
5
6
7
8
9
10
11
12
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    @Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,
				"Request URI does not contain a valid hostname: " + originalUri);
         //创建LoadBalancerRequest并执行RibbonLoadBalancerClient#execute方法
		return this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution));
	}
}

RibbonLoadBalancerClient#execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class RibbonLoadBalancerClient implements LoadBalancerClient {
	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request)throws IOException {
		return execute(serviceId, request, null);
	}
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)throws IOException {
         //从ApplicationContext中获取负载均衡器,默认为ZoneAwareLoadBalancer
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
         //通过负载均衡器的负载均衡算法获取一个Server
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
		
		return execute(serviceId, ribbonServer, request);
	}
	@Override
	public <T> T execute(String serviceId, ServiceInstance serviceInstance,
			LoadBalancerRequest<T> request) throws IOException {
		Server server = null;
		if (serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer) serviceInstance).getServer();
		}
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
		try {
             //执行真正的远程调用
			T returnVal = request.apply(serviceInstance);
			statsRecorder.recordStats(returnVal);
			return returnVal;
		}catch (IOException ex) {
			statsRecorder.recordStats(ex);
			throw ex;
		}catch (Exception ex) {
			statsRecorder.recordStats(ex);
			ReflectionUtils.rethrowRuntimeException(ex);
		}
		return null;
	}    
}

LoadBalancerRequest#apply()

实际调用的是其子类的方法,为LoadBalancerRequestFactory的匿名内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LoadBalancerRequestFactory {
     //创建LoadBalancerRequest时实现的匿名内部类
	public LoadBalancerRequest<ClientHttpResponse> createRequest(
			final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) {
		return instance -> {
			HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
					this.loadBalancer);
			if (this.transformers != null) {
				for (LoadBalancerRequestTransformer transformer : this.transformers) {
					serviceRequest = transformer.transformRequest(serviceRequest,
							instance);
				}
			}
             //实际调用http请求
			return execution.execute(serviceRequest, body);
		};
	}
}

execution来源

InterceptingClientHttpRequest.InterceptingRequestExecution#execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    private final Iterator<ClientHttpRequestInterceptor> iterator;
    public InterceptingRequestExecution() {
        this.iterator = interceptors.iterator();
    }
    @Override
    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            return nextInterceptor.intercept(request, body, this);
        }else {
            HttpMethod method = request.getMethod();
            Assert.state(method != null, "No standard HTTP method");
            //URL转换
            ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
            request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
            if (body.length > 0) {
                if (delegate instanceof StreamingHttpOutputMessage) {
                    StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                    streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                }
                else {
                    StreamUtils.copy(body, delegate.getBody());
                }
            }
            //真正的远程调用
            return delegate.execute();
        }
    }
}

URL转换

ServiceRequestWrapper#getURI()

1
2
3
4
5
6
7
public class ServiceRequestWrapper extends HttpRequestWrapper {
	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
		return uri;
	}
}

RibbonLoadBalancerClient#reconstructURI()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RibbonLoadBalancerClient implements LoadBalancerClient {
		@Override
	public URI reconstructURI(ServiceInstance instance, URI original) {
		Assert.notNull(instance, "instance can not be null");
		String serviceId = instance.getServiceId();
		RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
		URI uri;
		Server server;
		if (instance instanceof RibbonServer) {
			RibbonServer ribbonServer = (RibbonServer) instance;
			server = ribbonServer.getServer();
             //获取uri
			uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
		}
		else {
			server = new Server(instance.getScheme(), instance.getHost(),
					instance.getPort());
			IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
			ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
			uri = updateToSecureConnectionIfNeeded(original, clientConfig,
					serverIntrospector, server);
		}
         //对uri进行转换
		return context.reconstructURIWithServer(server, uri);
	}
}

AbstractClientHttpRequest#execute()

1
2
3
4
5
6
7
@Override
public final ClientHttpResponse execute() throws IOException {
	assertNotExecuted();
	ClientHttpResponse result = executeInternal(this.headers);
	this.executed = true;
	return result;
}

SimpleStreamingClientHttpRequest#executeInternal

使用spring-web.jar包里的普通http请求处理类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
	try {
		if (this.body != null) {
			this.body.close();
		}
		else {
			SimpleBufferingClientHttpRequest.addHeaders(this.connection, headers);
			this.connection.connect();
			// Immediately trigger the request in a no-output scenario as well
			this.connection.getResponseCode();
		}
	}
	catch (IOException ex) {
		// ignore
	}
	return new SimpleClientHttpResponse(this.connection);
}

ribbon默认IPing

DummyPing

RibbonClientConfiguration注入的DummyPing不起作用.

1
2
3
4
5
6
7
8
9
10
public class RibbonClientConfiguration {
    @Bean
	@ConditionalOnMissingBean
	public IPing ribbonPing(IClientConfig config) {
		if (this.propertiesFactory.isSet(IPing.class, name)) {
			return this.propertiesFactory.get(IPing.class, config, name);
		}
		return new DummyPing();
	}
}

NIWSDiscoveryPing实例化

真正起作用的是EurekaRibbonClientConfiguration注入的NIWSDiscoveryPing

1
2
3
4
5
6
7
8
9
10
11
12
public class EurekaRibbonClientConfiguration {
    @Bean
	@ConditionalOnMissingBean
	public IPing ribbonPing(IClientConfig config) {
		if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
			return this.propertiesFactory.get(IPing.class, config, serviceId);
		}
		NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
		ping.initWithNiwsConfig(config);
		return ping;
	}
}

IPing定时调度

ZoneAwareLoadBalancer初始化对IPing进行定时调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
    initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
}
BaseLoadBalancer#initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
    this.config = clientConfig;
    String clientName = clientConfig.getClientName();
    this.name = clientName;
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    Integer.parseInt("2")));
    //设置ping时间周期为30S
    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);
    
    setRule(rule);
    //设置Timer定时调度
    setPing(ping);

    setLoadBalancerStats(stats);
    rule.setLoadBalancer(this);
    if (ping instanceof AbstractLoadBalancerPing) {
        ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
    }
    logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
    boolean enablePrimeConnections = clientConfig.get(
            CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);

    if (enablePrimeConnections) {
        this.setEnablePrimingConnections(true);
        PrimeConnections primeConnections = new PrimeConnections(
                this.getName(), clientConfig);
        this.setPrimeConnections(primeConnections);
    }
    init();
}

BaseLoadBalancer#setPing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
BaseLoadBalancer#setPing(IPing ping) {
    if (ping != null) {
        if (!ping.equals(this.ping)) {
            this.ping = ping;
            setupPingTask(); // since ping data changed
        }
    } else {
        this.ping = null;
        // cancel the timer task
        lbTimer.cancel();
    }
}
BaseLoadBalancer#setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    //Timer定时调度
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

PingTask

1
2
3
4
5
6
7
8
9
class PingTask extends TimerTask {
    public void run() {
        try {
        	new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

Pinger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) { 
        return; // Ping in progress - nothing to do
    }
    Server[] allServers = null;
    boolean[] results = null;

    Lock allLock = null;
    Lock upLock = null;

    try {
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();

        int numCandidates = allServers.length;
        //调用IPing实现ping每一个服务实例,返回结果
        results = pingerStrategy.pingServers(ping, allServers);

        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();

        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();

            svr.setAlive(isAlive);

            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
            		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }

            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
        upServerList = newUpList;
        upLock.unlock();

        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}

SerialPingStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static class SerialPingStrategy implements IPingStrategy {
    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];
        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}

NIWSDiscoveryPing#isAlive

简单的根据每个服务实例的状态是否存活来判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
NIWSDiscoveryPing#isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;	            
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){	                
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}