首先在pom文件里引入mqtt的依赖配置
1 2 3 4 5 6 |
<!--mqtt--> < dependency > < groupId >org.eclipse.paho</ groupId > < artifactId >org.eclipse.paho.client.mqttv3</ artifactId > < version >1.2.4</ version > </ dependency > |
其次在springboot 的配置yml文件,配置mqtt的服务配置
1 2 3 4 5 6 7 |
spring: mqtt: url: tcp://127.0.0.1:1883 client-id: niubility-tiger username: password: topic: [/unify/test] |
创建 MqttProperties配置参数类
1 2 3 4 5 6 7 8 9 10 11 12 |
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties;
@Data @ConfigurationProperties ( "spring.mqtt" ) public class MqttProperties { private String url; private String clientId; private String username; private String password; private String[] topic; } |
创建 MqttConfiguration 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.listener.MqttSubscribeListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration @EnableConfigurationProperties ({MqttProperties. class }) public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration. class ); @Autowired private MqttProperties mqttProperties;
public MqttConfiguration() { }
@Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setServerURIs( new String[]{ this .mqttProperties.getUrl()}); if (Func.isNotBlank( this .mqttProperties.getUrl())) { connectOptions.setUserName( this .mqttProperties.getUsername()); }
if (Func.isNotBlank( this .mqttProperties.getPassword())) { connectOptions.setPassword( this .mqttProperties.getPassword().toCharArray()); }
connectOptions.setKeepAliveInterval( 60 ); return connectOptions; }
@Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient( this .mqttProperties.getUrl(), this .mqttProperties.getClientId()); mqttClient.connect(options); for ( int i = 0 ; i< this .mqttProperties.getTopic().length; ++i) { mqttClient.subscribe( this .mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; } } |
创建 订阅事件类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import org.springframework.context.ApplicationEvent;
public class UWBMqttSubscribeEvent extends ApplicationEvent { private String topic;
public UWBMqttSubscribeEvent(String topic, Object source) { super (source); this .topic = topic; }
public String getTopic() { return this .topic; } } |
创建订阅事件监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.ubw.event.UWBMqttSubscribeEvent;
public class MqttSubscribeListener implements IMqttMessageListener {
@Override public void messageArrived(String s, MqttMessage mqttMessage) { String content = new String(mqttMessage.getPayload()); UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content); SpringUtil.publishEvent(event); } } |
创建mqtt消息事件异步处理监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
import com.baomidou.mybatisplus.core.toolkit.StringPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.config.MqttProperties; import org.springblade.ubw.event.UWBMqttSubscribeEvent; import org.springblade.ubw.service.MqttService; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async;
import javax.annotation.Resource; import java.util.Arrays; import java.util.List;
@Configuration public class MqttEventListener {
private static final Logger log = LoggerFactory.getLogger(MqttEventListener. class );
@Resource private MqttProperties mqttProperties;
@Resource private MqttService mqttService;
private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; }
@Async @EventListener (UWBMqttSubscribeEvent. class ) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return ; } mqttService.issue(topic,source); // log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source); } } |
创建MqttService 数据处理服务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.area.entity.WorkArea; import org.springblade.ubw.area.entity.WorkSite; import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo; import org.springblade.ubw.area.entity.WorkSitePassInfo; import org.springblade.ubw.area.service.WorkAreaService; import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService; import org.springblade.ubw.area.service.WorkSitePassInfoService; import org.springblade.ubw.area.service.WorkSiteService; import org.springblade.ubw.constant.UbwConstant; import org.springblade.ubw.history.entity.HistoryLocusInfo; import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo; import org.springblade.ubw.history.service.HistoryLocusInfoService; import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService; import org.springblade.ubw.loc.entity.LocStatusInfo; import org.springblade.ubw.loc.entity.LocStatusInfoHistory; import org.springblade.ubw.loc.service.LocStatusInfoHistoryService; import org.springblade.ubw.loc.service.LocStatusInfoService; import org.springblade.ubw.msg.entity.*; import org.springblade.ubw.msg.service.*; import org.springblade.ubw.system.entity.*; import org.springblade.ubw.system.service.*; import org.springblade.ubw.system.wrapper.MqttWrapper; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors;
@Service public class MqttService {
private static final Logger log = LoggerFactory.getLogger(MqttService. class );
@Resource private EmployeeAndDepartmentService employeeAndDepartmentService;
@Resource private VehicleInfoService vehicleInfoService;
@Resource private WorkSiteService workSiteService;
@Resource private LocStatusInfoService locStatusInfoService;
@Resource private LocStatusInfoHistoryService locStatusInfoHistoryService;
@Resource private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;
@Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;
@Resource private LocSosAlarminfoService locSosAlarminfoService;
@Resource private AttendanceInfoService attendanceInfoService;
@Resource private HistoryLocusInfoService historyLocusInfoService;
@Resource private WorkSitePassInfoService workSitePassInfoService;
@Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService;
@Resource private TrAlertService trAlertService;
@Resource private AddEvacuateInfoService addEvacuateInfoService;
@Resource private CancelEvacuateInfoService cancelEvacuateInfoService;
@Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;
@Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService;
@Resource private LeaderEmployeeInfoService leaderEmployeeInfoService;
@Resource private ElectricMsgInfoService electricMsgInfoService;
@Resource private WorkAreaService workAreaService;
@Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;
@Resource private SpecialWorksService specialWorksService;
@Resource private AttendanceLocusInfoService attendanceLocusInfoService;
@Resource private WorkTypeService workTypeService;
@Resource private OfficePositionService officePositionService;
@Resource private ClassTeamService classTeamService;
/** * 方法描述: 消息分发 * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch (topic){ case UbwConstant.TOPIC_EMP : //人员和部门信息 employeeAndDepartmentService.saveBatch(source); break ; case UbwConstant.TOPIC_VEHICLE : //车辆信息 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source, new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break ; case UbwConstant.TOPIC_WORK_SITE : //基站信息 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source, new WorkSite()); workSiteService.deleteAll(); workSiteService.saveBatch(workSites); break ; case UbwConstant.TOPIC_LOC_STATUS: //井下车辆人员实时 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source, new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break ; } locStatusInfoService.deleteAll(); //筛选入井人员列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1 ).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人员历史数据入库 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source, new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break ; case UbwConstant.TOPIC_LOC_OVER_TIME: //超时报警信息 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break ; case UbwConstant.TOPIC_LOC_OVER_AREA: //超员报警信息 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break ; case UbwConstant.TOPIC_LOC_SOS: //求救报警信息 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break ; case UbwConstant.TOPIC_ATTEND: //考勤信息 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source, new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break ; case UbwConstant.TOPIC_HISTORY_LOCUS: //精确轨迹信息 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source, new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break ; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站经过信息 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source, new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); break ; case UbwConstant.TOPIC_ENV_MON: //环境监测信息 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source, new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break ; case UbwConstant.TOPIC_TR_ALERT: //环境监测报警信息 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source, new TrAlert()); trAlertService.saveBatch(trAlerts); break ; case UbwConstant.TOPIC_ADD_EVA: //下发撤离信息 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source, new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break ; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤离信息 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source, new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break ; case UbwConstant.TOPIC_WORK_SITE_NEI: //相邻基站关系信息 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source, new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break ; case UbwConstant.TOPIC_LINK_MSG: //基站链路信息 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source, new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break ; case UbwConstant.TOPIC_LEADER_EMP: //带班领导信息 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source, new LeaderEmployeeInfo()); leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break ; case UbwConstant.TOPIC_ELE_MSG: //低电报警信息 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source, new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break ; case UbwConstant.TOPIC_WORK_AREA: //区域信息 workAreaService.deleteAll(); List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source, new WorkArea()); workAreaService.saveBatch(workAreas); break ; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //历史超时报警信息 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break ; case UbwConstant.TOPIC_SPECIAL_WORK: //特种人员预设线路信息 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source, new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break ; case UbwConstant.TOPIC_ATTEND_LOC: //历史考勤轨迹信息 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source, new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break ; case UbwConstant.TOPIC_WORK_TYPE: //工种信息 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source, new WorkType()); workTypeService.saveBatch(workTypes); break ; case UbwConstant.TOPIC_OFFICE_POS: //职务信息 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source, new OfficePosition()); officePositionService.saveBatch(officePositions); break ; case UbwConstant.TOPIC_CLASS_TEAM: //班组信息 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source, new ClassTeam()); classTeamService.saveBatch(classTeams); break ; default : //可选 break ; } } } |
完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!
以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注其它相关文章!
原文链接:https://blog.csdn.net/weixin_40986713/article/details/123572101
查看更多关于Springboot整合mqtt服务的示例代码的详细内容...