
Fixing a pitfall when integrating RocketMQ with Spring Boot

Use case

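When implementing a RocketMQ consumer, you typically use the @RocketMQMessageListener annotation to define the group, the topic, and the selectorExpression (the rule used to filter and select messages). To support dynamic filtering, these attributes are usually written as placeholder expressions whose values are switched at deployment time through Apollo or Spring Cloud Config.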

Adding the dependency

<!-- RocketMQ Spring Boot Starter -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
</dependency>

Consumer code

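import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// The listener must be a Spring bean so that the starter can find the annotation
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}", topic = "${rocketmq.topic}", selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Consumed message: " + message);
    }
}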

Troubleshooting

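In @RocketMQMessageListener, selectorExpression defaults to *, which means the consumer receives every message on the topic. If you want to configure the tags dynamically and use the ${rocketmq.selectorExpression} placeholder, you will find that every message is filtered out. Tracing the source (ListenerContainerConfiguration.java) shows why: when the listener container is created, the selectorExpression value is first resolved from the environment, but is then overwritten, so the filter condition ends up being the literal placeholder string.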

    @Override
    public void afterSingletonsInstantiated() {
        // Collect every bean annotated with @RocketMQMessageListener
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
            // Register a container for each of them
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        // Check that the bean implements the RocketMQListener interface
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
        // Read the annotation from the bean class
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        // Resolve group and topic; placeholders are supported here
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
        // Register the container bean; the supplier calls createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
        container.setRocketMQMessageListener(annotation);
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // At this point the placeholder has already been resolved to its configured value
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // This call overwrites selectorExpression with the raw placeholder from the annotation
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }
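
The effect of that ordering can be reproduced without Spring or RocketMQ. The following is a minimal sketch, not the library's code: `Container`, `resolve`, `setListenerAnnotation` and the `TagA || TagB` value are hypothetical stand-ins, and the tag matching is a simplified model of a tag filter (`*` accepts everything, otherwise the tag must equal one of the `||`-separated entries).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class SelectorOverwriteDemo {

    /** Hypothetical stand-in for the environment: resolves a single ${key} placeholder. */
    static String resolve(String raw, Map<String, String> props) {
        if (raw.startsWith("${") && raw.endsWith("}")) {
            String key = raw.substring(2, raw.length() - 1);
            return props.getOrDefault(key, raw);
        }
        return raw;
    }

    /** Hypothetical stand-in for the listener container. */
    static class Container {
        String selectorExpression;

        void setSelectorExpression(String expression) {
            this.selectorExpression = expression;
        }

        // Models a setter that copies the raw annotation attribute over the field.
        void setListenerAnnotation(String rawSelectorExpression) {
            this.selectorExpression = rawSelectorExpression;
        }

        // Simplified tag filter: "*" accepts every tag, otherwise the tag
        // must equal one of the "||"-separated entries.
        boolean accepts(String tag) {
            if ("*".equals(selectorExpression)) {
                return true;
            }
            return Arrays.stream(selectorExpression.split("\\|\\|"))
                    .map(String::trim)
                    .anyMatch(tag::equals);
        }
    }

    /** Builds a container, optionally repeating the overwrite after the value was resolved. */
    static Container build(boolean overwriteAfterResolve) {
        Map<String, String> props =
                Collections.singletonMap("rocketmq.selectorExpression", "TagA || TagB");
        String raw = "${rocketmq.selectorExpression}";
        Container container = new Container();
        container.setSelectorExpression(resolve(raw, props));
        if (overwriteAfterResolve) {
            container.setListenerAnnotation(raw);
        }
        return container;
    }

    public static void main(String[] args) {
        Container overwritten = build(true);
        Container resolved = build(false);
        // The overwritten container filters on the literal placeholder, so TagA is rejected.
        System.out.println(overwritten.selectorExpression + " accepts TagA: " + overwritten.accepts("TagA"));
        // The resolved container filters on "TagA || TagB", so TagA is accepted.
        System.out.println(resolved.selectorExpression + " accepts TagA: " + resolved.accepts("TagA"));
    }
}
```

Because no message carries a tag equal to the literal placeholder text, the overwritten container rejects everything, which matches the symptom described above.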

Solution

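ListenerContainerConfiguration does its work in afterSingletonsInstantiated, the callback of the SmartInitializingSingleton interface, which runs only after all singleton beans have been created. That leaves a window before it runs: we can use reflection to resolve the selectorExpression placeholder and write the resolved value back into the annotation before ListenerContainerConfiguration reads it.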

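import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * After Spring Boot has initialized, but before the RocketMQ containers are created,
 * use reflection to replace the placeholder values held by the annotation.
 */
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private Environment environment;

    @Override
    @SuppressWarnings("unchecked")
    public void afterPropertiesSet() throws Exception {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()) {
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            // The annotation instance is a JDK proxy; its handler keeps the attribute values in the
            // private "memberValues" map (a JDK implementation detail). On JDK 16 and later this access
            // is likely to need --add-opens java.base/sun.reflect.annotation=ALL-UNNAMED.
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
                // Resolve String members only; turning enum or int members into Strings
                // would break the annotation's typed accessors
                if (entry.getValue() instanceof String) {
                    entry.setValue(environment.resolvePlaceholders((String) entry.getValue()));
                }
            }
        }
    }
}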
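
The value-rewriting step can be looked at in isolation, without Spring or reflection. The sketch below uses a hypothetical helper, `resolveStringMembers`, and a hypothetical resolver that knows a single placeholder. It rewrites only String values in place and leaves other members untouched, on the assumption that the annotation's member map may also hold non-String values such as ints or enums, which would no longer match their accessor's return type if they were stringified.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class ResolveStringMembersDemo {

    /** Rewrites String values in place via the resolver; other value types are left as they are. */
    static void resolveStringMembers(Map<String, Object> memberValues, UnaryOperator<String> resolver) {
        for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
            if (entry.getValue() instanceof String) {
                // setValue replaces the value of an existing key, so iteration stays safe
                entry.setValue(resolver.apply((String) entry.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical member map: one placeholder String and one int member
        Map<String, Object> memberValues = new LinkedHashMap<>();
        memberValues.put("selectorExpression", "${rocketmq.selectorExpression}");
        memberValues.put("consumeThreadMax", 64);

        // Hypothetical resolver standing in for Environment.resolvePlaceholders
        UnaryOperator<String> resolver =
                s -> s.replace("${rocketmq.selectorExpression}", "TagA || TagB");

        resolveStringMembers(memberValues, resolver);
        System.out.println(memberValues);
    }
}
```

After the call, selectorExpression holds the resolved tag expression while consumeThreadMax is still an Integer.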

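In addition, this bug has been fixed in version 2.1.0 of the starter. As long as it does not introduce dependency conflicts, upgrading to 2.1.0 or later is the recommended option.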

The above is based on personal experience; I hope it serves as a useful reference.

Original article: https://blog.csdn.net/xiaojun081004/article/details/104954802
