好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例

1.添加依赖

?

1

2

3

4

5

6

7

8

9

10

11

<dependency>
    <groupId>org.jetlinks</groupId>
    <artifactId>netty-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>

2.源码

application.yml

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

# MQTT configuration (bound to MqttProperties via the "mqtt" prefix)

mqtt:

   username: admin

   password: 123456

   # Connection address used for pushing messages (passed to connect() together with port)

   url: localhost

   port: 1884

   # Default topic to publish to

   defaultTopic: topic

   # Client id

   clientId: client

   # Connection timeout, in seconds

   completionTimeout: 300

   # Session keep-alive (heartbeat) interval, in seconds

   keepAliveInterval: 20

MqttProperties.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

 

/**
 * Holder for the settings found under the {@code mqtt} prefix of the application configuration.
 *
 * <p>No accessors are declared here; the {@code getXxx()} methods called by {@code MqttConfig}
 * (for example {@code getClientId()}, {@code getUrl()}, {@code getPort()}) are expected to be
 * generated by Lombok's {@code @Data}.
 */
@Data

@ConfigurationProperties (prefix = "mqtt" )

public class MqttProperties {

 

     /** Value of {@code mqtt.username}. */
     private String username;

     /** Value of {@code mqtt.password}. */
     private String password;

     /** Connection address of the broker; the sample configuration uses {@code localhost}. */
     private String url;

     /** Port passed to {@code connect(...)} together with {@link #url}. */
     private int port;

     /** Value of {@code mqtt.clientId}. */
     private String clientId;

     /** Default topic to publish to, per the sample configuration. */
     private String defaultTopic;

     /** Connection timeout; the sample configuration documents the unit as seconds. */
     private int completionTimeout;

     /** Session keep-alive (heartbeat) interval; the sample configuration documents the unit as seconds. */
     private int keepAliveInterval;

}

MqttConfig.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

import com.xingyun.netty.mqtt.prop.MqttProperties;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import lombok.AllArgsConstructor;

import org.jetlinks.mqtt.client.*;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

 

@AllArgsConstructor

@Configuration

@EnableConfigurationProperties (MqttProperties. class )

public class MqttConfig {

 

     private final MqttProperties mqttProperties;

 

     @Bean

     public MqttClientConfig getMqttClientConfig() {

         MqttClientConfig mqttClientConfig = new MqttClientConfig();

         mqttClientConfig.setClientId(mqttProperties.getClientId());

         mqttClientConfig.setUsername(mqttProperties.getClientId());

         mqttClientConfig.setPassword(mqttProperties.getPassword());

         /*mqttClientConfig.setTimeoutSeconds(mqttProperties.getCompletionTimeout());

         mqttClientConfig.setRetryInterval(mqttProperties.getKeepAliveInterval());

         mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1);

         mqttClientConfig.setReconnect(true);*/

         return mqttClientConfig;

     }

 

     @Bean

     public MqttClient getMqttClient(){

         EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2 );

 

         MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(), null );

         mqttClient.setEventLoop(loop);

         mqttClient.setCallback(getMqttClientCallback());

         mqttClient.connect(mqttProperties.getUrl(), mqttProperties.getPort()).addListener(future -> {

             if (future.isSuccess()){

                 System.out.println( "mqtt客户端已建立连接" );

                 //#为多层通配符,+为单层通配符

                 mqttClient.on( "#" ,getMqttHandler());

             }

         });

         return mqttClient;

     }

 

     @Bean

     public MqttHandler getMqttHandler(){

         return (topic,payload) ->  {

             System.out.println( "消息主题:" + topic);

             System.out.println( "消息内容:" + payload);

         };

     }

 

     @Bean

     public MqttClientCallback getMqttClientCallback(){

         return new MqttClientCallback() {

             @Override

             public void connectionLost(Throwable cause) {

                 cause.printStackTrace();

             }

 

             @Override

             public void onSuccessfulReconnect() {

                 System.out.println( "客户端已重连" );

             }

         };

     }

 

}

3.运行测试

客户端利用不同主题,发送消息

控制台

消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:test/sub/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 31, widx: 31, cap: 496))
消息主题:test1
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 496))

单元测试发布消息
MqttSeviceDemo.java

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.jetlinks.mqtt.client.MqttClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

 

@RunWith (SpringRunner. class )

@SpringBootTest

public class MqttSeviceDemo {

 

     @Autowired

     private MqttClient mqttClient;

 

     @Test

     public void publishMessage(){

         String test = "I am client9527" ;

         byte [] bytes = test.getBytes();

         ByteBuf byteBuf = Unpooled.copiedBuffer(bytes);

         mqttClient.publish( "test/pub/001" ,byteBuf);

         System.out.println( "消息已发布" );

     }

 

}

客户端订阅到消息

到此,这篇关于springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例的文章就介绍到这了。更多相关springboot Mqtt消息订阅和发布的内容,请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!

原文链接:https://blog.csdn.net/juligang320/article/details/122744232

查看更多关于springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例的详细内容...

  阅读:14次