@TransactionalEventListener监听事务
项目背景
最近在项目遇到一个问题
A方法体内有 INSERT、UPDATE或者DELETE操作,最后会发送一段MQ给外部,外部接收到MQ后会再发送一段请求过来,系统收到请求后会执行B方法,B方法会依赖A方法修改后的结果,这就有一个问题,如果A方法事务没有提交;且B方法的请求过来了会查询到事务未提交前的状态,这就会有问题
解决办法:@TransactionalEventListener
在Spring4.2+,有一种叫做TransactionEventListener的方式,能够控制在事务的时候Event事件的处理方式。 我们知道,Spring的发布订阅模型实际上并不是异步的,而是同步的来将代码进行解耦。而TransactionEventListener仍是通过这种方式,只不过加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等的时候才会去进行Event的处理。
具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
//创建一个事件类 package com.qk.cas.config; import org.springframework.context.ApplicationEvent; public class MyTransactionEvent extends ApplicationEvent { private static final long serialVersionUID = 1L; private IProcesser processer; public MyTransactionEvent(IProcesser processer) { super (processer); this .processer = processer; } public IProcesser getProcesser() { return this .processer; } @FunctionalInterface public interface IProcesser { void handle(); } } //创建一个监听类 package com.qk.cas.config; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @Component public class MyTransactionListener { @TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT) public void hanldeOrderCreatedEvent(MyTransactionEvent event) { event.getProcesser().handle(); } } //MQ方法的变动 @Autowired private ApplicationEventPublisher eventPublisher; @Autowired private AmqpTemplate rabbitTemplate; public void sendCreditResult(String applyNo, String jsonString) { eventPublisher.publishEvent( new MyTransactionEvent(() -> { LOGGER.info( "MQ。APPLY_NO:[{}]。KEY:[{}]。通知报文:[{}]" , applyNo, Queues.CREDIT_RESULT, jsonString); rabbitTemplate.convertAndSend(Queues.CREDIT_RESULT, jsonString); })); } |
拓展
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 只有当前事务提交之后,才会执行事件监听的方法,其中参数phase默认为AFTER_COMMIT,共有四个枚举:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
public enum TransactionPhase { /** * Fire the event before transaction commit. * @see TransactionSynchronization#beforeCommit(boolean) */ BEFORE_COMMIT, /** * Fire the event after the commit has completed successfully. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and * therefore executes in the same after-completion sequence of events, * (and not in {@link TransactionSynchronization#afterCommit()}). * @see TransactionSynchronization#afterCompletion(int) * @see TransactionSynchronization#STATUS_COMMITTED */ AFTER_COMMIT, /** * Fire the event if the transaction has rolled back. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and * therefore executes in the same after-completion sequence of events. * @see TransactionSynchronization#afterCompletion(int) * @see TransactionSynchronization#STATUS_ROLLED_BACK */ AFTER_ROLLBACK, /** * Fire the event after the transaction has completed. * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or * {@link #AFTER_ROLLBACK} to intercept transaction commit * or rollback, respectively. * @see TransactionSynchronization#afterCompletion(int) */ AFTER_COMPLETION } |
注解@TransactionalEventListener
例如 用户注册之后需要计算用户的邀请关系,递归操作。如果注册的时候包含多步验证,生成基本初始化数据,这时候我们通过mq发送消息来处理这个邀请关系,会出现一个问题,就是用户还没注册数据还没入库,邀请关系就开始执行,但是查不到数据,导致出错。
@TransactionalEventListener 可以实现事务的监听,可以在提交之后再进行操作。
监听的对象
1 2 3 4 5 6 7 8 9 |
package com.jinglitong.springshop.interceptor; import com.jinglitong.springshop.entity.Customer; import org.springframework.context.ApplicationEvent;
public class RegCustomerEvent extends ApplicationEvent{ public RegCustomerEvent(Customer customer){ super (customer); } } |
监听到之后的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
package com.jinglitong.springshop.interceptor; import com.alibaba.fastjson.JSON; import com.jinglitong.springshop.entity.Customer; import com.jinglitong.springshop.entity.MqMessageRecord; import com.jinglitong.springshop.servcie.MqMessageRecordService; import com.jinglitong.springshop.util.AliMQServiceUtil; import lombok.extern.slf4j.Slf4j; import org.apache测试数据mons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; import java.util.Date; import java.util.HashMap; import java.util.Map;
@Component @Slf4j public class RegCustomerListener {
@Value ( "${aliyun.mq.order.topic}" ) private String topic;
@Value ( "${aliyun.mq.regist.product}" ) private String registGroup;
@Value ( "${aliyun.mq.regist.tag}" ) private String registTag;
@Autowired MqMessageRecordService mqMessageRecordService;
@TransactionalEventListener (phase = TransactionPhase.AFTER_COMMIT) public void hanldeRegCustomerEvent(RegCustomerEvent regCustomerEvent) { Customer cust = (Customer) regCustomerEvent.getSource(); Map<String, String> map = new HashMap<String, String>(); map.put( "custId" , cust.getZid()); map.put( "account" , cust.getAccount()); log.info( "put regist notice to Mq start" ); String hdResult = AliMQServiceUtil.createNewOrder(cust.getZid(), JSON.toJSONString(map),topic,registTag,registGroup); MqMessageRecord insert = buidBean(cust.getZid(),hdResult,registTag,JSON.toJSONString(map),registGroup); if (StringUtils.isEmpty(hdResult)) { insert.setStatus( false ); } else { insert.setStatus( true ); } mqMessageRecordService.insertRecord(insert); log.info( "put regist notice to Mq end" ); log.info( "regist notice userId : " + cust.getAccount()); }
private MqMessageRecord buidBean (String custId,String result ,String tag,String jsonStr,String groupId) { MqMessageRecord msg = new MqMessageRecord(); msg.setFlowId(custId); msg.setGroupName(groupId); msg.setTopic(topic); msg.setTag(tag); msg.setMsgId(result); msg.setDataBody(jsonStr); msg.setSendType( 3 ); msg.setGroupType( 1 ); msg.setCreateTime( new Date()); return msg; } } @Autowired private ApplicationEventPublisher applicationEventPublisher;
applicationEventPublisher.publishEvent( new RegCustomerEvent (XXX)); |
这样可以确保数据入库之后再进行异步计算
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
原文链接:https://my.oschina.net/u/4021946/blog/4915478
查看更多关于使用@TransactionalEventListener监听事务教程的详细内容...