事务-07丨ByteTcc原理

Posted by jiefang on January 31, 2021

ByteTcc原理

开启TCC事务

服务A->CompensableHandlerInterceptor->CompensableMethodInterceptor->TransactionInterceptorImpl->TransactionInterceptor->...->TransactionManagerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * 事务管理器实现类
 */
public class TransactionManagerImpl implements TransactionManager, CompensableBeanFactoryAware {
    /**
     * 开启事务
     * @throws NotSupportedException
     * @throws SystemException
     */
    public void begin() throws NotSupportedException, SystemException {
       TransactionManager transactionManager = this.beanFactory.getTransactionManager();
       CompensableManager compensableManager = this.beanFactory.getCompensableManager();
       //根据当前线程从map中获取分布式事务
       CompensableTransaction transaction = compensableManager.getCompensableTransactionQuietly();
       boolean markedRollbackOnly = transaction == null ? false : transaction.isMarkedRollbackOnly();

       if (markedRollbackOnly) {
          throw new SystemException("Current global transaction has already been marked rollback only!");
       }

       CompensableInvocationRegistry registry = CompensableInvocationRegistry.getInstance();
       CompensableInvocation invocation = registry.getCurrent();
       //分布式事务对象不是null,已经开启分布式事务
       if (transaction != null) {
          compensableManager.begin();
       } else if (invocation != null) {
          //开启分布式事务
          compensableManager.compensableBegin();
       } else {
          transactionManager.begin();
       }
    }
	/**
	 * 开启TCC分布式事务
	 * @throws NotSupportedException
	 * @throws SystemException
	 */
	public void compensableBegin() throws NotSupportedException, SystemException {
		if (this.getCompensableTransactionQuietly() != null) {
			throw new NotSupportedException();
		}

		CompensableLogger compensableLogger = this.beanFactory.getCompensableLogger();
		TransactionLock compensableLock = this.beanFactory.getCompensableLock();
		TransactionRepository compensableRepository = this.beanFactory.getCompensableRepository();
		RemoteCoordinator compensableCoordinator = (RemoteCoordinator) this.beanFactory.getCompensableNativeParticipant();

		XidFactory transactionXidFactory = this.beanFactory.getTransactionXidFactory();
		XidFactory compensableXidFactory = this.beanFactory.getCompensableXidFactory();
		//分布式事务全局Xid
		TransactionXid compensableXid = compensableXidFactory.createGlobalXid();
		//根据全局Xid生成事务Xid
		TransactionXid transactionXid = transactionXidFactory.createGlobalXid(compensableXid.getGlobalTransactionId());
		//分布式事务上下文
		TransactionContext compensableContext = new TransactionContext();
		compensableContext.setCoordinator(true);
		compensableContext.setCompensable(true);
		compensableContext.setStatefully(this.statefully);
		compensableContext.setXid(compensableXid);
		compensableContext.setPropagatedBy(compensableCoordinator.getIdentifier());
		//分布式事务对象
		CompensableTransactionImpl compensable = new CompensableTransactionImpl(compensableContext);
		compensable.setBeanFactory(this.beanFactory);
		//将分布式事务分别放入线程-分布式事务map和全局Xid-分布式事务map
		this.associateThread(compensable);
		logger.info("{}| compensable transaction begin!", ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));

		TransactionContext transactionContext = new TransactionContext();
		transactionContext.setXid(transactionXid);

		boolean failure = true;
		try {
			this.invokeBegin(transactionContext, true);
			failure = false;
		} finally {
			if (failure) {
				logger.info("{}| compensable transaction failed!",
						ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));
				this.desociateThread();
			}
		}
		//全局Xid-分布式事务放入内存的map中
		compensableRepository.putTransaction(compensableXid, compensable);
		//日志处理
		compensableLogger.createTransaction(compensable.getTransactionArchive());
		boolean locked = compensableLock.lockTransaction(compensableXid, this.endpoint);
		if (locked == false) {
			this.invokeRollbackInBegin(transactionContext);
			compensableLogger.deleteTransaction(compensable.getTransactionArchive());
			this.desociateThread();
			compensableRepository.removeTransaction(compensableXid);
			logger.info("{}| compensable transaction failed!",
					ByteUtils.byteArrayToString(compensableXid.getGlobalTransactionId()));

			throw new SystemException(XAException.XAER_PROTO); // should never happen
		}
	}
}

ByteTCC与Fegin和Ribbon结合

CompensableFeignBeanPostProcessor#postProcessAfterInitialization()->CompensableFeignBeanPostProcessor#createProxiedObject()->CompensableFeignHandler#invoke()->CompensableLoadBalancerRuleImpl#choose()->CompensableLoadBalancerInterceptor#beforeCompletion()->CompensableRuleImpl#chooseServer()->CompensableLoadBalancerInterceptor#afterCompletion()->CompensableInterceptorImpl#beforeSendRequest()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * byteTcc实现的FeignClient的Feign代理类
 */
public class CompensableFeignHandler implements InvocationHandler {
    	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		if (Object.class.equals(method.getDeclaringClass())) {
			return method.invoke(this, args);
		}

		final SpringCloudBeanRegistry beanRegistry = SpringCloudBeanRegistry.getInstance();
		CompensableBeanFactory beanFactory = beanRegistry.getBeanFactory();
		CompensableManager compensableManager = beanFactory.getCompensableManager();
		//使用的是org.bytesoft.bytetcc.supports.rpc.CompensableInterceptorImpl
		final TransactionInterceptor transactionInterceptor = beanFactory.getTransactionInterceptor();

		final CompensableTransactionImpl compensable = //
				(CompensableTransactionImpl) compensableManager.getCompensableTransactionQuietly();
		if (compensable == null) {
			return this.delegate.invoke(proxy, method, args);
		}

		final TransactionContext transactionContext = compensable.getTransactionContext();
		if (transactionContext.isCompensable() == false) {
			return this.delegate.invoke(proxy, method, args);
		}

		final TransactionRequestImpl request = new TransactionRequestImpl();
		final TransactionResponseImpl response = new TransactionResponseImpl();

		beanRegistry.setLoadBalancerInterceptor(new CompensableLoadBalancerInterceptor(this.statefully) {
			//实现拦截器后处理方法逻辑用于CompensableLoadBalancerRuleImpl#choose()方法调用,chooseServer()后处理
			public void afterCompletion(Server server) {
				beanRegistry.removeLoadBalancerInterceptor();

				if (server == null) {
					logger.warn(
							"There is no suitable server, the TransactionInterceptor.beforeSendRequest() operation is not executed!");
					return;
				} // end-if (server == null)

				// TransactionRequestImpl request = new TransactionRequestImpl();
				request.setTransactionContext(transactionContext);

				String instanceId = this.getInstanceId(server);

				RemoteCoordinator coordinator = beanRegistry.getConsumeCoordinator(instanceId);
				request.setTargetTransactionCoordinator(coordinator);

				transactionInterceptor.beforeSendRequest(request);
			}
		});

		// TODO should be replaced by CompensableFeignResult.getTransactionContext()
		response.setTransactionContext(transactionContext);

		try {
			return this.delegate.invoke(proxy, method, args);
		} catch (Throwable error) {
			Throwable cause = error.getCause();

			CompensableFeignResult cfresult = null;
			if (CompensableFeignResult.class.isInstance(error)) {
				cfresult = (CompensableFeignResult) error;
			} else if (CompensableFeignResult.class.isInstance(cause)) {
				cfresult = (CompensableFeignResult) cause;
			}

			if (cfresult == null) {
				throw error;
			} // end-if (cfresult == null)

			// response.setTransactionContext(cfresult.getTransactionContext());
			response.setParticipantDelistFlag(cfresult.isParticipantValidFlag());

			Object targetResult = cfresult.getResult();
			if (cfresult.isError() == false) {
				return targetResult;
			} else if (RuntimeException.class.isInstance(targetResult)) {
				throw (RuntimeException) targetResult;
			} else {
				throw new RuntimeException((Exception) targetResult);
			}
		} finally {
			Object interceptedValue = response.getHeader(TransactionInterceptor.class.getName());
			if (Boolean.valueOf(String.valueOf(interceptedValue)) == false) {
				response.setParticipantEnlistFlag(request.isParticipantEnlistFlag());

				RemoteCoordinator coordinator = request.getTargetTransactionCoordinator();
				// TODO should be replaced by CompensableFeignResult.getRemoteParticipant()
				response.setSourceTransactionCoordinator(coordinator);

				transactionInterceptor.afterReceiveResponse(response);
			} // end-if (response.isIntercepted() == false)
		}
	}
}

TCC事务提交

CompensableTransactionImpl#commit()->CompensableTransactionImpl#fireCommit()->CompensableTransactionImpl#fireNativeParticipantConfirm()->CompensableTransactionImpl#fireRemoteParticipantConfirm()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 可补偿的TCC分布式事务实现类
 */
public class CompensableTransactionImpl extends TransactionListenerAdapter
		implements CompensableTransaction, CompensableRolledbackMarker {
    /**
     * 事务提交
     * @throws RollbackException
     * @throws HeuristicMixedException
     * @throws HeuristicRollbackException
     * @throws SecurityException
     * @throws IllegalStateException
     * @throws SystemException
     */
    public synchronized void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
    SecurityException, IllegalStateException, SystemException {
        if (this.transactionStatus == Status.STATUS_ACTIVE) {
            this.fireCommit();
        } else if (this.transactionStatus == Status.STATUS_MARKED_ROLLBACK) {
            this.fireRollback();
            throw new HeuristicRollbackException();
        } else if (this.transactionStatus == Status.STATUS_ROLLEDBACK) /* should never happen */ {
            throw new RollbackException();
        } else if (this.transactionStatus == Status.STATUS_COMMITTED) /* should never happen */ {
            logger.debug("Current transaction has already been committed.");
        } else {
            throw new IllegalStateException();
        }
    }
}

TCC事务回滚

CompensableTransactionImpl#rollback()->CompensableTransactionImpl#fireRollback()->CompensableTransactionImpl#fireNativeParticipantCancel()->CompensableTransactionImpl#fireRemoteParticipantCancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
 * 可补偿的TCC分布式事务实现类
 */
public class CompensableTransactionImpl extends TransactionListenerAdapter
		implements CompensableTransaction, CompensableRolledbackMarker {
    private void fireRollback() throws IllegalStateException, SystemException {
       CompensableLogger compensableLogger = this.beanFactory.getCompensableLogger();

       this.transactionStatus = Status.STATUS_ROLLING_BACK;

       this.markCurrentBranchTransactionRollbackIfNecessary();

       this.transactionContext.setCompensating(true);
       compensableLogger.updateTransactionStatus(this.getTransactionArchive()); // compensableLogger.updateTransaction(this.getTransactionArchive());

       SystemException systemEx = null;
       try {
          //本地分支事务cancel
          this.fireNativeParticipantCancel();
       } catch (SystemException ex) {
          systemEx = ex;

          logger.info("{}| cancel native branchs failed!",
                ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
       } catch (RuntimeException ex) {
          systemEx = new SystemException(XAException.XAER_RMERR);
          systemEx.initCause(ex);

          logger.info("{}| cancel native branchs failed!",
                ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
       }

       try {
          //远程分支事务取消 
          this.fireRemoteParticipantCancel();
       } catch (SystemException ex) {
          logger.info("{}| cancel remote branchs failed!",
                ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
          throw ex;
       } catch (RuntimeException ex) {
          logger.info("{}| cancel remote branchs failed!",
                ByteUtils.byteArrayToString(this.transactionContext.getXid().getGlobalTransactionId()), ex);
          SystemException sysEx = new SystemException(XAException.XAER_RMERR);
          sysEx.initCause(ex);
          throw sysEx;
       }
       if (systemEx != null) {
          throw systemEx;
       } else {
          this.transactionStatus = Status.STATUS_ROLLEDBACK;
          compensableLogger.updateTransactionStatus(this.getTransactionArchive()); // ccompensableLogger.updateTransaction(this.getTransactionArchive());
          logger.info("{}| compensable transaction rolled back!",
                ByteUtils.byteArrayToString(transactionContext.getXid().getGlobalTransactionId()));
       }
    }
}