原创

SpringBoot通过MQTT接入IOT设备

温馨提示:
本文最后更新于 2024年10月29日,已超过 178 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

IOT设备:理工智控(武汉)科技有限公司  所属产品:智能转换器-新国标

SpringBoot版本:3.2.10

RabbitMQ版本:RabbitMQ 4.0.2

Maven版本:3.6.1

JDK版本:21

MySQL版本:5.7.31

步骤一:先进行iot设备配网,可以根据设备的官方文档来进行

配网首页:

WiFi配置:

MQTT配置(自定义配置):


这个MQTT服务使用的是上一篇文章搭建的RabbitMQ MQTT服务,MQTT地址搭建MQTT服务的服务器地址,端口是MQTT服务的端口,客户id、发布主题和订阅主题都是自定义的其中发布订阅/订阅主题主题官方推荐格式/${appId}/${deviceKey}/${deviceMAC}/publish和/${appId}/${deviceKey}/${deviceMAC}/subscribe用户名密码是RabbitMQ的密码,默认都是guest,在配置完成并保存后,在主页面选择重启设备选项,当设备重新启动后这些配置才会生效。

验证iot是否连接到到自定义设备,可以进入RabbitMQ的管理页面查看,在connections中看到有一个连接设备,那么这个设备就是我们的iot设备了(之前没有任何设备连接的情况下)

步骤二:编写SpringBoot代码使其连接到MQTT,并实现发送MQTT消息控制iot设备并保存设备的返回的信息到数据库中。

数据库message_info表建表语句:

CREATE TABLE `message_info` (
  `id` varchar(255) NOT NULL,
  `message` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

项目pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.10</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.20</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 引入 mqtt 相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <version>3.2.9</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <!-- mp-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
            <version>3.5.7</version>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.baomidou</groupId>-->
<!--            <artifactId>mybatis-plus</artifactId>-->
<!--            <version>3.5.4</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

配置文件(application.yml)如下:

spring:
  application:
    name: BaILcBsjuYyF_3
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: #这里填入mysql密码
    url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false&allowPublicKeyRetrieval=true&useLegacyDatetimeCode=false
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
  global-config:
    db-config:
      id-type: assign_id
mqtt:
  url: #这里填入rabbitmq的地址,格式:tcp://ip:mqtt端口
  username: #填入rabbitmq用户名
  password: #填入rabbitmq密码

MqttCallBack配置类,接收MQTT的消息回调:

package com.example.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MqttCallBack implements MqttCallback {

    @Override
    public void connectionLost(Throwable cause) {
        log.error("断开连接");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        log.info("主题:{}, qos:{}, 消息内容:{}已到达",topic,message.getQos(),message.getPayload());
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        log.info("客户端id:{},消息发送消息成功!",client.getClientId());
    }
}

MqttUtils配置类,用于创建MQTT连接并封装一些常用的比如发送消息和销毁链接等方法,具体如下:

package com.example.demo.config;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttUtils {

    private static final Logger logger = LoggerFactory.getLogger(MqttUtils.class);

    @Value("${mqtt.username}")
    private String username;//MQTT用户名

    @Value("${mqtt.password}")
    private String password;//MQTT密码

    @Value("${mqtt.url}")
    private String brokerUrl;//MQTT地址

    @Value("${spring.application.name}")
    private String applicationName;//clientId

    private MqttClient client;

    /**
     * 项目启动时初始化MQTT连接
     */
    @PostConstruct
    public void init() {
        connect();
    }

    /**
     * 对象被回收时释放连接
     */
    @PreDestroy
    public void disConnect() {
        try {
            if (client != null && client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            logger.error("Failed to disconnect from MQTT broker", e);
        }
    }

    private void connect() {
        try {
            //创建链接
            client = new MqttClient(brokerUrl, applicationName, new MemoryPersistence());
//            设置连接选项
            MqttConnectOptions options = new MqttConnectOptions();
            //是否每次连接都是新连接(是否复用session)
            options.setCleanSession(true);
            //设置MQTT用户名
            options.setUserName(username);
//            设置MQTT密码
            options.setPassword(password.toCharArray());
            //设置连接超时时间
            options.setConnectionTimeout(100);
            //心跳检测
            options.setKeepAliveInterval(20);
            //遗嘱消息
            options.setWill("willTopic", (applicationName + "与服务器断开连接").getBytes(), 0, false);
//            消息回调
            client.setCallback(new MqttCallBack());
//            创建MQTT连接
            client.connect(options);
        } catch (MqttException e) {
            logger.error("Failed to connect to MQTT broker", e);
        }
    }

    /**
     * 消息发送
     * @param topic 主题
     * @param message 消息
     * @return 是否发送成功
     */
    public boolean publish(String topic, String message) {
        try {
            //创建MQTT消息实体
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            /**
             * 设置Qos
             * 0:最多一次交付,设置为0后消息发布完成,对消息的投递没有任何确认或重传机制,如果发送失败消息被丢弃
             * 1:至少一次交付,设置为1后保证消息至少会被传递一次,如果消费失败消息会被存储等待客户端重新监听后发送
             * 2:恰好一次交付,设置为2后保证消息只会被传第一次,并且确保消息只被接收者接收到一次
             */
            mqttMessage.setQos(2);
            //消息持久化,并且在新的客户端订阅时自动转发该消息
            mqttMessage.setRetained(false);
            //获取主题
            MqttTopic mqttTopic = client.getTopic(topic);
            //发送消息至主题
            mqttTopic.publish(mqttMessage).waitForCompletion();
            return true;
        } catch (MqttException e) {
            logger.error("Failed to publish message to topic {}", topic, e);
            return false;
        }
    }
}

MqttListen类,用于订阅主题,并将接收到的消息保存入数据库中:

package com.example.demo.config;

import com.example.demo.service.MqttMessageService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Random;

@Slf4j
@Configuration
public class MqttListen {
    @Autowired
    private MqttMessageService mqttMessageService;

    @Value("${mqtt.username}")
    private String username;//MQTT用户名

    @Value("${mqtt.password}")
    private String password;//MQTT密码

    @Value("${mqtt.url}")
    private String brokerUrl;//MQTT地址


    private MqttClient client;
    private final String clientIdPrefix = "BaILcBsjuYyF_";//客户端id前缀

    @PostConstruct
    public void init() {
        connect();
    }

    @PreDestroy
    public void destroy() {
        disconnect();
    }

    public void connect() {
        try {
            //设置客户端id
            String clientId = clientIdPrefix + new Random().nextInt(100000);
            //创建mqtt客户端对象
            client = new MqttClient(brokerUrl, clientId);
            //设置连接选项
            MqttConnectOptions options = new MqttConnectOptions();
            //用户名
            options.setUserName(username);
            //密码
            options.setPassword(password.toCharArray());
            //设置接收回调
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    log.error("MQTT 连接丢失: {}", cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    mqttMessageService.processMessage(topic, message);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println(token);
                }
            });
            //连接
            client.connect(options);
            log.info("MQTT 客户端已连接并订阅消息");
            //订阅主题列表
            String[] subscribes = {"/qJSbaj/dZpuwaPvNCpo/8cce4e521dd9/publish"};
            //订阅主题
            client.subscribe(subscribes);
        } catch (MqttException e) {
            throw new RuntimeException("MQTT 客户端连接失败", e);
        }
    }

    public void disconnect() {
        try {
            if (client != null && client.isConnected()) {
                client.disconnect();
                log.info("MQTT 客户端已断开连接");
            }
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("MQTT 客户端断开连接时出错: {}", e.getMessage());
        }
    }
}

MessageInfo实体类:

package com.example.demo.entity;

import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("message_info")
public class MessageInfo {

    @TableId
    private String id;
    @TableField("message")
    private String message;
}

MessageInfoMapper接口:

package com.example.demo.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.entity.MessageInfo;

public interface MessageInfoMapper extends BaseMapper<MessageInfo> {
}

MqttMessageService接口:

package com.example.demo.service;

import com.baomidou.mybatisplus.extension.service.IService;

import com.example.demo.entity.MessageInfo;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public interface MqttMessageService extends IService<MessageInfo> {
    void processMessage(String topic, MqttMessage message);
}

MqttMessageServiceImpl类:

package com.example.demo.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import com.example.demo.entity.MessageInfo;
import com.example.demo.mapper.MessageInfoMapper;
import com.example.demo.service.MqttMessageService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MqttMessageServiceImpl extends ServiceImpl<MessageInfoMapper, MessageInfo> implements MqttMessageService {

    @Autowired
    private MessageInfoMapper messageInfoMapper;

    public void processMessage(String topic, MqttMessage message) {
        log.info("接收到主题:{} ,QOS:{}, 消息内容:{}的消息", topic, message.getQos(), new String(message.getPayload()));
        MessageInfo mqtt = new MessageInfo();
        mqtt.setMessage(new String(message.getPayload()));
        messageInfoMapper.insert(mqtt);
    }
}

MqttSendController类:

package com.example.demo.controller;



import com.example.demo.config.MqttUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

@RestController
@RequestMapping(value = "/api")
public class MqttSendController {

    @Autowired
    private MqttUtils mqttUtils;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestBody Map<String, String> map) {
        boolean publish = mqttUtils.publish(map.get("topic").toString(), map.get("message").toString()); //发布消息
        if (publish) {
            return "ok";
        }
        return "no";
    }
}

步骤三:运行项目,使用apipost进行接口测试:

运行成功截图:

apipost发送消息:

项目控制台log:

数据库存储message信息:

至此就完成了所有的工作,并且保证了springboot对iot设备的控制与iot的状态信息获取

正文到此结束