1.添加依赖
1 2 3 4 5 6 7 8 9 10 11 |
< dependency > < groupId >org.jetlinks</ groupId > < artifactId >netty-mqtt-client</ artifactId > < version >1.0.0</ version > </ dependency > < dependency > < groupId >junit</ groupId > < artifactId >junit</ artifactId > < version >4.13.2</ version > < scope >test</ scope > </ dependency > |
2.源码
application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#mqtt配置 mqtt: username: admin password: 123456 #推送信息的连接地址 url: localhost port: 1884 #默认发送的主题 defaultTopic: topic #clientid clientId: client #连接超时时间 单位为秒 completionTimeout: 300 #设置会话心跳时间 单位为秒 keepAliveInterval: 20 |
MqttProperties.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties;
@Data @ConfigurationProperties (prefix = "mqtt" ) public class MqttProperties {
private String username; private String password; private String url; private int port; private String clientId; private String defaultTopic; private int completionTimeout; private int keepAliveInterval; } |
MqttConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
import com.xingyun.netty.mqtt.prop.MqttProperties; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import lombok.AllArgsConstructor; import org.jetlinks.mqtt.client.*; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@AllArgsConstructor @Configuration @EnableConfigurationProperties (MqttProperties. class ) public class MqttConfig {
private final MqttProperties mqttProperties;
@Bean public MqttClientConfig getMqttClientConfig() { MqttClientConfig mqttClientConfig = new MqttClientConfig(); mqttClientConfig.setClientId(mqttProperties.getClientId()); mqttClientConfig.setUsername(mqttProperties.getClientId()); mqttClientConfig.setPassword(mqttProperties.getPassword()); /*mqttClientConfig.setTimeoutSeconds(mqttProperties.getCompletionTimeout()); mqttClientConfig.setRetryInterval(mqttProperties.getKeepAliveInterval()); mqttClientConfig.setProtocolVersion(MqttVersion.MQTT_3_1_1); mqttClientConfig.setReconnect(true);*/ return mqttClientConfig; }
@Bean public MqttClient getMqttClient(){ EventLoopGroup loop = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2 );
MqttClient mqttClient = new MqttClientImpl(getMqttClientConfig(), null ); mqttClient.setEventLoop(loop); mqttClient.setCallback(getMqttClientCallback()); mqttClient.connect(mqttProperties.getUrl(), mqttProperties.getPort()).addListener(future -> { if (future.isSuccess()){ System.out.println( "mqtt客户端已建立连接" ); //#为多层通配符,+为单层通配符 mqttClient.on( "#" ,getMqttHandler()); } }); return mqttClient; }
@Bean public MqttHandler getMqttHandler(){ return (topic,payload) -> { System.out.println( "消息主题:" + topic); System.out.println( "消息内容:" + payload); }; }
@Bean public MqttClientCallback getMqttClientCallback(){ return new MqttClientCallback() { @Override public void connectionLost(Throwable cause) { cause.printStackTrace(); }
@Override public void onSuccessfulReconnect() { System.out.println( "客户端已重连" ); } }; }
} |
3.运行测试
客户端利用不同主题,发送消息
控制台
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:testTopic/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 32, widx: 32, cap: 512))
消息主题:test/sub/001
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 31, widx: 31, cap: 496))
消息主题:test1
消息内容:PooledSlicedByteBuf(ridx: 0, widx: 15, cap: 15/15, unwrapped: PooledUnsafeDirectByteBuf(ridx: 24, widx: 24, cap: 496))
单元测试发布消息
MqttSeviceDemo.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.jetlinks.mqtt.client.MqttClient; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith (SpringRunner. class ) @SpringBootTest public class MqttSeviceDemo {
@Autowired private MqttClient mqttClient;
@Test public void publishMessage(){ String test = "I am client9527" ; byte [] bytes = test.getBytes(); ByteBuf byteBuf = Unpooled.copiedBuffer(bytes); mqttClient.publish( "test/pub/001" ,byteBuf); System.out.println( "消息已发布" ); }
} |
客户端订阅到消息
到此这篇关于springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例的文章就介绍到这了,更多相关springboot Mqtt消息订阅和发布内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://blog.csdn.net/juligang320/article/details/122744232
查看更多关于springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例的详细内容...