Mqtt
大约 1 分钟
Mqtt
介绍🍏
- 2022/6/6
- 消息队列 https://www.emqx.com/zh
- 基于:完整 MQTT 3.x 和 5.0 规范
使用场景🍑
- 移动端 与 服务端的消息解耦
示例 SpringBoot🏧
- 接入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
- 配置类
package com.demo.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lc
* @since 2022/6/7
*/
@Configuration
public class Config {
@Bean
public MqttClient mqttClient(@Autowired OnMessageCallback onMessageCallback) throws MqttException {
String topic = "/fb1973f8-5590-4336-9384-54f07ad79c57/status/base";
String topic1 = "test1";
String topic2 = "test2";
String topic3 = "test3";
String broker = "tcp://120.76.138.18:1883";
String username = "fix_panzhihua_developer";
String password = "developer";
String clientId = "testClientId";
MqttClient client = null;
MemoryPersistence persistence = new MemoryPersistence();
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 连接选项
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true); // 保留会话
client.setCallback(onMessageCallback); // 设置回调
client.connect(connOpts); // 建立连接
// 可以订阅多个并且通过 OnMessageCallback 获取发送回来的消息
client.subscribe(new String[]{topic, topic1, topic2, topic3});
return client;
}
}
- Mqtt控制器
package com.demo.mqtt;
import org.springframework.stereotype.Service;
/**
* @author lc
* @since 2022/6/7
*/
@Service
public class MqttController {
public void printTopic(String topic){
System.out.println(topic);
}
}
- 消息回调
package com.demo.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OnMessageCallback implements MqttCallback {
@Autowired
private MqttController mqttController;
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
mqttController.printTopic(topic);
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}