好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Springboot整合mqtt服务的示例代码

首先在pom文件里引入mqtt的依赖配置

?

1

2

3

4

5

6

<!--mqtt-->

< dependency >

     < groupId >org.eclipse.paho</ groupId >

     < artifactId >org.eclipse.paho.client.mqttv3</ artifactId >

     < version >1.2.4</ version >

</ dependency >

其次在springboot 的配置yml文件,配置mqtt的服务配置

?

1

2

3

4

5

6

7

spring: 

   mqtt:

     url: tcp://127.0.0.1:1883

     client-id: niubility-tiger

     username:

     password:

     topic: [/unify/test]

创建 MqttProperties配置参数类

?

1

2

3

4

5

6

7

8

9

10

11

12

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

 

@Data

@ConfigurationProperties ( "spring.mqtt" )

public class MqttProperties {

     private String url;

     private String clientId;

     private String username;

     private String password;

     private String[] topic;

}

创建 MqttConfiguration 配置类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

import org.eclipse.paho.client.mqttv3.IMqttClient;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.listener.MqttSubscribeListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.context.properties.EnableConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

@Configuration

@EnableConfigurationProperties ({MqttProperties. class })

public class MqttConfiguration {

     private static final Logger log = LoggerFactory.getLogger(MqttConfiguration. class );

     @Autowired

     private MqttProperties mqttProperties;

 

     public MqttConfiguration() {

     }

 

     @Bean

     public MqttConnectOptions mqttConnectOptions() {

         MqttConnectOptions connectOptions = new MqttConnectOptions();

         connectOptions.setServerURIs( new String[]{ this .mqttProperties.getUrl()});

         if (Func.isNotBlank( this .mqttProperties.getUrl())) {

             connectOptions.setUserName( this .mqttProperties.getUsername());

         }

 

         if (Func.isNotBlank( this .mqttProperties.getPassword())) {

             connectOptions.setPassword( this .mqttProperties.getPassword().toCharArray());

         }

 

         connectOptions.setKeepAliveInterval( 60 );

         return connectOptions;

     }

 

     @Bean

     public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {

         IMqttClient mqttClient = new MqttClient( this .mqttProperties.getUrl(), this .mqttProperties.getClientId());

         mqttClient.connect(options);

         for ( int i = 0 ; i< this .mqttProperties.getTopic().length; ++i) {

             mqttClient.subscribe( this .mqttProperties.getTopic()[i], new MqttSubscribeListener());

         }

         return mqttClient;

     }

}

创建 订阅事件类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import org.springframework.context.ApplicationEvent;

 

 

public class UWBMqttSubscribeEvent extends ApplicationEvent {

     private String topic;

 

     public UWBMqttSubscribeEvent(String topic, Object source) {

         super (source);

         this .topic = topic;

     }

 

     public String getTopic() {

         return this .topic;

     }

}

创建订阅事件监听器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springblade.core.tool.utils.SpringUtil;

import org.springblade.ubw.event.UWBMqttSubscribeEvent;

 

 

public class MqttSubscribeListener implements IMqttMessageListener {

 

     @Override

     public void messageArrived(String s, MqttMessage mqttMessage) {

         String content = new String(mqttMessage.getPayload());

         UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);

         SpringUtil.publishEvent(event);

     }

}

创建mqtt消息事件异步处理监听器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

import com.baomidou.mybatisplus.core.toolkit.StringPool;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.config.MqttProperties;

import org.springblade.ubw.event.UWBMqttSubscribeEvent;

import org.springblade.ubw.service.MqttService;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.event.EventListener;

import org.springframework.scheduling.annotation.Async;

 

import javax.annotation.Resource;

import java.util.Arrays;

import java.util.List;

 

 

@Configuration

public class MqttEventListener {

 

 

     private static final Logger log = LoggerFactory.getLogger(MqttEventListener. class );

 

     @Resource

     private MqttProperties mqttProperties;

 

     @Resource

     private MqttService mqttService;

 

     private String processTopic (String topic) {

         List<String> topics = Arrays.asList(mqttProperties.getTopic());

         for (String wild : topics) {

             wild = wild.replace(StringPool.HASH, StringPool.EMPTY);

             if (topic.startsWith(wild)) {

                 return topic.replace(wild, StringPool.EMPTY);

             }

         }

         return StringPool.EMPTY;

     }

 

 

     @Async

     @EventListener (UWBMqttSubscribeEvent. class )

     public void listen (UWBMqttSubscribeEvent event) {

         String topic = processTopic(event.getTopic());

         Object source = event.getSource();

         if (Func.isEmpty(source)) {

             return ;

         }

         mqttService.issue(topic,source);

//        log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source);

     }

}

创建MqttService 数据处理服务类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springblade.core.tool.utils.Func;

import org.springblade.ubw.area.entity.WorkArea;

import org.springblade.ubw.area.entity.WorkSite;

import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;

import org.springblade.ubw.area.entity.WorkSitePassInfo;

import org.springblade.ubw.area.service.WorkAreaService;

import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;

import org.springblade.ubw.area.service.WorkSitePassInfoService;

import org.springblade.ubw.area.service.WorkSiteService;

import org.springblade.ubw.constant.UbwConstant;

import org.springblade.ubw.history.entity.HistoryLocusInfo;

import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;

import org.springblade.ubw.history.service.HistoryLocusInfoService;

import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;

import org.springblade.ubw.loc.entity.LocStatusInfo;

import org.springblade.ubw.loc.entity.LocStatusInfoHistory;

import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;

import org.springblade.ubw.loc.service.LocStatusInfoService;

import org.springblade.ubw.msg.entity.*;

import org.springblade.ubw.msg.service.*;

import org.springblade.ubw.system.entity.*;

import org.springblade.ubw.system.service.*;

import org.springblade.ubw.system.wrapper.MqttWrapper;

import org.springframework.stereotype.Service;

 

import javax.annotation.Resource;

import java.util.List;

import java.util.stream.Collectors;

 

 

@Service

public class MqttService {

 

     private static final Logger log = LoggerFactory.getLogger(MqttService. class );

 

     @Resource

     private EmployeeAndDepartmentService employeeAndDepartmentService;

 

     @Resource

     private VehicleInfoService vehicleInfoService;

 

     @Resource

     private WorkSiteService workSiteService;

 

     @Resource

     private LocStatusInfoService locStatusInfoService;

 

     @Resource

     private LocStatusInfoHistoryService locStatusInfoHistoryService;

 

     @Resource

     private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;

 

     @Resource

     private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;

 

     @Resource

     private LocSosAlarminfoService locSosAlarminfoService;

 

     @Resource

     private AttendanceInfoService attendanceInfoService;

 

     @Resource

     private HistoryLocusInfoService historyLocusInfoService;

 

     @Resource

     private WorkSitePassInfoService workSitePassInfoService;

 

     @Resource

     private EnvironmentalMonitorInfoService environmentalMonitorInfoService;

 

     @Resource

     private TrAlertService trAlertService;

 

     @Resource

     private AddEvacuateInfoService addEvacuateInfoService;

 

     @Resource

     private CancelEvacuateInfoService cancelEvacuateInfoService;

 

     @Resource

     private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;

 

     @Resource

     private LinkMsgAlarmInfoService linkMsgAlarmInfoService;

 

     @Resource

     private LeaderEmployeeInfoService leaderEmployeeInfoService;

 

     @Resource

     private ElectricMsgInfoService electricMsgInfoService;

 

     @Resource

     private WorkAreaService workAreaService;

 

     @Resource

     private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;

 

     @Resource

     private SpecialWorksService specialWorksService;

 

     @Resource

     private AttendanceLocusInfoService attendanceLocusInfoService;

 

     @Resource

     private WorkTypeService workTypeService;

 

     @Resource

     private OfficePositionService officePositionService;

 

     @Resource

     private ClassTeamService classTeamService;

 

     /**

      * 方法描述: 消息分发

      *

      * @param topic

      * @param source

      * @author liwenbin

      * @date 2021年12月14日 14:14:09

      */

     public void issue(String topic,Object source){

         switch (topic){

             case UbwConstant.TOPIC_EMP :

                 //人员和部门信息

                 employeeAndDepartmentService.saveBatch(source);

                 break ;

             case UbwConstant.TOPIC_VEHICLE :

                 //车辆信息

                 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source, new VehicleInfo());

                 vehicleInfoService.deleteAll();

                 vehicleInfoService.saveBatch(vehicleInfos);

                 break ;

             case UbwConstant.TOPIC_WORK_SITE :

                 //基站信息

                 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source, new WorkSite());

                 workSiteService.deleteAll();

                 workSiteService.saveBatch(workSites);

                 break ;

             case UbwConstant.TOPIC_LOC_STATUS:

                 //井下车辆人员实时

                 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source, new LocStatusInfo());

                 if (Func.isEmpty(locStatusInfos)){

                     break ;

                 }

                 locStatusInfoService.deleteAll();

                 //筛选入井人员列表

                 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1 ).collect(Collectors.toList());

                 locStatusInfoService.saveBatch(inWellList);

                 //人员历史数据入库

                 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source, new LocStatusInfoHistory());

                 locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);

                 break ;

             case UbwConstant.TOPIC_LOC_OVER_TIME:

                 //超时报警信息

                 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocOverTimeSosAlarminfo());

                 locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);

                 break ;

             case UbwConstant.TOPIC_LOC_OVER_AREA:

                 //超员报警信息

                 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocAreaOverSosAlarminfo());

                 locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);

                 break ;

             case UbwConstant.TOPIC_LOC_SOS:

                 //求救报警信息

                 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocSosAlarminfo());

                 locSosAlarminfoService.saveBatch(locSosAlarmInfos);

                 break ;

             case UbwConstant.TOPIC_ATTEND:

                 //考勤信息

                 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source, new AttendanceInfo());

                 attendanceInfoService.saveBatch(attendanceInfos);

                 break ;

             case UbwConstant.TOPIC_HISTORY_LOCUS:

                 //精确轨迹信息

                 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source, new HistoryLocusInfo());

                 historyLocusInfoService.saveBatch(historyLocusInfos);

                 break ;

             case UbwConstant.TOPIC_WORK_SITE_PASS:

                 //基站经过信息

                 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source, new WorkSitePassInfo());

                 workSitePassInfoService.saveBatch(workSitePassInfos);

                 break ;

             case UbwConstant.TOPIC_ENV_MON:

                 //环境监测信息

                 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source, new EnvironmentalMonitorInfo());

                 environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);

                 break ;

             case UbwConstant.TOPIC_TR_ALERT:

                 //环境监测报警信息

                 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source, new TrAlert());

                 trAlertService.saveBatch(trAlerts);

                 break ;

             case UbwConstant.TOPIC_ADD_EVA:

                 //下发撤离信息

                 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source, new AddEvacuateInfo());

                 addEvacuateInfoService.saveBatch(addEvacuateInfos);

                 break ;

             case UbwConstant.TOPIC_CANCEL_EVA:

                 //取消撤离信息

                 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source, new CancelEvacuateInfo());

                 cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);

                 break ;

             case UbwConstant.TOPIC_WORK_SITE_NEI:

                 //相邻基站关系信息

                 workSiteNeighbourInfoService.deleteAll();

                 List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source, new WorkSiteNeighbourInfo());

                 workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);

                 break ;

             case UbwConstant.TOPIC_LINK_MSG:

                 //基站链路信息

                 linkMsgAlarmInfoService.deleteAll();

                 List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source, new LinkMsgAlarmInfo());

                 linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);

                 break ;

             case UbwConstant.TOPIC_LEADER_EMP:

                 //带班领导信息

                 leaderEmployeeInfoService.deleteAll();

                 List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source, new LeaderEmployeeInfo());

                 leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);

                 break ;

             case UbwConstant.TOPIC_ELE_MSG:

                 //低电报警信息

                 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source, new ElectricMsgInfo());

                 electricMsgInfoService.saveBatch(electricMsgInfos);

                 break ;

             case UbwConstant.TOPIC_WORK_AREA:

                 //区域信息

                 workAreaService.deleteAll();

                 List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source, new WorkArea());

                 workAreaService.saveBatch(workAreas);

                 break ;

             case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:

                 //历史超时报警信息

                 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new HistoryOverTimeSosAlarmInfo());

                 historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);

                 break ;

             case UbwConstant.TOPIC_SPECIAL_WORK:

                 //特种人员预设线路信息

                 specialWorksService.deleteAll();

                 List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source, new SpecialWorks());

                 specialWorksService.saveBatch(specialWorks);

                 break ;

             case UbwConstant.TOPIC_ATTEND_LOC:

                 //历史考勤轨迹信息

                 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source, new AttendanceLocusInfo());

                 attendanceLocusInfoService.saveBatch(attendanceLocusInfos);

                 break ;

             case UbwConstant.TOPIC_WORK_TYPE:

                 //工种信息

                 workTypeService.deleteAll();

                 List<WorkType> workTypes = MqttWrapper.build().toEntityList(source, new WorkType());

                 workTypeService.saveBatch(workTypes);

                 break ;

             case UbwConstant.TOPIC_OFFICE_POS:

                 //职务信息

                 officePositionService.deleteAll();

                 List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source, new OfficePosition());

                 officePositionService.saveBatch(officePositions);

                 break ;

             case UbwConstant.TOPIC_CLASS_TEAM:

                 //班组信息

                 classTeamService.deleteAll();

                 List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source, new ClassTeam());

                 classTeamService.saveBatch(classTeams);

                 break ;

             default : //可选

                 break ;

         }

     }

}

完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!

以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注其它相关文章!

原文链接:https://blog.csdn.net/weixin_40986713/article/details/123572101

查看更多关于Springboot整合mqtt服务的示例代码的详细内容...

  阅读:21次