admin 管理员组

文章数量: 894198

【MQTT】SpringBoot整合MQTT(EMQX)

最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。

文章主要参考自(),然后自己做了些许更改。

1、整合准备

SpringBoot:2.2.2.RELEASE

MQTT平台:EMQX4.4.1(Docker运行)

虚拟机服务器:Centos7(192.168.56.102 )

发送端:cloud-mqtt-send8001

接收端:cloud-mqtt-accept8002

2、发送端:cloud-mqtt-send8002

导入POM依赖:

    <!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!--配置文件报错问题--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency>

设置YML

server:port: 8001
​
spring:application:name: mqtt-send
​
#mqtt属性配置
mqtt:hostUrl: tcp://192.168.56.102:1883username: adminpassword: publicclientid: mqtt_send_clientcleanSession: truereconnect: true#连接超时timeout: 1000#设置会话心跳时间keepalive: 100defaultTopic: client:report:1isOpen: trueqos: 1

新建配置类:MqttProperties.java

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
​/*** 用户名*/private String username;
​/*** 密码*/private String password;
​/*** 连接地址*/private String hostUrl;
​/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;
​/*** 默认连接主题*/private String defaultTopic;
​/*** 超时时间*/private int timeout;
​/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;
​/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;
​/*** 是否断线重连*/private Boolean reconnect;
​/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;
​/*** 连接方式*/private Integer qos;
}

添加MQTT发送客户端:MqttSendClient:

@Slf4j
@Component
public class MqttSendClient {
​
​@Autowiredprivate MqttSendCallBack mqttSendCallBack;
​@Autowiredprivate MqttProperties mqttProperties;
​private static MqttClient mqttClient;
​private static MqttClient getClient() {
​
​return mqttClient;}
​private static void setClient(MqttClient client) {
​
​MqttSendClient.mqttClient = client;}
​/*** 客户端连接* @return*/public void connect(){MqttClient client = null;
​try {
​
​//String uuid = UUID.randomUUID().toString().replaceAll("-",""); //设置每一个客户端的idclient = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientId() , new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setCleanSession(true);options.setAutomaticReconnect(false);
​MqttSendClient.setClient(client);try {
​
​// 设置回调client.setCallback(mqttSendCallBack);client.connect(options);} catch (Exception e) {
​
​e.printStackTrace();}} catch (Exception e) {
​e.printStackTrace();}}
​
​
​
​/*** 发布,默认qos为0,非持久化** @param topic 主题名* @param pushMessage 消息*/public void publish(String topic, String pushMessage) {publish(0, false, topic, pushMessage);}
​/*** 发布** @param qos* @param retained* @param topic* @param pushMessage*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttSendClient.getClient().getTopic(topic);if (null == mTopic) {log.error("主题不存在:{}",mTopic);}try {mTopic.publish(message);log.info("消息发送成功");} catch (Exception e) {log.error("mqtt发送消息异常:",e);}}
}

添加MQTT发送客户端回调类:MqttSendCallBack

@Slf4j
@Component
public class MqttSendCallBack implements MqttCallbackExtended {
​
​/*** 链接EMQ服务器后触发* @param reconnect* @param serverURI*/@Overridepublic void connectComplete(boolean reconnect, String serverURI) {log.info("————————————————-ClientID:{}——————————————"+"链接成功");}
​/*** 客户端连接断开后触发* 这里可以做重新链接操作*/@Overridepublic void connectionLost(Throwable cause) {log.error("【MQTT-发送端】链接断开!");}
​@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {log.info("【MQTT-发送端】接收消息主题 : " + topic);log.info("【MQTT-发送端】接收消息Qos : " + message.getQos());log.info("【MQTT-发送端】接收消息内容 : " + new String(message.getPayload()));}
​/*** 发送消息回调* @param token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {
​String[] topics = token.getTopics();
​if (topics!=null && topics.length>0){for (String topic : topics) {
​
​log.info("【MQTT-发送端】向主题:" + topic + "发送消息成功!");}}
​try {
​
​MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");log.info("【MQTT-发送端】消息的内容是:" + s);} catch (MqttException e) {
​
​e.printStackTrace();} catch (UnsupportedEncodingException e) {
​
​e.printStackTrace();}}
}
​

添加:MqttCondition

public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
​System.out.println("MqttCondition。。。。");
​//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}

添加MQTT配置类:MqttConfig

@Configuration
public class MqttConfig {
​@Autowiredprivate MqttSendClient mqttSendClient;
​@Conditional(MqttCondition.class)@Beanpublic MqttSendClient getMqttSendClient(){mqttSendClient.connect();return mqttSendClient;}
}
​

主启动类

@SpringBootApplication
public class MqttSendApplication {
​public static void main(String[] args) {SpringApplication.run(MqttSendApplication.class, args);}
}

启动项目,链接MQTT服务器成功。

项目整体代码结构如下:

 

3、接收端:cloud-mqtt-accept8002

导入POM

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--MQTT客户端工具--><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>
</dependencies>

设置YML

server:port: 8002
​
spring:application:name: mqtt-accept
​
​
mqtt:hostUrl: tcp://192.168.56.102:1883username: adminpassword: publicclientid: mqtt_accept_clientcleanSession: truereconnect: true#连接超时timeout: 1000#设置会话心跳时间keepalive: 100defaultTopic: client:report:1isOpen: trueqos: 1

属性配置文件:MqttProperties.java

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
​/*** 用户名*/private String username;
​/*** 密码*/private String password;
​/*** 连接地址*/private String hostUrl;
​/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;
​/*** 默认连接主题*/private String defaultTopic;
​/*** 超时时间*/private int timeout;
​/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;
​/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;
​/*** 是否断线重连*/private Boolean reconnect;
​/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;
​/*** 连接方式*/private Integer qos;
}
​

添加MQTT接收客户端:MqttAcceptClient

@Slf4j
@Component
public class MqttAcceptClient {
​
​@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;
​@Autowiredprivate MqttProperties mqttProperties;
​
​private static MqttClient mqttClient;
​public static MqttClient getMqttClient() {return mqttClient;}
​public static void setMqttClient(MqttClient mqttClient) {MqttAcceptClient.mqttClient = mqttClient;}
​
​/*** 客户端连接*/public void connect() {
​
​MqttClient client;try {
​
​client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setMqttClient(client);try {
​
​// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {
​
​e.printStackTrace();}} catch (Exception e) {
​
​e.printStackTrace();}}
​/*** 重新连接*/public void reconnection() {
​
​try {mqttClient.connect();} catch (MqttException e) {
​
​e.printStackTrace();}}
​/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {
​
​log.info("==============开始订阅主题==============" + topic);try {
​
​mqttClient.subscribe(topic, qos);} catch (MqttException e) {
​
​e.printStackTrace();}}
​/*** 取消订阅某个主题** @param topic*/public void unsubscribe(String topic) {
​
​log.info("==============开始取消订阅主题==============" + topic);try {
​
​mqttClient.unsubscribe(topic);} catch (MqttException e) {
​
​e.printStackTrace();}}
}
​

添加mqtt接受服务的回调类:MqttAcceptCallback

@Slf4j
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {@Autowiredprivate MqttAcceptClient mqttAcceptClient;
​/*** 客户端断开后触发** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {
​
​log.info("【MQTT-消费端】连接断开,可以做重连");if (MqttAcceptClient.getMqttClient() == null || !MqttAcceptClient.getMqttClient().isConnected()) {
​
​log.info("【MQTT-消费端】emqx重新连接....................................................");mqttAcceptClient.reconnection();}}
​/*** 客户端收到消息触发** @param topic       主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
​
​log.info("【MQTT-消费端】接收消息主题 : " + topic);log.info("【MQTT-消费端】接收消息Qos : " + mqttMessage.getQos());log.info("【MQTT-消费端】接收消息内容 : " + new String(mqttMessage.getPayload()));
//        int i = 1/0;}
​/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {
​
​String[] topics = token.getTopics();for (String topic : topics) {
​
​log.info("【MQTT-消费端】向主题:" + topic + "发送消息成功!");}try {
​
​MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, "UTF-8");log.info("【MQTT-消费端】消息的内容是:" + s);} catch (MqttException e) {
​
​e.printStackTrace();} catch (UnsupportedEncodingException e) {
​
​e.printStackTrace();}}
​/*** 连接emq服务器后触发** @param b* @param s*/@Overridepublic void connectComplete(boolean b, String s) {
​System.out.println("s: " + s);
​log.info("--------------------【MQTT-消费端】连接成功!--------------------");// 以/#结尾表示订阅所有以test开头的主题// 订阅所有机构主题mqttAcceptClient.subscribe("test_queue", 0);}
}

MqttCondition

public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
​System.out.println("MqttCondition。。。。");
​//1、能获取到ioc使用的beanfactoryConfigurableListableBeanFactory beanFactory = context.getBeanFactory();//2、获取类加载器ClassLoader classLoader = context.getClassLoader();//3、获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.valueOf(isOpen);}
}

MQTT启动配置类:MqttConfig.java

@Configuration
public class MqttConfig {
​@Autowiredprivate MqttAcceptClient mqttAcceptClient;
​@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttAcceptClient(){
​mqttAcceptClient.connect();//mqttAcceptClient.subscribe("test_queue",0);return mqttAcceptClient;}
}

主启动类

package com.xlh.springcloud;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class MqttAcceptApplication {
​public static void main(String[] args) {SpringApplication.run(MqttAcceptApplication.class,args);}
}

启动项目,链接MQTT服务器,然后订阅主题:

 

项目整体结构如下:

 

4、项目测试

分别启动发送端和接收端以后,EMQX监控平台如下:

 

然后调用发送端测试接口:http://localhost:8001/send

发送端cloud-mqtt-send8001运行结果如下:

 

接收端cloud-mqtt-accept8002运行结果如下:

可以看到接收端成功的接收到消息

 

本文标签: MQTTSpringBoot整合MQTT(EMQX)