好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

SpringBoot集成Redis实现消息队列的方法

list 原理说明

Redis 的 list 是按照插入顺序排序的字符串链表。

如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列。

1 lpush 和 rpop

2 rpush 和 lpop

消息队列功能实现

引入 Redis 依赖
 <dependency>  
      <groupId>  org.springframework.boot  </groupId>  
      <artifactId>  spring-boot-starter-data-redis  </artifactId>  
  </dependency> 
applicat.yml添加Redis配置
 spring  :  
  redis  :  
    host  :     127.0  .  0.1  
    database  :     0  
    port  :     6379  
    jedis  :  
      pool  :  
        max  -  active  :     256  
        max  -  idle  :     8  
        min  -  idle  :     1  
 
Redis配置类
 package   com  .  sb  .  config  ;  

  import   org  .  springframework  .  beans  .  factory  .  annotation  .  Autowired  ;  
  import   org  .  springframework  .  context  .  annotation  .  Bean  ;  
  import   org  .  springframework  .  context  .  annotation  .  Configuration  ;  
  import   org  .  springframework  .  data  .  redis  .  connection  .  RedisConnectionFactory  ;  
  import   org  .  springframework  .  data  .  redis  .  core  .  RedisTemplate  ;  
  import   org  .  springframework  .  data  .  redis  .  serializer  .  StringRedisSerializer  ;  

  @Configuration  
  public     class     RedisConfig     {  

      @Autowired  
      private     RedisConnectionFactory   redisConnectionFactory  ;  

      @Bean  
      public     RedisTemplate  <  String  ,     Object  >   redisTemplate  ()     {  
          RedisTemplate  <  String  ,     Object  >     template     =     new     RedisTemplate  <>();  
          template  .  setConnectionFactory  (  redisConnectionFactory  );  
          template  .  setKeySerializer  (  new     StringRedisSerializer  ());  
          template  .  setValueSerializer  (  new     StringRedisSerializer  ());  
          template  .  afterPropertiesSet  ();  
          return     template  ;  
      }  

  } 
MQ发送和接收接口
 package   com  .  sb  .  service  ;  

  public     interface     MQService     {  

      void   produce  (  String     string  );  

      void   consume  ();  
  }  

 
MQ发送和接收实现类
 package   com  .  sb  .  service  .  impl  ;  

  import   com  .  sb  .  service  .  MQService  ;  
  import   org  .  slf4j  .  Logger  ;  
  import   org  .  slf4j  .  LoggerFactory  ;  
  import   org  .  springframework  .  dao  .  DataAccessException  ;  
  import   org  .  springframework  .  data  .  redis  .  connection  .  RedisConnection  ;  
  import   org  .  springframework  .  data  .  redis  .  core  .  RedisCallback  ;  
  import   org  .  springframework  .  data  .  redis  .  core  .  RedisTemplate  ;  
  import   org  .  springframework  .  data  .  redis  .  serializer  .  StringRedisSerializer  ;  
  import   org  .  springframework  .  lang  .  Nullable  ;  
  import   org  .  springframework  .  stereotype  .  Service  ;  

  import   javax  .  annotation  .  Resource  ;  
  import   java  .  util  .  List  ;  

  @Service  
  public     class     MQServiceImpl     implements     MQService     {  

      private     static     Logger   log   =     LoggerFactory  .  getLogger  (  MQServiceImpl  .  class  );  

      private     static     final     String   MESSAGE_KEY   =     "message:queue"  ;  

      @Resource  
      private     RedisTemplate   redisTemplate  ;  

      @Override  
      public     void   produce  (  String     string  )     {  
        redisTemplate  .  opsForList  ().  leftPush  (  MESSAGE_KEY  ,     string  );  
      }  

      @Override  
      public     void   consume  ()     {  
          String     string     =     (  String  )   redisTemplate  .  opsForList  ().  rightPop  (  MESSAGE_KEY  );  
        log  .  info  (  "consume : {}"  ,     string  );  
      }  

  }  
 
MQ发送和接收API接口
 package   com  .  sb  .  controller  ;  

  import   com  .  sb  .  service  .  MQService  ;  
  import   org  .  springframework  .  web  .  bind  .  annotation  .  RequestMapping  ;  
  import   org  .  springframework  .  web  .  bind  .  annotation  .  RequestMethod  ;  
  import   org  .  springframework  .  web  .  bind  .  annotation  .  RequestParam  ;  
  import   org  .  springframework  .  web  .  bind  .  annotation  .  RestController  ;  

  import   javax  .  annotation  .  Resource  ;  

  @RestController  
  @RequestMapping  (  value  =  "/api"  )  
  public     class     MQController     {  

      @Resource  
      private     MQService   mQService  ;  

      @RequestMapping  (  value     =     "/produce"  ,   method  =  RequestMethod  .  GET  )  
      public     void   produce  (  @RequestParam  (  name   =     "key"  )     String   key  )     {  
        mQService  .  produce  (  key  );  
      }  

      @RequestMapping  (  value  =  "/consume"  ,   method  =  RequestMethod  .  GET  )  
      public     void   consume  ()     {  
          while     (  true  )     {  
            mQService  .  consume  ();  
          }  
      }  

  }  
 

消息队列功能测试

调用 http://localhost:8080/api/produce 接口往队列里面添加 a、b、c、d元素。

调用 http://localhost:8080/api/consume 消费队列里面的元素。

从截图我们可以看到,即使当队列为空,消费者依然在不停的 pop 数据,这就是浪费生命的空轮询。

那如何解决这个空轮询的问题呢?

你也许会想使用 Thread.sleep() 让消费者线程隔一段时间再消费。

使用 Thread.sleep() 会有什么问题么?

A 如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

B 如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

有没有更优雅和更合适的方式呢?

brpop 和 blpop 实现阻塞读取,下面以 blpop 为例来说明问题。

blpop 理论说明

blpop 命令
 blpop key1  ...  keyN timeout 

blpop 说明

blpop 是阻塞式列表的弹出原语。当给定列表内没有任何元素可供弹出的时候, 连接将被 blpop 命令阻塞。直到有另一个客户端对给定的这些 key 的任意一个执行 lpush 或 rpush 命令为止。 

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

key1...keyN :表示不同的队列名。

timeout :阻塞队列超时时间。

blpop 代码实现

 public     void   blockingConsume  ()     {  
      List  <  Object  >   obj   =   redisTemplate  .  executePipelined  (  new     RedisCallback  <  Object  >()     {  
          @Nullable  
          @Override  
          public     Object   doInRedis  (  RedisConnection   connection  )     throws     DataAccessException     {  
              //队列没有元素会阻塞操作,直到队列获取新的元素或超时  
              return   connection  .  bLPop  (  TIME_OUT  ,   MESSAGE_KEY  .  getBytes  ());  
          }  
      },  new     StringRedisSerializer  ());  

      for     (  Object   str  :   obj  )     {  
        log  .  info  (  "blockingConsume : {}"  ,   str  );  
      }  
  } 

阻塞线程每隔10s超时执行一次。该方法解决了 CPU 空转的问题。

到此这篇关于SpringBoot集成Redis实现消息队列的方法的文章就介绍到这了,更多相关SpringBoot Redis消息队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/jack1liu/article/details/113725818

查看更多关于SpringBoot集成Redis实现消息队列的方法的详细内容...

  阅读:25次