ByteTcc原理
开启TCC事务
服务A->CompensableHandlerInterceptor->CompensableMethodInterceptor->TransactionInterceptorImpl->TransactionInterceptor->...->TransactionManagerImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* 事务管理器实现类
*/
public class TransactionManagerImpl implements TransactionManager, CompensableBeanFactoryAware {
/**
* 开启事务
* @throws NotSupportedException
* @throws SystemException
*/
public void begin() throws NotSupportedException, SystemException {
TransactionManager transactionManager = this.beanFactory.getTransactionManager();
CompensableManager compensableManager = this.beanFactory.getCompensableManager();
//根据当前线程从map中获取分布式事务
CompensableTransaction transaction = compensableManager.getCompensableTransactionQuietly();
boolean markedRollbackOnly = transaction == null ? false : transaction.isMarkedRollbackOnly();
if (markedRollbackOnly) {
throw new SystemException("Current global transaction has already been marked rollback only!");
}
CompensableInvocationRegistry registry = CompensableInvocationRegistry.getInstance();
CompensableInvocation invocation = registry.getCurrent();
//分布式事务对象不是null,已经开启分布式事务
if (transaction != null) {
compensableManager.begin();
} else if (invocation != null) {
//开启分布式事务
compensableManager.compensableBegin();
} else {
transactionManager.begin();
}
}
/**
* 开启TCC分布式事务
* @throws NotSupportedException
* @throws SystemException
*/
public void compensableBegin() throws NotSupportedException, SystemException {
if (this.getCompensableTransactionQuietly() != null) {
throw new NotSupportedException();
}
CompensableLogger compensableLogger = this.beanFactory.getCompensableLogger();
TransactionLock compensableLock = this.beanFactory.getCompensableLock();
TransactionRepository compensableRepository = this.beanFactory.getCompensableRepository();
RemoteCoordinator compensableCoordinator = (RemoteCoordinator) this.beanFactory.getCompensableNativeParticipant();
XidFactory transactionXidFactory = this.beanFactory.getTransactionXidFactory();
XidFactory compensableXidFactory = this.beanFactory.getCompensableXidFactory();
//分布式事务全局Xid
TransactionXid compensableXid = compensableXidFactory.createGlobalXid();
//根据全局Xid生成事务Xid
TransactionXid transactionXid = transactionXidFactory.createGlobalXid(compensableXid.getGlobalTransactionId());
//分布式事务上下文
TransactionContext compensableContext = new TransactionContext();
compensableContext.setCoordinator(true);
compensableContext.setCompensable(true);
compensableContext.setStatefully(this.statefully);
compensableContext.setXid(compensableXid);
compensableContext.setPropagatedBy(compensableCoordinator.getIdentifier());
//分布式事务对象
CompensableTransactionImpl compensable = new CompensableTransactionImpl(compensableContext);
compensable.setBeanFactory(this.beanFactory);
//将分布式事务分别放入线程-分布式事务map和全局Xid-分布式事务map
this.associateThread(compensable);
logger.info("{}| compensable transaction begin!", ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));
TransactionContext transactionContext = new TransactionContext();
transactionContext.setXid(transactionXid);
boolean failure = true;
try {
this.invokeBegin(transactionContext, true);
failure = false;
} finally {
if (failure) {
logger.info("{}| compensable transaction failed!",
ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));
this.desociateThread();
}
}
//全局Xid-分布式事务放入内存的map中
compensableRepository.putTransaction(compensableXid, compensable);
//日志处理
compensableLogger.createTransaction(compensable.getTransactionArchive());
boolean locked = compensableLock.lockTransaction(compensableXid, this.endpoint);
if (locked == false) {
this.invokeRollbackInBegin(transactionContext);
compensableLogger.deleteTransaction(compensable.getTransactionArchive());
this.desociateThread();
compensableRepository.removeTransaction(compensableXid);
logger.info("{}| compensable transaction failed!",
ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));
throw new SystemException(XAException.XAER_PROTO); // should never happen
}
}
}
ByteTCC与Fegin和Ribbon结合
CompensableFeignBeanPostProcessor#postProcessAfterInitialization()->CompensableFeignBeanPostProcessor#createProxiedObject()->CompensableFeignHandler#invoke()->CompensableLoadBalancerRuleImpl#choose()->CompensableLoadBalancerInterceptor#beforeCompletion()->CompensableRuleImpl#chooseServer()->CompensableLoadBalancerInterceptor#afterCompletion()->CompensableInterceptorImpl#beforeSendRequest()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* byteTcc实现的FeignClient的Feign代理类
*/
public class CompensableFeignHandler implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
final SpringCloudBeanRegistry beanRegistry = SpringCloudBeanRegistry.getInstance();
CompensableBeanFactory beanFactory = beanRegistry.getBeanFactory();
CompensableManager compensableManager = beanFactory.getCompensableManager();
//使用的是org.bytesoft.bytetcc.supports.rpc.CompensableInterceptorImpl
final TransactionInterceptor transactionInterceptor = beanFactory.getTransactionInterceptor();
final CompensableTransactionImpl compensable = //
(CompensableTransactionImpl) compensableManager.getCompensableTransactionQuietly();
if (compensable == null) {
return this.delegate.invoke(proxy, method, args);
}
final TransactionContext transactionContext = compensable.getTransactionContext();
if (transactionContext.isCompensable() == false) {
return this.delegate.invoke(proxy, method, args);
}
final TransactionRequestImpl request = new TransactionRequestImpl();
final TransactionResponseImpl response = new TransactionResponseImpl();
beanRegistry.setLoadBalancerInterceptor(new CompensableLoadBalancerInterceptor(this.statefully) {
//实现拦截器后处理方法逻辑用于CompensableLoadBalancerRuleImpl#choose()方法调用,chooseServer()后处理
public void afterCompletion(Server server) {
beanRegistry.removeLoadBalancerInterceptor();
if (server == null) {
logger.warn(
"There is no suitable server, the TransactionInterceptor.beforeSendRequest() operation is not executed!");
return;
} // end-if (server == null)
// TransactionRequestImpl request = new TransactionRequestImpl();
request.setTransactionContext(transactionContext);
String instanceId = this.getInstanceId(server);
RemoteCoordinator coordinator = beanRegistry.getConsumeCoordinator(instanceId);
request.setTargetTransactionCoordinator(coordinator);
transactionInterceptor.beforeSendRequest(request);
}
});
// TODO should be replaced by CompensableFeignResult.getTransactionContext()
response.setTransactionContext(transactionContext);
try {
return this.delegate.invoke(proxy, method, args);
} catch (Throwable error) {
Throwable cause = error.getCause();
CompensableFeignResult cfresult = null;
if (CompensableFeignResult.class.isInstance(error)) {
cfresult = (CompensableFeignResult) error;
} else if (CompensableFeignResult.class.isInstance(cause)) {
cfresult = (CompensableFeignResult) cause;
}
if (cfresult == null) {
throw error;
} // end-if (cfresult == null)
// response.setTransactionContext(cfresult.getTransactionContext());
response.setParticipantDelistFlag(cfresult.isParticipantValidFlag());
Object targetResult = cfresult.getResult();
if (cfresult.isError() == false) {
return targetResult;
} else if (RuntimeException.class.isInstance(targetResult)) {
throw (RuntimeException) targetResult;
} else {
throw new RuntimeException((Exception) targetResult);
}
} finally {
Object interceptedValue = response.getHeader(TransactionInterceptor.class.getName());
if (Boolean.valueOf(String.valueOf(interceptedValue)) == false) {
response.setParticipantEnlistFlag(request.isParticipantEnlistFlag());
RemoteCoordinator coordinator = request.getTargetTransactionCoordinator();
// TODO should be replaced by CompensableFeignResult.getRemoteParticipant()
response.setSourceTransactionCoordinator(coordinator);
transactionInterceptor.afterReceiveResponse(response);
} // end-if (response.isIntercepted() == false)
}
}
}
TCC事务提交
CompensableTransactionImpl#commit()->CompensableTransactionImpl#fireCommit()->CompensableTransactionImpl#fireNativeParticipantConfirm()->CompensableTransactionImpl#fireRemoteParticipantConfirm()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 可补偿的TCC分布式事务实现类
*/
public class CompensableTransactionImpl extends TransactionListenerAdapter
implements CompensableTransaction, CompensableRolledbackMarker {
/**
* 事务提交
* @throws RollbackException
* @throws HeuristicMixedException
* @throws HeuristicRollbackException
* @throws SecurityException
* @throws IllegalStateException
* @throws SystemException
*/
public synchronized void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
SecurityException, IllegalStateException, SystemException {
if (this.transactionStatus == Status.STATUS_ACTIVE) {
this.fireCommit();
} else if (this.transactionStatus == Status.STATUS_MARKED_ROLLBACK) {
this.fireRollback();
throw new HeuristicRollbackException();
} else if (this.transactionStatus == Status.STATUS_ROLLEDBACK) /* should never happen */ {
throw new RollbackException();
} else if (this.transactionStatus == Status.STATUS_COMMITTED) /* should never happen */ {
logger.debug("Current transaction has already been committed.");
} else {
throw new IllegalStateException();
}
}
}
TCC事务回滚
CompensableTransactionImpl#rollback()->CompensableTransactionImpl#fireRollback()->CompensableTransactionImpl#fireNativeParticipantCancel()->CompensableTransactionImpl#fireRemoteParticipantCancel()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 可补偿的TCC分布式事务实现类
*/
public class CompensableTransactionImpl extends TransactionListenerAdapter
implements CompensableTransaction, CompensableRolledbackMarker {
private void fireRollback() throws IllegalStateException, SystemException {
CompensableLogger compensableLogger = this.beanFactory.getCompensableLogger();
this.transactionStatus = Status.STATUS_ROLLING_BACK;
this.markCurrentBranchTransactionRollbackIfNecessary();
this.transactionContext.setCompensating(true);
compensableLogger.updateTransactionStatus(this.getTransactionArchive()); // compensableLogger.updateTransaction(this.getTransactionArchive());
SystemException systemEx = null;
try {
//本地分支事务cancel
this.fireNativeParticipantCancel();
} catch (SystemException ex) {
systemEx = ex;
logger.info("{}| cancel native branchs failed!",
ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
} catch (RuntimeException ex) {
systemEx = new SystemException(XAException.XAER_RMERR);
systemEx.initCause(ex);
logger.info("{}| cancel native branchs failed!",
ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
}
try {
//远程分支事务取消
this.fireRemoteParticipantCancel();
} catch (SystemException ex) {
logger.info("{}| cancel remote branchs failed!",
ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
throw ex;
} catch (RuntimeException ex) {
logger.info("{}| cancel remote branchs failed!",
ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
SystemException sysEx = new SystemException(XAException.XAER_RMERR);
sysEx.initCause(ex);
throw sysEx;
}
if (systemEx != null) {
throw systemEx;
} else {
this.transactionStatus = Status.STATUS_ROLLEDBACK;
compensableLogger.updateTransactionStatus(this.getTransactionArchive()); // ccompensableLogger.updateTransaction(this.getTransactionArchive());
logger.info("{}| compensable transaction rolled back!",
ByteUtils.byteArrayToString(transactionContext.getXid().getGlobalTransactionId()));
}
}
}