RocketMQTemplate发送带tags的消息
RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口。
在单独使用RocketMQ的时候,发送消息使用的Message是‘org.apache.rocketmq测试数据mon.message'包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.springframework.messaging的Message,猛一看,没办法发送带tags的消息了,其实在RocketMQ集成的时候已经解决了这个问题。
在RocketMQTemplate发送消息时,调用的方法是:
public SendResult syncSendOrderly ( String destination , Message <?> message , String hashKey , long timeout ) { if ( Objects . isNull ( message ) || Objects . isNull ( message . getPayload ())) { log . error ( "syncSendOrderly failed. destination:{}, message is null " , destination ); throw new IllegalArgumentException ( "`message` and `message.payload` cannot be null" ); } try { long now = System . currentTimeMillis (); //在这里对消息进行了转化,将Spring的message转化为rocketmq自己的message org . apache . rocketmq . common . message . Message rocketMsg = RocketMQUtil . convertToRocketMessage ( objectMapper , charset , destination , message ); SendResult sendResult = producer . send ( rocketMsg , messageQueueSelector , hashKey , timeout ); long costTime = System . currentTimeMillis () - now ; log . debug ( "send message cost: {} ms, msgId:{}" , costTime , sendResult . getMsgId ()); return sendResult ; } catch ( Exception e ) { log . error ( "syncSendOrderly failed. destination:{}, message:{} " , destination , message ); throw new MessagingException ( e . getMessage (), e ); } }在上面的代码中,对消息进行了转化,将Spring的message转化为rocketmq自己的message,在RocketMQUtil.convertToRocketMessage方法中有个地方就是获取tags的:
String [] tempArr = destination . split ( ":" , 2 ); String topic = tempArr [ 0 ]; String tags = "" ; if ( tempArr . length > 1 ) { tags = tempArr [ 1 ]; }所以,在发送消息的时候,我们只要把tags使用":"添加到topic后面就可以了。
例如:xxxx:tag1 || tag2 || tag3
使用RocketMQ 处理消息
消息发送(生产者)
以maven + SpringBoot 工程为例,先在pom.xml增加依赖
<dependency> <groupId> org.apache.rocketmq </groupId> <artifactId> rocketmq-spring-boot-starter </artifactId> <version> 2.0.1 </version> </dependency>由于,这个依赖是一个starter,直接引入依赖就可以开始写投递消息的代码了。这个starter注册了一个叫org.apache.rocketmq.spring.core.RocketMQTemplate的bean,用它就可以直接把消息投递出去。 具体的API是这样的
XXXEvent xxxDto = new XXXEvent (); Message < XXXEvent > message = MessageBuilder . withPayload ( xxxDto ). build (); String dest = String . format ( "%s:%s" , topic - name "," tag - name "); //默认投递:同步发送 不会丢失消息。如果在投递成功后发生网络异常,客户端会认为投递失败而回滚本地事务 this.rocketMQTemplate.send(dest, xxxDto);这种投递方式能保证投递成功的消息不会丢失,但是不能保证投递一定成功。假设一次调用的流程是这样的
如果在步骤3的时候发生错误,因为出错mqClient会认为消息投递失败而把事务回滚。如果消息已经被消费,那就会导致业务错误。我们可以用事务消息解决这个问题。
以带事务方式投递的消息,正常情况下的处理流程是这样的
出错的时候是这样的
由于普通消息没有消息回查,普通消息用的producer不支持回查操作,不同业务的回查处理也不一样,事务消息需要使用单独的producer。消息发送代码大概是这样的
//调用这段代码之前别做会影响数据的操作 XXXEvent xxxDto = new XXXEvent (); Message < XXXEvent > message = MessageBuilder . withPayload ( xxxDto ). build (); String dest = String . format ( "%s:%s" , topic - name "," tag - name "); TransactionSendResult transactionSendResult = this.rocketMQTemplate.sendMessageInTransaction(" poducer - name "," topic - name : tag - name ",message," xxxid "); if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())){ throw new RuntimeException(" 事务消息投递失败 "); } //按照RocketMQ的写法,这个地方不应该有别的代码 @RocketMQTransactionListener ( txProducerGroup = "producer" ) class TransactionListenerImpl implements RocketMQLocalTransactionListener { //消息投递成功后执行的逻辑(半消息) //原文:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. @Override public RocketMQLocalTransactionState executeLocalTransaction ( Message msg , Object arg ) { try { // xxxService . doSomething (); return RocketMQLocalTransactionState . COMMIT ; catch ( IOException e ){ //不确定最终是否成功 return RocketMQLocalTransactionState . UNKNOWN ; } catch ( Exception e ){ return RocketMQLocalTransactionState . ROLLBACK ; } } //回查事务执行状态 @Override public RocketMQLocalTransactionState checkLocalTransaction ( Message msg ) { Boolean result = xxxService . isSuccess ( msg , arg ); if ( result != null ){ if ( result ){ return RocketMQLocalTransactionState . COMMIT ; } else { return RocketMQLocalTransactionState . ROLLBACK ; } } return RocketMQLocalTransactionState . UNKNOWN ; } }处理消息(消费)
普通消息和事务消息的区别只在投递的时候才明显,对应的消费端代码比较简单
import lombok . extern . slf4j . Slf4j ; import org . apache . rocketmq . spring . annotation . RocketMQMessageListener ; import org . apache . rocketmq . spring . core . RocketMQListener ; import org . springframework . beans . factory . annotation . Autowired ; import org . springframework . data . redis . core . RedisTemplate ; import org . springframework . data . redis . core . StringRedisTemplate ; import org . springframework . stereotype . Component ; @Slf4j @Component @RocketMQMessageListener ( consumerGroup = "xxx-consumer" , topic = "topic-name" , selectorExpression = "tag-name" ) public class XXXEventMQListener implements RocketMQListener < XXXEvent > { private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s" ; @Autowired private StringRedisTemplate redisTemplate ; @Override public void onMessage ( XXXEvent message ) { log . info ( "consumer message {}" , message ); //处理消息 try { xxxService . doSomething ( message ); } catch ( Exception ex ){ log . warn ( String . format ( "message [%s] 消费失败" , message ), ex ); //抛出异常后,MQClient会返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这条消息会再次尝试消费 throw new RuntimException ( ex ); } } }RocketMQ用ACK机制保证NameServer知道消息是否被消费在
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer里是这么处理的
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings ( "unchecked" ) @Override public ConsumeConcurrentlyStatus consumeMessage ( List < MessageExt > msgs , ConsumeConcurrentlyContext context ) { for ( MessageExt messageExt : msgs ) { log . debug ( "received msg: {}" , messageExt ); try { long now = System . currentTimeMillis (); rocketMQListener . onMessage ( doConvertMessage ( messageExt )); long costTime = System . currentTimeMillis () - now ; log . debug ( "consume {} cost: {} ms" , messageExt . getMsgId (), costTime ); } catch ( Exception e ) { log . warn ( "consume message failed. messageExt:{}" , messageExt , e ); context . setDelayLevelWhenNextConsume ( delayLevelWhenNextConsume ); return ConsumeConcurrentlyStatus . RECONSUME_LATER ; } } return ConsumeConcurrentlyStatus . CONSUME_SUCCESS ; } }以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。
原文链接:https://blog.csdn.net/youxijishu/article/details/105042136
查看更多关于使用RocketMQTemplate发送带tags的消息的详细内容...