SpringBoot通过MQTT接入IOT设备
IOT设备:理工智控(武汉)科技有限公司 所属产品:智能转换器-新国标
SpringBoot版本:3.2.10
RabbitMQ版本:RabbitMQ 4.0.2
Maven版本:3.6.1
JDK版本:21
MySQL版本:5.7.31
步骤一:先进行iot设备配网,可以根据设备的官方文档来进行
配网首页:

WiFi配置:

MQTT配置(自定义配置):

这个MQTT服务使用的是上一篇文章搭建的RabbitMQ MQTT服务,MQTT地址搭建MQTT服务的服务器地址,端口是MQTT服务的端口,客户id、发布主题和订阅主题都是自定义的其中发布订阅/订阅主题主题官方推荐格式/${appId}/${deviceKey}/${deviceMAC}/publish和/${appId}/${deviceKey}/${deviceMAC}/subscribe用户名密码是RabbitMQ的密码,默认都是guest,在配置完成并保存后,在主页面选择重启设备选项,当设备重新启动后这些配置才会生效。
验证iot是否连接到到自定义设备,可以进入RabbitMQ的管理页面查看,在connections中看到有一个连接设备,那么这个设备就是我们的iot设备了(之前没有任何设备连接的情况下)

步骤二:编写SpringBoot代码使其连接到MQTT,并实现发送MQTT消息控制iot设备并保存设备的返回的信息到数据库中。
数据库message_info表建表语句:
CREATE TABLE `message_info` (
`id` varchar(255) NOT NULL,
`message` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目pom文件如下:
4.0.0
org.springframework.boot
spring-boot-starter-parent
3.2.10
org.example
demo
0.0.1-SNAPSHOT
demo
demo
21
com.alibaba
druid
1.2.20
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-integration
3.2.9
mysql
mysql-connector-java
8.0.33
com.baomidou
mybatis-plus-spring-boot3-starter
3.5.7
org.springframework.integration
spring-integration-stream
org.springframework.integration
spring-integration-mqtt
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok
配置文件(application.yml)如下:
spring:
application:
name: BaILcBsjuYyF_3
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: #这里填入mysql密码
url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&useLegacyDatetimeCode=false
type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
global-config:
db-config:
id-type: assign_id
mqtt:
url: #这里填入rabbitmq的地址,格式:tcp://ip:mqtt端口
username: #填入rabbitmq用户名
password: #填入rabbitmq密码
MqttCallBack配置类,接收MQTT的消息回调:
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MqttCallBack implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
log.error("断开连接");
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
log.info("主题:{}, qos:{}, 消息内容:{}已到达",topic,message.getQos(),message.getPayload());
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
log.info("客户端id:{},消息发送消息成功!",client.getClientId());
}
}
MqttUtils配置类,用于创建MQTT连接并封装一些常用的比如发送消息和销毁链接等方法,具体如下:
package com.example.demo.config;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttUtils {
private static final Logger logger = LoggerFactory.getLogger(MqttUtils.class);
@Value("${mqtt.username}")
private String username;//MQTT用户名
@Value("${mqtt.password}")
private String password;//MQTT密码
@Value("${mqtt.url}")
private String brokerUrl;//MQTT地址
@Value("${spring.application.name}")
private String applicationName;//clientId
private MqttClient client;
/**
* 项目启动时初始化MQTT连接
*/
@PostConstruct
public void init() {
connect();
}
/**
* 对象被回收时释放连接
*/
@PreDestroy
public void disConnect() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
}
} catch (MqttException e) {
logger.error("Failed to disconnect from MQTT broker", e);
}
}
private void connect() {
try {
//创建链接
client = new MqttClient(brokerUrl, applicationName, new MemoryPersistence());
// 设置连接选项
MqttConnectOptions options = new MqttConnectOptions();
//是否每次连接都是新连接(是否复用session)
options.setCleanSession(true);
//设置MQTT用户名
options.setUserName(username);
// 设置MQTT密码
options.setPassword(password.toCharArray());
//设置连接超时时间
options.setConnectionTimeout(100);
//心跳检测
options.setKeepAliveInterval(20);
//遗嘱消息
options.setWill("willTopic", (applicationName + "与服务器断开连接").getBytes(), 0, false);
// 消息回调
client.setCallback(new MqttCallBack());
// 创建MQTT连接
client.connect(options);
} catch (MqttException e) {
logger.error("Failed to connect to MQTT broker", e);
}
}
/**
* 消息发送
* @param topic 主题
* @param message 消息
* @return 是否发送成功
*/
public boolean publish(String topic, String message) {
try {
//创建MQTT消息实体
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
/**
* 设置Qos
* 0:最多一次交付,设置为0后消息发布完成,对消息的投递没有任何确认或重传机制,如果发送失败消息被丢弃
* 1:至少一次交付,设置为1后保证消息至少会被传递一次,如果消费失败消息会被存储等待客户端重新监听后发送
* 2:恰好一次交付,设置为2后保证消息只会被传第一次,并且确保消息只被接收者接收到一次
*/
mqttMessage.setQos(2);
//消息持久化,并且在新的客户端订阅时自动转发该消息
mqttMessage.setRetained(false);
//获取主题
MqttTopic mqttTopic = client.getTopic(topic);
//发送消息至主题
mqttTopic.publish(mqttMessage).waitForCompletion();
return true;
} catch (MqttException e) {
logger.error("Failed to publish message to topic {}", topic, e);
return false;
}
}
}
MqttListen类,用于订阅主题,并将接收到的消息保存入数据库中:
package com.example.demo.config;
import com.example.demo.service.MqttMessageService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import java.util.Random;
@Slf4j
@Configuration
public class MqttListen {
@Autowired
private MqttMessageService mqttMessageService;
@Value("${mqtt.username}")
private String username;//MQTT用户名
@Value("${mqtt.password}")
private String password;//MQTT密码
@Value("${mqtt.url}")
private String brokerUrl;//MQTT地址
private MqttClient client;
private final String clientIdPrefix = "BaILcBsjuYyF_";//客户端id前缀
@PostConstruct
public void init() {
connect();
}
@PreDestroy
public void destroy() {
disconnect();
}
public void connect() {
try {
//设置客户端id
String clientId = clientIdPrefix + new Random().nextInt(100000);
//创建mqtt客户端对象
client = new MqttClient(brokerUrl, clientId);
//设置连接选项
MqttConnectOptions options = new MqttConnectOptions();
//用户名
options.setUserName(username);
//密码
options.setPassword(password.toCharArray());
//设置接收回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失: {}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
mqttMessageService.processMessage(topic, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println(token);
}
});
//连接
client.connect(options);
log.info("MQTT 客户端已连接并订阅消息");
//订阅主题列表
String[] subscribes = {"/qJSbaj/dZpuwaPvNCpo/8cce4e521dd9/publish"};
//订阅主题
client.subscribe(subscribes);
} catch (MqttException e) {
throw new RuntimeException("MQTT 客户端连接失败", e);
}
}
public void disconnect() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
log.info("MQTT 客户端已断开连接");
}
} catch (MqttException e) {
e.printStackTrace();
log.error("MQTT 客户端断开连接时出错: {}", e.getMessage());
}
}
}
MessageInfo实体类:
package com.example.demo.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@Data
@TableName("message_info")
public class MessageInfo {
@TableId
private String id;
@TableField("message")
private String message;
}
MessageInfoMapper接口:
package com.example.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.entity.MessageInfo;
public interface MessageInfoMapper extends BaseMapper {
}
MqttMessageService接口:
package com.example.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.example.demo.entity.MessageInfo;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public interface MqttMessageService extends IService {
void processMessage(String topic, MqttMessage message);
}
MqttMessageServiceImpl类:
package com.example.demo.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.demo.entity.MessageInfo;
import com.example.demo.mapper.MessageInfoMapper;
import com.example.demo.service.MqttMessageService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MqttMessageServiceImpl extends ServiceImpl implements MqttMessageService {
@Autowired
private MessageInfoMapper messageInfoMapper;
public void processMessage(String topic, MqttMessage message) {
log.info("接收到主题:{} ,QOS:{}, 消息内容:{}的消息", topic, message.getQos(), new String(message.getPayload()));
MessageInfo mqtt = new MessageInfo();
mqtt.setMessage(new String(message.getPayload()));
messageInfoMapper.insert(mqtt);
}
}
MqttSendController类:
package com.example.demo.controller;
import com.example.demo.config.MqttUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
@RequestMapping(value = "/api")
public class MqttSendController {
@Autowired
private MqttUtils mqttUtils;
@PostMapping("/sendMessage")
public String sendMessage(@RequestBody Map map) {
boolean publish = mqttUtils.publish(map.get("topic").toString(), map.get("message").toString()); //发布消息
if (publish) {
return "ok";
}
return "no";
}
}
步骤三:运行项目,使用apipost进行接口测试:
运行成功截图:

apipost发送消息:

项目控制台log:

数据库存储message信息:

至此就完成了所有的工作,并且保证了springboot对iot设备的控制与iot的状态信息获取
- 本文标签: 综合
- 本文链接: https://ssadan-blog.cn/article/29
- 版权声明: 本文由sadan原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权