好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

JPA多数据源分布式事务处理方案

前言

多数据源的事务处理是个老生常谈的话题,跨两个数据源的事务管理也算是分布式事务的范畴,在同一个JVM里处理多数据源的事务,比较经典的处理方案是JTA(基于XA协议建模的java标准事务抽象)+XA(XA事务协议),常见的JTA实现框架有Atomikos、Bitronix、Narayana,Spring对这些框架都有组件封装,基本可以做到开箱即用程度。本文除了分享XA事务方案外,提供了一种新的多数据源事务解决思路和视角。

问题背景

在解决mysql字段脱敏处理时,结合sharding-jdbc的脱敏组件功能,为了sql兼容和最小化应用改造,博主给出了一个多数据源融合的字段脱敏解决方案(只把包含脱敏字段表的操作走sharding-jdbc脱敏代理数据源)。这个方案解决了问题的同时,带来了一个新的问题,数据源的事务是独立的,正如我文中所述《JPA项目多数据源模式整合sharding-jdbc实现数据脱敏》,在spring上下文中,每个数据源对应一个独立的事务管理器,默认的事务管理器的数据源就用业务本身的数据源,所以需要加密的业务使用时,需要指定@Transactional注解里的事务管理器名称为脱敏对应的事务管理器名称。简单的业务场景这样用也就没有问题了,但是一般的业务场景总有一个事务覆盖两个数据源的操作,这个时候单指定哪个事务管理器都不行,so,这里需要一种多数据源的事务管理器。

XA事务方案

XA协议采用2PC(两阶段提交)的方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。在JDBC的XA事务相关api抽象里,相关接口定义如下

XADataSource,XA协议数据源

?

1

2

3

4

5

6

7

public interface XADataSource extends CommonDataSource {

   /**

    * 尝试建立物理数据库连接,使用给定的用户名和密码。返回的连接可以在分布式事务中使用

    */

   XAConnection getXAConnection() throws SQLException;

    //省略getLogWriter等非关键方法

  }

XAConnection

?

1

2

3

4

5

6

public interface XAConnection extends PooledConnection {

     /**

      * 检索一个{@code XAResource}对象,事务管理器将使用该对象管理该{@code XAConnection}对象在分布式事务中的事务行为

      */

     javax.transaction.xa.XAResource getXAResource() throws SQLException;

}

XAResource

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

public interface XAResource {

     /**

      * 提交xid指定的全局事务

      */

     void commit(Xid xid, boolean onePhase) throws XAException;

     /**

      * 结束代表事务分支执行的工作。资源管理器从指定的事务分支中分离XA资源,并让事务完成。

      */

     void end(Xid xid, int flags) throws XAException;

     /**

      * 通知事务管理器忽略此xid事务分支

      */

     void forget(Xid xid) throws XAException;

     /**

      * 判断是否同一个资源管理器

      */

     boolean isSameRM(XAResource xares) throws XAException;

     /**

      * 指定xid事务准备阶段

      */

     int prepare(Xid xid) throws XAException;

     /**

      * 从资源管理器获取准备好的事务分支的列表。事务管理器在恢复期间调用此方法,

      * 以获取当前处于准备状态或初步完成状态的事务分支的列表。

      */

     Xid[] recover( int flag) throws XAException;

     /**

      * 通知资源管理器回滚代表事务分支完成的工作。

      */

     void rollback(Xid xid) throws XAException;

     /**

      * 代表xid中指定的事务分支开始工作。

      */

     void start(Xid xid, int flags) throws XAException;

     //省略非关键方法

}

相比较普通的事务管理,JDBC的XA协议管理多了一个XAResource资源管理器,XA事务相关的行为(开启、准备、提交、回滚、结束)都由这个资源管理器来控制,这些都是框架内部的行为,体现在开发层面提供的数据源也变成了XADataSource。而JTA的抽象里,定义了UserTransaction、TransactionManager。想要使用JTA事务,必须先实现这两个接口。所以,如果我们要使用JTA+XA控制多数据源的事务,在sprign boot里以Atomikos为例,

引入Atomikos依赖

?

1

2

3

4

< dependency >

             < groupId >org.springframework.boot</ groupId >

             < artifactId >spring-boot-starter-jta-atomikos</ artifactId >

</ dependency >

spring boot已经帮我们把XA事务管理器自动装载类定义好了,如:

创建JTA事务管理器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

@Configuration (proxyBeanMethods = false )

@EnableConfigurationProperties ({ AtomikosProperties. class , JtaProperties. class })

@ConditionalOnClass ({ JtaTransactionManager. class , UserTransactionManager. class })

@ConditionalOnMissingBean (PlatformTransactionManager. class )

class AtomikosJtaConfiguration {

     @Bean (initMethod = "init" , destroyMethod = "shutdownWait" )

     @ConditionalOnMissingBean (UserTransactionService. class )

     UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,

             JtaProperties jtaProperties) {

         Properties properties = new Properties();

         if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {

             properties.setProperty( "com.atomikos.icatch.tm_unique_name" , jtaProperties.getTransactionManagerId());

         }

         properties.setProperty( "com.atomikos.icatch.log_base_dir" , getLogBaseDir(jtaProperties));

         properties.putAll(atomikosProperties.asProperties());

         return new UserTransactionServiceImp(properties);

     }

     @Bean (initMethod = "init" , destroyMethod = "close" )

     @ConditionalOnMissingBean (TransactionManager. class )

     UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {

         UserTransactionManager manager = new UserTransactionManager();

         manager.setStartupTransactionService( false );

         manager.setForceShutdown( true );

         return manager;

     }

     @Bean

     @ConditionalOnMissingBean (XADataSourceWrapper. class )

     AtomikosXADataSourceWrapper xaDataSourceWrapper() {

         return new AtomikosXADataSourceWrapper();

     }

     @Bean

     JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,

             ObjectProvidertransactionManagerCustomizers) {

         JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);

         transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));

         return jtaTransactionManager;

     }

}

显然,想要使用XA事务,除了需要提供UserTransaction、TransactionManager的实现。还必须要有一个XADataSource,而sharding-jdbc代理的数据源是DataSource的,我们需要将XADataSource包装成普通的DataSource,spring已经提供了一个AtomikosXADataSourceWrapper的XA数据源包装器,而且在AtomikosJtaConfiguration里已经注册到Spring上下文中,所以我们在自定义数据源时可以直接注入包装器实例,然后,因为是JPA环境,所以在创建EntityManagerFactory实例时,需要指定JPA的事务管理类型为JTA,综上,普通的业务默认数据源配置如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

/**

  * @author: kl @kailing.pub

  * @date: 2020/5/18

  */

@Configuration

@EnableConfigurationProperties ({JpaProperties. class , DataSourceProperties. class })

public class DataSourceConfiguration{

     @Primary

     @Bean

     public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {

         MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource. class ).build();

         return wrapper.wrapDataSource(dataSource);

     }

     @Primary

     @Bean (initMethod = "afterPropertiesSet" )

     public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {

         return factoryBuilder.dataSource(dataSource)

                 .packages(Constants.BASE_PACKAGES)

                 .properties(jpaProperties.getProperties())

                 .persistenceUnit( "default" )

                 .jta( true )

                 .build();

     }

     @Bean

     @Primary

     public EntityManager entityManager(EntityManagerFactory entityManagerFactory){

         //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效

         return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);

     }

}

sharding-jdbc加密数据源和普通业务数据源其实是同一个数据源,只是走加解密逻辑的数据源需要被sharding-jdbc的加密组件代理一层,加上了加解密的处理逻辑。所以配置如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

/**

  * @author: kl @kailing.pub

  * @date: 2020/5/18

  */

@Configuration

@EnableConfigurationProperties ({JpaProperties. class ,SpringBootEncryptRuleConfigurationProperties. class , SpringBootPropertiesConfigurationProperties. class })

public class EncryptDataSourceConfiguration {

     @Bean

     public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {

         return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());

     }

     @Bean (initMethod = "afterPropertiesSet" )

     public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory( @Qualifier ( "encryptDataSource" ) DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {

         return factoryBuilder.dataSource(dataSource)

                 .packages(Constants.BASE_PACKAGES)

                 .properties(jpaProperties.getProperties())

                 .persistenceUnit( "encryptPersistenceUnit" )

                 .jta( true )

                 .build();

     }

     @Bean

     public EntityManager encryptEntityManager( @Qualifier ( "encryptEntityManagerFactory" ) EntityManagerFactory entityManagerFactory){

         //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效

         return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);

     }

}

遇到问题1、

Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.

解决问题:默认AtomikosXADataSourceWrapper包装器初始化的数据源连接池最大为1,所以需要添加配置参数如:

spring.jta.atomikos.datasource.max-pool-size=20

遇到问题2、

XAER_INVAL: Invalid arguments (or unsupported command)

解决问题:这个是mysql实现XA的bug,仅当您在同一事务中多次访问同一MySQL数据库时,才会发生此问题,在mysql连接url加上如下参数即可,如:

spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true

Mysql XA事务行为

在这个场景中,虽然是多数据源,但是底层链接的是同一个mysql数据库,所以XA事务行为为,从第一个执行的sql开始(并不是JTA事务begin阶段),生成xid并XA START事务,然后XA END。第二个数据源的sql执行时会判断是否同一个mysql资源,如果是同一个则用刚生成的xid重新XA START RESUME,然后XA END,最终虽然在应用层是两个DataSource,其实最后只会调用XA COMMIT一次。mysql驱动实现的XAResource的start如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public void start(Xid xid, int flags) throws XAException {

         StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);

         commandBuf.append( "XA START " );

         appendXid(commandBuf, xid);

         switch (flags) {

             case TMJOIN:

                 commandBuf.append( " JOIN" );

                 break ;

             case TMRESUME:

                 commandBuf.append( " RESUME" );

                 break ;

             case TMNOFLAGS:

                 // no-op

                 break ;

             default :

                 throw new XAException(XAException.XAER_INVAL);

         }

         dispatchCommand(commandBuf.toString());

         this .underlyingConnection.setInGlobalTx( true );

     }

第一次sql执行时,flags=0,走的TMNOFLAGS逻辑,第二次sql执行时,flags=134217728,走的TMRESUME,重新开启事务的逻辑。以上是Mysql XA的真实事务逻辑,但是博主研究下来发现,msyql xa并不支持XA START RESUME这种语句,而且有很多限制《Mysql XA交易限制》,所以在mysql数据库使用XA事务时,最好了解下mysql xa的缺陷

链式事务方案

链式事务不是我首创的叫法,在spring-data-common项目的Transaction包下,已经有一个默认实现ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已经分析了Spring的事务抽象,由PlatformTransactionManager(事务管理器)、TransactionStatus(事务状态)、TransactionDefinition(事务定义)等形态组成,ChainedTransactionManager也是实现了PlatformTransactionManager和TransactionStatus。实现原理也很简单,在ChainedTransactionManager内部维护了事务管理器的集合,通过代理编排真实的事务管理器,在事务开启、提交、回滚时,都分别操作集合里的事务。以达到对多个事务的统一管理。这个方案比较简陋,而且有缺陷,在提交阶段,如果异常不是发生在第一个数据源,那么会存在之前的提交不会回滚,所以在使用ChainedTransactionManager时,尽量把出问题可能性比较大的事务管理器放链的后面(开启事务、提交事务顺序相反)。这里只是抛出了一种新的多数据源事务管理的思路,能用XA尽量用XA管理。

普通的业务默认数据源配置如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

/**

  * @author: kl @kailing.pub

  * @date: 2020/5/18

  */

@Configuration

@EnableConfigurationProperties ({JpaProperties. class , DataSourceProperties. class })

public class DataSourceConfiguration{

     @Primary

     @Bean

     public DataSource dataSource(DataSourceProperties dataSourceProperties){

        return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource. class ).build();

     }

     @Primary

     @Bean (initMethod = "afterPropertiesSet" )

     public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {

         return factoryBuilder.dataSource(dataSource)

                 .packages(Constants.BASE_PACKAGES)

                 .properties(jpaProperties.getProperties())

                 .persistenceUnit( "default" )

                 .build();

     }

     @Bean

     @Primary

     public EntityManager entityManager(EntityManagerFactory entityManagerFactory){

         //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效

         return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);

     }

     @Primary

     @Bean

     public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){

         JpaTransactionManager txManager = new JpaTransactionManager();

         txManager.setEntityManagerFactory(entityManagerFactory);

         return txManager;

     }

}

sharding-jdbc加密数据源配置如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

/**

  * @author: kl @kailing.pub

  * @date: 2020/5/18

  */

@Configuration

@EnableConfigurationProperties ({JpaProperties. class ,SpringBootEncryptRuleConfigurationProperties. class , SpringBootPropertiesConfigurationProperties. class })

public class EncryptDataSourceConfiguration {

     @Bean

     public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {

         return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());

     }

     @Bean (initMethod = "afterPropertiesSet" )

     public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory( @Qualifier ( "encryptDataSource" ) DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {

         return factoryBuilder.dataSource(dataSource)

                 .packages(Constants.BASE_PACKAGES)

                 .properties(jpaProperties.getProperties())

                 .persistenceUnit( "encryptPersistenceUnit" )

                 .build();

     }

     @Bean

     public EntityManager encryptEntityManager( @Qualifier ( "encryptEntityManagerFactory" ) EntityManagerFactory entityManagerFactory){

         //必须使用SharedEntityManagerCreator创建SharedEntityManager实例,否则SimpleJpaRepository中的事务不生效

         return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);

     }

     @Bean

     public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {

         JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();

         encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());

         //使用链式事务管理器包装真正的transactionManager、txManager事务

         ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);

         return chainedTransactionManager;

     }

}

使用这种方案,在涉及到多数据源的业务时,需要指定使用哪个事务管理器,如:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

@PersistenceContext (unitName = "encryptPersistenceUnit" )

     private EntityManager entityManager;

     @PersistenceContext

     private EntityManager manager;

     @Transactional (transactionManager = "chainedTransactionManager" )

     public AccountModel  save(AccountDTO dto){

         AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);

         entityManager.persist(accountModel);

         entityManager.flush();

         AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);

         manager.persist(accountMode2);

         manager.flush();

         return accountModel;

     }

结语

综上,对于JPA的多数据源分布式事务处理,JTA的事务管理器经过spring boot的封装已经可以开箱即用了。重点在JPA环境下,需要指定EntityManagerFactory的事务使用JTA事务。另本文分享了一种链式事务编排的方式也可以应用在这种场景,但是特殊的场景下不能保证事务的完整性,所以博主推荐使用JtaTransactionManager,有符合的场景也可以试试ChainedTransactionManager。

以上就是JPA多数据源分布式事务处理方案的详细内容,更多关于JPA多数据源分布式事务处理的资料请关注其它相关文章!

原文链接:http://HdhCmsTestkailing.pub/article/index/arcid/282.html

查看更多关于JPA多数据源分布式事务处理方案的详细内容...

  阅读:23次