事务-04丨JTA + Atomikos分布式事务

Posted by jiefang on January 16, 2021

JTA + Atomikos分布式事务

Atomikos使用

引入依赖

1
2
3
4
<!-- Spring Boot starter for JTA with Atomikos. No <version> is declared here, so it is
     presumably managed by the Spring Boot parent/BOM; confirm in the project POM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

修改配置文件

1
2
3
4
5
6
# Custom data-source settings: use Druid's XA-capable DataSource implementation.
# `type` must be nested under `datasource`; at column 0 it would become a separate
# top-level key and leave `activity.datasource` empty.
activity:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
# JTA settings: transaction log directory and transaction manager id.
# NOTE(review): Spring Boot binds these as spring.jta.* - presumably this block sits
# under `spring:` in the full configuration file; confirm.
jta:
  log-dir: classpath:tx-logs
  transaction-manager-id: txManager

重构Druid数据源配置

1
2
3
4
5
// Create the Druid XA data source.
DruidXADataSource datasource = new DruidXADataSource();
// Set the datasource properties
...
// Hand the XA data source to Atomikos' data-source bean via setXaDataSource.
// NOTE(review): AtomikosDataSourceBean presumably also needs a unique resource name
// (setUniqueResourceName) before it is used - confirm against the full configuration.
AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
atomikosDataSource.setXaDataSource(datasource);

配置JTA事务管理器

1
2
3
4
5
6
7
/**
 * Registers the primary JTA transaction manager under the bean name "xatx".
 * It is assembled from a new Atomikos UserTransactionManager and a new
 * UserTransactionImp.
 *
 * @return a JtaTransactionManager built from the two Atomikos objects
 */
@Bean(name = "xatx")
@Primary
public JtaTransactionManager activityTransactionManager() {
    final UserTransactionManager atomikosTxManager = new UserTransactionManager();
    final UserTransaction atomikosUserTx = new UserTransactionImp();
    final JtaTransactionManager jtaTxManager =
            new JtaTransactionManager(atomikosUserTx, atomikosTxManager);
    return jtaTxManager;
}

@Transactional设置事务管理器

1
@Transactional(transactionManager = "xatx", rollbackFor = Exception.class)

原理

Atomikos数据源

创建数据库连接代理对象AtomikosConnectionProxy

AbstractDataSourceBean#getConnection()->ConnectionPool#borrowConnection()->ConnectionPool#findExistingOpenConnectionForCallingThread()->ConnectionPool#recycleConnectionIfPossible()->AbstractXPooledConnection#createConnectionProxy()->AtomikosXAPooledConnection#doCreateConnectionProxy()->AtomikosConnectionProxy#newInstance

MySQL XA事务基本语法

- XA {START|BEGIN} xid [JOIN|RESUME]:启动xid事务(xid 必须是一个唯一值;不支持[JOIN|RESUME]子句)
- XA END xid [SUSPEND [FOR MIGRATE]]:结束xid事务(不支持[SUSPEND [FOR MIGRATE]]子句)
- XA PREPARE xid:准备、预提交xid事务
- XA COMMIT xid [ONE PHASE]:提交xid事务
- XA ROLLBACK xid:回滚xid事务
- XA RECOVER:查看处于PREPARE阶段的所有事务

开启事务

JtaTransactionManager#doBegin()-> JtaTransactionManager#doJtaBegin()->UserTransactionImp#begin()-> UserTransactionManager#begin()->TransactionManagerImp#begin()

TransactionManagerImp#begin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Begins a transaction using the timeout returned by getTransactionTimeout();
 * delegates to begin(int).
 */
public void begin () throws NotSupportedException, SystemException{
    begin ( getTransactionTimeout() );
}
/**
 * Begins a new transaction with the given timeout.
 *
 * Steps visible here: look up the current composite transaction, suspend it if it
 * does not carry the JTA marker property, create a new composite transaction, mark it
 * with the JTA property, and pass it to recreateCompositeTransactionAsJtaTransaction.
 *
 * @param timeout timeout value; it is multiplied by 1000 before being passed on
 *                (presumably seconds converted to milliseconds - confirm)
 * @throws SystemException an ExtendedSystemException is thrown when creating or
 *                         configuring the composite transaction raises SysException
 */
public void begin ( int timeout ) throws NotSupportedException,SystemException{
    CompositeTransaction ct = null;
    ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;

    ct = compositeTransactionManager.getCompositeTransaction();
    // A current transaction that lacks the JTA marker property is suspended, and a
    // participant wrapping it is prepared so it can be attached to the new transaction.
    if ( ct != null && ct.getProperty (  JTA_PROPERTY_NAME ) == null ) {
        LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
                           " (will be resumed after JTA transaction ends)" );
        ct = compositeTransactionManager.suspend();
        resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
    }

    try {
        // Create the new composite transaction (root or sub-transaction is decided by the callee).
        ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
        if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
        // Only a root transaction is switched to serial mode, and only if that is the default.
        if ( ct.isRoot () && getDefaultSerial () )
            ct.setSerial ();
        // Mark the transaction so later begin() calls recognise it as a JTA transaction.
        ct.setProperty ( JTA_PROPERTY_NAME , "true" );
    } catch ( SysException se ) {
        String msg = "Error in begin()";
        LOGGER.logError( msg , se );
        throw new ExtendedSystemException ( msg , se );
    }
    recreateCompositeTransactionAsJtaTransaction(ct);
}

CompositeTransactionManagerImp#getCompositeTransaction()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Returns the composite transaction obtained from getCurrentTx(), or null if there
 * is none. The outcome is logged when trace logging is enabled.
 *
 * @return the current composite transaction, or null
 */
public CompositeTransaction getCompositeTransaction () throws SysException{
    CompositeTransaction ct = null;
    ct = getCurrentTx ();
    if ( ct != null ) {
       if(LOGGER.isTraceEnabled()){
           LOGGER.logTrace("getCompositeTransaction()  returning instance with id "
                    + ct.getTid ());
       }
    } else{
       if(LOGGER.isTraceEnabled()){
          LOGGER.logTrace("getCompositeTransaction() returning NULL!");
       }
    }
    return ct;
}
CompositeTransactionManagerImp#getCurrentTx()
1
2
3
4
5
6
7
8
9
10
11
/**
 * Looks up the transaction at the top of the calling thread's transaction stack.
 * The lookup is done while holding the lock on threadtotxmap_.
 *
 * @return the top of the current thread's stack, or null if the thread has no stack
 */
private CompositeTransaction getCurrentTx (){
    Thread thread = Thread.currentThread ();
    synchronized ( threadtotxmap_ ) {
        // Get the stack of distributed transactions mapped to the current thread
        Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
        if ( txs == null )
            return null;
        else
            // NOTE(review): peek() is called without an emptiness check, so this presumably
            // relies on empty stacks being removed from the map elsewhere - confirm.
            return txs.peek ();
    }
}

CompositeTransactionManagerImp#createCompositeTransaction()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Creates a composite transaction for the calling thread and registers it in the
 * thread mappings.
 *
 * If the thread has no current transaction, a new one is created through the
 * transaction service; otherwise a sub-transaction of the current one is created.
 *
 * @param timeout timeout passed to the transaction service when a new root is created
 * @return the newly created transaction (root or sub-transaction)
 */
public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException{
    CompositeTransaction ct = null , ret = null;
    
    ct = getCurrentTx ();
    if ( ct == null ) {
        // Create the distributed transaction
        ret = getTransactionService().createCompositeTransaction ( timeout );
        if(LOGGER.isDebugEnabled()){
           LOGGER.logDebug("createCompositeTransaction ( " + timeout + " ): "
                + "created new ROOT transaction with id " + ret.getTid ());
        }
    } else {
        if(LOGGER.isDebugEnabled()) LOGGER.logDebug("createCompositeTransaction ( " + timeout + " )");
        // A transaction already exists for this thread: nest a sub-transaction under it.
        ret = ct.createSubTransaction ();
    }
    // Associate the new transaction with the calling thread.
    Thread thread = Thread.currentThread ();
    setThreadMappings ( ret, thread );

    return ret;
}
CompositeTransactionManagerImp#setThreadMappings()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Registers this manager as a sub-transaction-aware participant of the transaction
 * and, if the transaction is still ACTIVE, records the thread/transaction association
 * in both threadtotxmap_ and txtothreadmap_.
 *
 * @param ct     the transaction to map
 * @param thread the thread to associate it with
 */
private void setThreadMappings ( CompositeTransaction ct , Thread thread )
        throws IllegalStateException, SysException{
    //case 21806: callbacks to ct to be made outside synchronized block
   ct.addSubTxAwareParticipant ( this ); //step 1

    synchronized ( threadtotxmap_ ) {
       //between step 1 and here, intermediate timeout/rollback of the ct
       //may have happened; make sure to check or we add a thread mapping
       //that will never be removed!
       if ( TxState.ACTIVE.equals ( ct.getState() )) {
          Stack<CompositeTransaction> txs = threadtotxmap_.get ( thread );
          // First transaction for this thread: start a fresh stack.
          if ( txs == null )
             txs = new Stack<CompositeTransaction>();
          txs.push ( ct );
          threadtotxmap_.put ( thread, txs );
          txtothreadmap_.put ( ct, thread );
       }
    }
}

XA START指令

分支事务发送XA START命令,调用prepareStatement,分支事务加入全局事务。

AtomikosConnectionProxy#invoke()->AtomikosConnectionProxy#enlist()->SessionHandleState#notifyBeforeUse()-> TransactionContext#checkEnlistBeforeUse()->NotInBranchStateHandler#checkEnlistBeforeUse()->new BranchEnlistedStateHandler()->XAResourceTransaction#resume()->XAResource#start()->XA START 指令

拦截"createStatement", "prepareStatement", "prepareCall",分支事务加入分布式事务-全局事务。

AtomikosConnectionProxy#invoke()->AtomikosConnectionProxy#enlist()->CompositeTransaction#registerSynchronization()->TransactionStateHandler#registerSynchronization()->CoordinatorImp#registerSynchronization()->CoordinatorImp#rememberSychronizationForAfterCompletion()

XA END指令

分支事务发送XA END指令。

AbstractPlatformTransactionManager#processCommit->AbstractPlatformTransactionManager#triggerBeforeCompletion(status)

JtaTransactionManager#doCommit()->TransactionSynchronizationUtils#triggerBeforeCompletion()->SqlSessionUtils.SqlSessionSynchronization#beforeCompletion()

拦截close()方法:

AtomikosConnectionProxy#invoke()->AtomikosConnectionProxy#close()->SessionHandleState#notifySessionClosed()-> TransactionContext#sessionClosed()->BranchEnlistedStateHandler#sessionClosed()->new BranchEndedStateHandler()->XAResourceTransaction#suspend()->XAResource#end()->XA END 指令

准备

AbstractPlatformTransactionManager#processCommit->JtaTransactionManager#doCommit()->UserTransactionImp#commit() ->UserTransactionManager#commit()->TransactionManagerImp#commit()->TransactionImp#commit()->CompositeTransactionImp#commit() ->CoordinatorImp#terminate()->CoordinatorImp#prepare()->XAResourceTransaction#prepare()->XAResource#prepare()

CoordinatorImp#terminate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Terminates the transaction by committing or rolling back, while holding the lock
 * on fsm_.
 *
 * On commit: with at most one participant, commit(true) is called directly;
 * otherwise prepare() runs first and commit(false) follows unless prepare reported
 * READ_ONLY. On rollback: rollback() is called.
 *
 * @param commit true to commit, false to roll back
 */
protected void terminate ( boolean commit ) throws HeurRollbackException,
        HeurMixedException, SysException, java.lang.SecurityException,
        HeurCommitException, HeurHazardException, RollbackException,
        IllegalStateException{
   synchronized ( fsm_ ) {
      if ( commit ) {
         if ( participants_.size () <= 1 ) {
            // NOTE(review): the boolean presumably selects one-phase commit, skipping
            // the prepare step when there is at most one participant - confirm.
            commit ( true );
         } else {
            int prepareResult = prepare ();
            // make sure to only do commit if NOT read only
            if ( prepareResult != Participant.READ_ONLY )
               commit ( false );
         }
      } else {
         rollback ();
      }
   }
}

CoordinatorImp#prepare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Runs the prepare step by delegating to stateHandler_.prepare() while holding the
 * lock on fsm_, and trace-logs whether the result was READ_ONLY or not.
 *
 * @return the value returned by stateHandler_.prepare()
 * @throws RollbackException if the current state is already PREPARING
 */
public int prepare () throws RollbackException,
        java.lang.IllegalStateException, HeurHazardException,
        HeurMixedException, SysException{
    // FIRST, TAKE CARE OF DUPLICATE PREPARES
    // Recursive prepare-calls should be avoided for not deadlocking rollback/commit methods
    // If a recursive prepare re-enters, then it will see a voting state -> reject.
    // Note that this may also avoid some legal prepares, but only rarely
    if ( getState ().equals ( TxState.PREPARING ) )
        throw new RollbackException ( "Recursion detected" );

    // Initialised to a value different from READ_ONLY; overwritten by the handler result below.
    int ret = Participant.READ_ONLY + 1;
    synchronized ( fsm_ ) {
       ret = stateHandler_.prepare ();
       if ( ret == Participant.READ_ONLY ) {
           if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace (  "prepare() of Coordinator  " + getCoordinatorId ()
                + " returning READONLY" );
       } else {
           if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "prepare() of Coordinator  " + getCoordinatorId ()
                + " returning YES vote");
       }
   }
    return ret;
}

XAResourceTransaction#prepare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Prepares this branch on its XAResource.
 *
 * Visible steps: call terminateInResource(), suspend the branch if it is still
 * ACTIVE, return READ_ONLY if it is already IN_DOUBT, reject any state other than
 * LOCALLY_DONE, then call prepare(xid) on the XAResource and set the state to
 * IN_DOUBT. The parts marked "..." are elided in this excerpt.
 *
 * @throws SysException if the state is neither IN_DOUBT nor LOCALLY_DONE
 */
@Override
public synchronized int prepare() throws RollbackException,
      HeurHazardException, HeurMixedException, SysException {
   int ret = 0;
   terminateInResource();

   if (TxState.ACTIVE == this.state) {
      // tolerate non-delisting apps/servers
      suspend();
   }

   // duplicate prepares can happen for siblings in serial subtxs!!!
   // in that case, the second prepare just returns READONLY
   if (this.state == TxState.IN_DOUBT)
      return Participant.READ_ONLY;
   else if (!(this.state == TxState.LOCALLY_DONE))
      throw new SysException("Wrong state for prepare: " + this.state);
   try {
      // refresh xaresource for MQSeries: seems to close XAResource after
      // suspend???
      testOrRefreshXAResourceFor2PC();
      if (LOGGER.isTraceEnabled()) {
         LOGGER.logTrace("About to call prepare on XAResource instance: "
               + this.xaresource);
      }
      // Issue the prepare for this branch's xid on the underlying XAResource.
      ret = this.xaresource.prepare(this.xid);
   } catch (XAException xaerr) {
      ...
   }
   setState(TxState.IN_DOUBT);
   ...
}

事务提交

CoordinatorImp#commit()->XAResourceTransaction#commit()->XAResource#commit()

事务回滚

CoordinatorImp#rollback()->XAResourceTransaction#rollback()->XAResource#rollback()