好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)

1.application.yml

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

# Spring Boot configuration for the RabbitMQ demo application.
server:
  port: 8184
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 127.0.0.1 # broker address
    port: 5672
    username: admin # login user
    password: 123456 # login password
    template:
      retry:
        enabled: true # retry failed publish attempts
        initial-interval: 10000ms # delay before the first retry
        max-interval: 300000ms # upper bound for the delay between two retries
        multiplier: 2 # each retry delay is 2x the previous one
      # Default exchange used when a send call does not name one.
      # NOTE(review): the Java configuration in this article declares "test.topic.exchange";
      # confirm that an exchange with this name actually exists on the broker.
      exchange: topic.exchange
    publisher-confirm-type: correlated # publisher confirms, correlated to each send via CorrelationData
    publisher-returns: true # unroutable messages are handed back to the returns callback
    listener:
      type: simple
      simple:
        acknowledge-mode: manual # listeners must ack/nack explicitly
        prefetch: 1 # at most one unacknowledged message per consumer
        concurrency: 3 # minimum number of consumers per listener
        max-concurrency: 3 # maximum number of consumers per listener
        # Listener retry policy
        retry:
          enabled: true # enable listener retries
          max-attempts: 5
          stateless: false
          multiplier: 1.0 # back-off multiplier applied to the retry interval
          initial-interval: 1000ms
          max-interval: 10000ms
        default-requeue-rejected: true # rejected deliveries are requeued by default

2.pom.xml引入依赖

?

1

2

3

4

5

<!-- RabbitMQ support via the Spring Boot AMQP starter. No <version> is given here,
     so it has to come from dependency management (presumably the Spring Boot parent POM). -->

         <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter-amqp</artifactId>

         </dependency>

3.常量类创建

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

/**
 * Names of the queues, the exchange and the routing keys used by the RabbitMQ demo.
 *
 * <p>This is a pure constants holder: it is {@code final} and cannot be instantiated.
 *
 * @author kkp (2021/11/3 14:16)
 */
public final class RabbitMqConstants {

    /** Name of the first test queue. */
    public static final String TEST1_QUEUE = "test1-queue";

    /** Name of the second test queue. */
    public static final String TEST2_QUEUE = "test2-queue";

    /** Name of the exchange both test queues are bound to. */
    public static final String EXCHANGE_NAME = "test.topic.exchange";

    /** Binding pattern for the first queue; {@code *} matches exactly one word. */
    public static final String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";

    /** Concrete routing key that matches {@link #TOPIC_TEST1_ROUTINGKEY}. */
    public static final String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";

    /** Binding pattern for the second queue; {@code *} matches exactly one word. */
    public static final String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";

    /** Concrete routing key that matches {@link #TOPIC_TEST2_ROUTINGKEY}. */
    public static final String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";

    /** Not instantiable: this class only carries constants. */
    private RabbitMqConstants() {
    }
}

4.配置Configuration

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

import com.example.demo.common.RabbitMqConstants;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Scope;

 

/**

  * @author kkp

  * @ClassName RabbitMqConfig

  * @date 2021/11/3 14:16

  * @Description

  */

@Slf4j

@Configuration

public class RabbitMqConfig {

     @Autowired

     private CachingConnectionFactory connectionFactory;

 

     /**

      *  声明交换机

      */

     @Bean (RabbitMqConstants.EXCHANGE_NAME)

     public Exchange exchange(){

  //durable(true) 持久化,mq重启之后交换机还在

         // Topic模式

         //return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();

         //发布订阅模式

         return ExchangeBuilder.fanoutExchange(RabbitMqConstants.EXCHANGE_NAME).durable( true ).build();

     }

 

     /**

      *  声明队列

      *  new Queue(QUEUE_EMAIL,true,false,false)

      *  durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列

      *  auto-delete 表示消息队列没有在使用时将被自动删除 默认是false

      *  exclusive  表示该消息队列是否只在当前connection生效,默认是false

      */

     @Bean (RabbitMqConstants.TEST1_QUEUE)

     public Queue esQueue() {

         return new Queue(RabbitMqConstants.TEST1_QUEUE);

     }

 

     /**

      *  声明队列

      */

     @Bean (RabbitMqConstants.TEST2_QUEUE)

     public Queue gitalkQueue() {

         return new Queue(RabbitMqConstants.TEST2_QUEUE);

     }

 

     /**

      *  TEST1_QUEUE队列绑定交换机,指定routingKey

      */

     @Bean

     public Binding bindingEs( @Qualifier (RabbitMqConstants.TEST1_QUEUE) Queue queue,

                              @Qualifier (RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {

         return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();

     }

 

     /**

      *  TEST2_QUEUE队列绑定交换机,指定routingKey

      */

     @Bean

     public Binding bindingGitalk( @Qualifier (RabbitMqConstants.TEST2_QUEUE) Queue queue,

                                  @Qualifier (RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {

         return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();

     }

 

     /**

      * 如果需要在生产者需要消息发送后的回调,

      * 需要对rabbitTemplate设置ConfirmCallback对象,

      * 由于不同的生产者需要对应不同的ConfirmCallback,

      * 如果rabbitTemplate设置为单例bean,

      * 则所有的rabbitTemplate实际的ConfirmCallback为最后一次申明的ConfirmCallback。

      * @return

      */

     @Bean

     @Scope (ConfigurableBeanFactory.SCOPE_PROTOTYPE)

     public RabbitTemplate rabbitTemplate() {

          RabbitTemplate template = new RabbitTemplate(connectionFactory);

         return template;

     }

}

5.Rabbit工具类创建

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

 

import java.util.UUID;

/**

  * @author kkp

  * @ClassName RabbitMqUtils

  * @date 2021/11/3 14:21

  * @Description

  */

@Slf4j

@Component

public class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

 

     private RabbitTemplate rabbitTemplate;

 

     /**

      * 构造方法注入

      */

     @Autowired

     public RabbitMqUtils(RabbitTemplate rabbitTemplate) {

         this .rabbitTemplate = rabbitTemplate;

         //这是是设置回调能收到发送到响应

         rabbitTemplate.setConfirmCallback( this );

         //如果设置备份队列则不起作用

         rabbitTemplate.setMandatory( true );

         rabbitTemplate.setReturnCallback( this );

     }

 

     /**

      * 回调确认

      */

     @Override

     public void confirm(CorrelationData correlationData, boolean ack, String cause) {

         if (ack){

             log.info( "消息发送成功:correlationData({}),ack({}),cause({})" ,correlationData,ack,cause);

         } else {

             log.info( "消息发送失败:correlationData({}),ack({}),cause({})" ,correlationData,ack,cause);

         }

     }

 

     /**

      * 消息发送到转换器的时候没有对列,配置了备份对列该回调则不生效

      * @param message

      * @param replyCode

      * @param replyText

      * @param exchange

      * @param routingKey

      */

     @Override

     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

         log.info( "消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}" ,exchange,routingKey,replyCode,replyText,message);

     }

 

     /**

      * 发送到指定Queue

      * @param queueName

      * @param obj

      */

     public void send(String queueName, Object obj){

         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

         this .rabbitTemplate.convertAndSend(queueName, obj, correlationId);

     }

 

     /**

      * 1、交换机名称

      * 2、routingKey

      * 3、消息内容

      */

     public void sendByRoutingKey(String exChange, String routingKey, Object obj){

         CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

         this .rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);

     }

}

6.service创建

?

1

2

3

4

5

6

/**
 * Sending operations of the RabbitMQ demo.
 */
public interface TestService {

    /**
     * Sends {@code content} as the first test message.
     *
     * @param content message payload
     * @return a status text for the caller
     */
    String sendTest1(String content);

    /**
     * Sends {@code content} as the second test message.
     *
     * @param content message payload
     * @return a status text for the caller
     */
    String sendTest2(String content);
}

7.impl实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import com.example.demo.common.RabbitMqConstants;

import com.example.demo.util.RabbitMqUtils;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

/**

  * @author kkp

  * @ClassName TestServiceImpl

  * @date 2021/11/3 14:24

  * @Description

  */

@Service

@Slf4j

public class TestServiceImpl implements TestService {

 

     @Autowired

     private RabbitMqUtils rabbitMqUtils;

 

     @Override

     public String sendTest1(String content) {

         rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,

                 RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);

         log.info(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+ "***************发送成功*****************" );

         return "发送成功!" ;

     }

 

     @Override

     public String sendTest2(String content) {

         rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,

                 RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);

         log.info(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+ "***************发送成功*****************" );

         return "发送成功!" ;

     }

}

8.监听类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

import com.example.demo.common.RabbitMqConstants;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.ExchangeTypes;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

 

import com.rabbitmq.client.Channel;

 

/**

  * @author kkp

  * @ClassName RabbitMqListener

  * @date 2021/11/3 14:22

  * @Description

  */

 

@Slf4j

@Component

public class RabbitMqListener {

 

     @RabbitListener (queues = RabbitMqConstants.TEST1_QUEUE)

     public void test1Consumer(Message message, Channel channel) {

         try {

             //手动确认消息已经被消费

             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false );

             log.info( "Counsoum1消费消息:" + message.toString() + "。成功!" );

         } catch (Exception e) {

             e.printStackTrace();

             log.info( "Counsoum1消费消息:" + message.toString() + "。失败!" );

         }

     }

 

     @RabbitListener (queues = RabbitMqConstants.TEST2_QUEUE)

     public void test2Consumer(Message message, Channel channel) {

         try {

             //手动确认消息已经被消费

             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false );

             log.info( "Counsoum2消费消息:" + message.toString() + "。成功!" );

         } catch (Exception e) {

             e.printStackTrace();

             log.info( "Counsoum2消费消息:" + message.toString() + "。失败!" );

         }

     }

 

}

9.Controller测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

import com.example.demo.server.TestService;

import jdk.nashorn.internal.objects.annotations.Getter;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.*;

 

import java.util.Map;

 

/**

  * @author kkp

  * @ClassName TestController

  * @date 2021/11/3 14:25

  * @Description

  */

@Slf4j

@RestController

@RequestMapping ( "/enterprise" )

public class TestController {

 

     @Autowired

     private TestService testService;

 

     @GetMapping ( "/finance" )

     public String hello3( @RequestParam (required = false ) Map<String, Object> params) {

         return testService.sendTest2(params.get( "entId" ).toString());

     }

     /**

      * 发送消息test2

      * @param content

      * @return

      */

     @PostMapping (value = "/finance2" )

     public String sendTest2( @RequestBody String content) {

         return testService.sendTest2(content);

     }

 

}

到此这篇关于springboot2.5.6集成RabbitMq实现Topic主题模式的文章就介绍到这了。更多相关springboot集成RabbitMq的内容,请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!

原文链接:https://blog.csdn.net/weixin_44907173/article/details/121124048

查看更多关于springboot2.5.6集成RabbitMq实现Topic主题模式(推荐)的详细内容...

  阅读:15次