How the Eureka server batches peer synchronization tasks
Putting tasks onto the queue
ApplicationResource#addInstance() --> PeerAwareInstanceRegistryImpl#register() --> PeerAwareInstanceRegistryImpl#replicateToPeers() --> PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers() --> PeerEurekaNode#register() --> TaskDispatcher#process()
// PeerEurekaNode#register: wrap the registration in a replication task and hand it to the batching dispatcher
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime);
}
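The task id is what lets a newer task for the same instance override an older, not-yet-sent one in the pending map (see appendTaskHolder below). Roughly, PeerEurekaNode derives it from the request type plus the instance identity; a simplified sketch of those helpers:

// Sketch of PeerEurekaNode's task-id helpers (simplified): tasks of the same type for the
// same instance share an id, so a fresher task can replace a stale pending one.
private static String taskId(String requestType, String appName, String id) {
    return requestType + '#' + appName + '/' + id;
}

private static String taskId(String requestType, InstanceInfo info) {
    return taskId(requestType, info.getAppName(), info.getId());
}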
// Creating the task dispatcher
TaskDispatchers#createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize, int workerCount, long maxBatchingDelay, long congestionRetryDelayMs, long networkFailureRetryMs, TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        // Submit the task to the acceptorQueue
        @Override
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
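For reference, PeerEurekaNode wires this up in its constructor. The sketch below is a simplified reconstruction, not a verbatim copy: the config getters and local variable names are assumptions based on the EurekaServerConfig replication settings, and the batch size of 250 matches the maxBatchingSize used later.

// Simplified sketch of how PeerEurekaNode builds its batching dispatcher for one peer.
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
        batcherName,                                   // dispatcher id, derived from the target host
        config.getMaxElementsInPeerReplicationPool(),  // maxBufferSize
        250,                                           // workloadSize, i.e. the maximum batch size
        config.getMaxThreadsForPeerReplication(),      // workerCount
        maxBatchingDelayMs,                            // how long a task may wait before being batched
        serverUnavailableSleepTimeMs,                  // congestion retry delay
        retrySleepTimeMs,                              // network-failure retry delay
        taskProcessor);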
// Queue that receives newly submitted tasks
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();

AcceptorExecutor#process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}
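Tasks do not stay in acceptorQueue for long: the AcceptorExecutor constructor starts a single background thread running the AcceptorRunner shown in the next section. A rough sketch of that wiring (thread and group names as in the Eureka source, simplified):

// Rough sketch of the AcceptorExecutor constructor: one daemon thread drains the input
// queues and prepares work for the worker threads.
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
this.acceptorThread.setDaemon(true);
this.acceptorThread.start();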
Splitting the queue into pending work
// IDs of pending tasks, in processing order
private final Deque<ID> processingOrder = new LinkedList<>();
// Pending tasks, keyed by task id
private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                drainInputQueues();
                int totalItems = processingOrder.size();
                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    // Pack tasks into batches
                    assignBatchWork();
                    // Hand out single-item work
                    assignSingleItemWork();
                }
                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }

    // Drain the input queues
    private void drainInputQueues() throws InterruptedException {
        do {
            drainReprocessQueue();
            drainAcceptorQueue();
            if (!isShutdown.get()) {
                // If all queues are empty, block for a while on the acceptor queue
                if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                    TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (taskHolder != null) {
                        appendTaskHolder(taskHolder);
                    }
                }
            }
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }

    // Drain the reprocess queue (tasks pushed back after a failed batch)
    private void drainReprocessQueue() {
        long now = System.currentTimeMillis();
        while (!reprocessQueue.isEmpty() && !isFull()) {
            TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
            ID id = taskHolder.getId();
            if (taskHolder.getExpiryTime() <= now) {
                expiredTasks++;
            } else if (pendingTasks.containsKey(id)) {
                overriddenTasks++;
            } else {
                pendingTasks.put(id, taskHolder);
                processingOrder.addFirst(id);
            }
        }
        if (isFull()) {
            queueOverflows += reprocessQueue.size();
            reprocessQueue.clear();
        }
    }

    // Drain everything currently in the acceptor queue
    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            appendTaskHolder(acceptorQueue.poll());
        }
    }

    // Put the task into pendingTasks and processingOrder
    private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
        if (isFull()) {
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
        if (previousTask == null) {
            processingOrder.add(taskHolder.getId());
        } else {
            overriddenTasks++;
        }
    }
}
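assignSingleItemWork() is the non-batched counterpart of assignBatchWork(): if a single-item worker is waiting, it hands over the first non-expired pending task. A simplified sketch (field names assumed to mirror the batch path):

// Simplified sketch of AcceptorRunner#assignSingleItemWork(): only acts if a single-item
// worker has requested work (semaphore permit available), then hands over one task.
void assignSingleItemWork() {
    if (!processingOrder.isEmpty() && singleItemWorkRequests.tryAcquire(1)) {
        long now = System.currentTimeMillis();
        while (!processingOrder.isEmpty()) {
            ID id = processingOrder.poll();
            TaskHolder<ID, T> holder = pendingTasks.remove(id);
            if (holder.getExpiryTime() > now) {
                singleItemWorkQueue.add(holder);
                return;
            }
            expiredTasks++;
        }
        // Nothing usable was found; give the permit back
        singleItemWorkRequests.release();
    }
}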
Taking tasks off the pending queue and packing them into a batch
// Queue holding the fully packed batches
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

// Pack tasks taken from the pending queue and put the batch into batchWorkQueue
AcceptorRunner#assignBatchWork() {
    if (hasEnoughTasksForNextBatch()) {
        if (batchWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            // len is at most maxBatchingSize (250 for replication)
            int len = Math.min(maxBatchingSize, processingOrder.size());
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
            // Keep filling the batch while it is not full and there are still pending tasks
            while (holders.size() < len && !processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    holders.add(holder);
                } else {
                    expiredTasks++;
                }
            }
            if (holders.isEmpty()) {
                batchWorkRequests.release();
            } else {
                batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                // Publish the packed batch
                batchWorkQueue.add(holders);
            }
        }
    }
}
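Two details make this work. First, batchWorkRequests is a semaphore: a worker releases a permit when it asks for work (requestWorkItems()), and the acceptor consumes a permit when it publishes a batch, so batches are only packed when someone is ready to process them. Second, hasEnoughTasksForNextBatch() decides whether it is worth packing right now. Roughly (simplified sketches based on the Eureka source):

// Called by a worker thread: signal that it wants one unit of work, then wait on the queue.
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
    batchWorkRequests.release();
    return batchWorkQueue;
}

// Pack a batch only when the buffer is full, or the oldest pending task has already
// waited longer than maxBatchingDelay.
private boolean hasEnoughTasksForNextBatch() {
    if (processingOrder.isEmpty()) {
        return false;
    }
    if (pendingTasks.size() >= maxBufferSize) {
        return true;
    }
    TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
    long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
    return delay >= maxBatchingDelay;
}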
Consuming the queue of packed batches
// Worker thread that consumes the packed batch queue
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
    BatchWorkerRunnable(String workerName,
                        AtomicBoolean isShutdown,
                        TaskExecutorMetrics metrics,
                        TaskProcessor<T> processor,
                        AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                List<TaskHolder<ID, T>> holders = getWork();
                metrics.registerExpiryTimes(holders);
                List<T> tasks = getTasksOf(holders);
                // Process the whole batch of tasks
                ProcessingResult result = processor.process(tasks);
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        taskDispatcher.reprocess(holders, result);
                        break;
                    case PermanentError:
                        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                }
                metrics.registerTaskResult(result, tasks.size());
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }

    private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
        BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
        List<TaskHolder<ID, T>> result;
        do {
            result = workQueue.poll(1, TimeUnit.SECONDS);
        } while (!isShutdown.get() && result == null);
        return (result == null) ? new ArrayList<>() : result;
    }

    private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
        List<T> tasks = new ArrayList<>(holders.size());
        for (TaskHolder<ID, T> holder : holders) {
            tasks.add(holder.getTask());
        }
        return tasks;
    }
}
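When a batch fails with Congestion or TransientError, reprocess() pushes the tasks back to the acceptor side and records the failure with the traffic shaper, which is where the transmissionDelay() seen in the AcceptorRunner comes from. A rough, simplified sketch of AcceptorExecutor#reprocess:

// Rough sketch: failed holders go back onto the reprocessQueue (drained by
// drainReprocessQueue() above), and the traffic shaper remembers the failure so the
// acceptor thread backs off before building the next batch.
void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
    reprocessQueue.addAll(holders);
    replayedTasks += holders.size();
    trafficShaper.registerFailure(processingResult);
}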
// Send the HTTP request
ReplicationTaskProcessor#process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        ...
    } catch (Throwable e) {
        ...
    }
    return ProcessingResult.Success;
}
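The elided parts above decide which ProcessingResult to return: broadly, an HTTP 503 from the peer is treated as Congestion, network and timeout exceptions as Congestion or TransientError, and anything else as PermanentError. A simplified illustration of that mapping (not verbatim Eureka code; the helper name is an assumption for this sketch):

// Simplified illustration of how the batch outcome maps to a ProcessingResult.
private ProcessingResult resultFor(int statusCode) {
    if (statusCode >= 200 && statusCode < 300) {
        return ProcessingResult.Success;        // per-item results are inspected separately
    }
    if (statusCode == 503) {
        return ProcessingResult.Congestion;     // peer overloaded: the whole batch is retried later
    }
    return ProcessingResult.PermanentError;     // unexpected error: the batch is discarded
}
// Exceptions are handled similarly: read timeouts -> Congestion,
// connection failures -> TransientError, everything else -> PermanentError.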
// POST the packed batch in a single HTTP call to the other Eureka server
Jersey2ReplicationClient#submitBatchUpdates(ReplicationList replicationList) {
    Response response = null;
    try {
        // The request path is peerreplication/batch/
        response = jerseyClient.target(serviceUrl)
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .request(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.json(replicationList));
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.readEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
Receiving the batch replication request on the peer
PeerReplicationResource#batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                ...
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}

// Dispatch register, heartbeat, cancel, status-update and delete-status-override actions
PeerReplicationResource#dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());
    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
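Each handler replays the action against the local registry while marking it as a replication, so the receiving node does not forward it to its own peers again (PeerAwareInstanceRegistryImpl#replicateToPeers returns early when isReplication is true), which prevents replication loops. A rough sketch of handleRegister, assuming REPLICATION is the string constant "true" used for the isReplication flag:

// Rough sketch of PeerReplicationResource#handleRegister: register locally,
// flagging the call as a replication so it is not replicated onward.
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION); // REPLICATION = "true"
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}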