好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

使用@TransactionalEventListener监听事务教程

@TransactionalEventListener监听事务

项目背景

最近在项目遇到一个问题

A方法体内有 INSERT、UPDATE或者DELETE操作,最后会发送一段MQ给外部,外部接收到MQ后会再发送一段请求过来,系统收到请求后会执行B方法,B方法会依赖A方法修改后的结果,这就有一个问题,如果A方法事务没有提交;且B方法的请求过来了会查询到事务未提交前的状态,这就会有问题

解决办法:@TransactionalEventListener

在Spring4.2+,有一种叫做TransactionEventListener的方式,能够控制在事务的时候Event事件的处理方式。 我们知道,Spring的发布订阅模型实际上并不是异步的,而是同步的来将代码进行解耦。而TransactionEventListener仍是通过这种方式,只不过加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等的时候才会去进行Event的处理。

具体实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

//创建一个事件类

package com.qk.cas.config;

import org.springframework.context.ApplicationEvent;

public class MyTransactionEvent extends ApplicationEvent {

     private static final long serialVersionUID = 1L;

     private IProcesser processer;

     public MyTransactionEvent(IProcesser processer) {

         super (processer);

         this .processer = processer;

     }

     public IProcesser getProcesser() {

         return this .processer;

     }

     @FunctionalInterface

     public interface IProcesser {

         void handle();

     }

}

//创建一个监听类

package com.qk.cas.config;

import org.springframework.stereotype.Component;

import org.springframework.transaction.event.TransactionPhase;

import org.springframework.transaction.event.TransactionalEventListener;

@Component

public class MyTransactionListener {

     @TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT)

     public void hanldeOrderCreatedEvent(MyTransactionEvent event) {

         event.getProcesser().handle();

     }

}

//MQ方法的变动

     @Autowired

     private ApplicationEventPublisher eventPublisher;

     @Autowired

     private AmqpTemplate rabbitTemplate;

     public void sendCreditResult(String applyNo, String jsonString) {

         eventPublisher.publishEvent( new MyTransactionEvent(() -> {

             LOGGER.info( "MQ。APPLY_NO:[{}]。KEY:[{}]。通知报文:[{}]" , applyNo, Queues.CREDIT_RESULT, jsonString);

             rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString);

         }));

     }

拓展

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有当前事务提交之后,才会执行事件监听的方法,其中参数phase默认为AFTER_COMMIT,共有四个枚举:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

public enum TransactionPhase {

     /**

      * Fire the event before transaction commit.

      * @see TransactionSynchronization#beforeCommit(boolean)

      */

     BEFORE_COMMIT,

     /**

      * Fire the event after the commit has completed successfully.

      * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and

      * therefore executes in the same after-completion sequence of events,

      * (and not in {@link TransactionSynchronization#afterCommit()}).

      * @see TransactionSynchronization#afterCompletion(int)

      * @see TransactionSynchronization#STATUS_COMMITTED

      */

     AFTER_COMMIT,

     /**

      * Fire the event if the transaction has rolled back.

      * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and

      * therefore executes in the same after-completion sequence of events.

      * @see TransactionSynchronization#afterCompletion(int)

      * @see TransactionSynchronization#STATUS_ROLLED_BACK

      */

     AFTER_ROLLBACK,

     /**

      * Fire the event after the transaction has completed.

      * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or

      * {@link #AFTER_ROLLBACK} to intercept transaction commit

      * or rollback, respectively.

      * @see TransactionSynchronization#afterCompletion(int)

      */

     AFTER_COMPLETION

}

注解@TransactionalEventListener

例如 用户注册之后需要计算用户的邀请关系,递归操作。如果注册的时候包含多步验证,生成基本初始化数据,这时候我们通过mq发送消息来处理这个邀请关系,会出现一个问题,就是用户还没注册数据还没入库,邀请关系就开始执行,但是查不到数据,导致出错。

@TransactionalEventListener 可以实现事务的监听,可以在提交之后再进行操作。

监听的对象

?

1

2

3

4

5

6

7

8

9

package com.jinglitong.springshop.interceptor;

import com.jinglitong.springshop.entity.Customer;

import org.springframework.context.ApplicationEvent;

  

public class RegCustomerEvent extends ApplicationEvent{

     public RegCustomerEvent(Customer customer){

         super (customer);

     }

}

监听到之后的操作

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

package com.jinglitong.springshop.interceptor;

import com.alibaba.fastjson.JSON;

import com.jinglitong.springshop.entity.Customer;

import com.jinglitong.springshop.entity.MqMessageRecord;

import com.jinglitong.springshop.servcie.MqMessageRecordService;

import com.jinglitong.springshop.util.AliMQServiceUtil;

import lombok.extern.slf4j.Slf4j;

import org.apache测试数据mons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import org.springframework.transaction.event.TransactionPhase;

import org.springframework.transaction.event.TransactionalEventListener; 

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

 

@Component

@Slf4j

public class RegCustomerListener {

 

     @Value ( "${aliyun.mq.order.topic}" )

     private String topic;

 

     @Value ( "${aliyun.mq.regist.product}" )

     private String registGroup;

 

     @Value ( "${aliyun.mq.regist.tag}" )

     private String registTag;

 

     @Autowired

     MqMessageRecordService mqMessageRecordService;

 

     @TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT)

     public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) {

         Customer cust = (Customer) regCustomerEvent.getSource();

         Map<String, String> map = new HashMap<String, String>();

         map.put( "custId" , cust.getZid());

         map.put( "account" , cust.getAccount());

         log.info( "put regist notice to Mq start" );

         String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup);

         MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup);

         if (StringUtils.isEmpty(hdResult)) {

             insert.setStatus( false );

         } else {

             insert.setStatus( true );

         }

         mqMessageRecordService.insertRecord(insert);

         log.info( "put regist notice to Mq end" );

         log.info( "regist notice userId : " + cust.getAccount());

     }

 

     private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) {

         MqMessageRecord msg = new MqMessageRecord();

         msg.setFlowId(custId);

         msg.setGroupName(groupId);

         msg.setTopic(topic);

         msg.setTag(tag);

         msg.setMsgId(result);

         msg.setDataBody(jsonStr);

         msg.setSendType( 3 );

         msg.setGroupType( 1 );

         msg.setCreateTime( new Date());

         return msg;

     }

}

@Autowired

     private ApplicationEventPublisher applicationEventPublisher;

 

applicationEventPublisher.publishEvent( new RegCustomerEvent (XXX));

这样可以确保数据入库之后再进行异步计算

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://my.oschina.net/u/4021946/blog/4915478

查看更多关于使用@TransactionalEventListener监听事务教程的详细内容...

  阅读:37次