跳至主要內容

Mqtt

LiCheng大约 1 分钟

Mqtt

介绍🍏

使用场景🍑

  • 移动端与服务端之间的消息解耦

示例 SpringBoot🏧

  • 接入依赖
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
  • 配置类
package com.demo.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates the demo MQTT client, connects it to the
 * broker and subscribes it to the demo topics.
 *
 * @author lc
 * @since 2022/6/7
 */
@Configuration
public class Config {

    /**
     * Creates the {@link MqttClient} bean, registers the callback, connects to
     * the broker and subscribes to the demo topics.
     *
     * <p>If connecting or subscribing fails, the half-initialised client is
     * released on a best-effort basis before the original exception is rethrown.
     *
     * @param onMessageCallback callback that receives connection-loss, message
     *                          and delivery-complete notifications; injected by
     *                          Spring (bean method parameters are resolved from
     *                          the context without an explicit annotation)
     * @return the connected client, already subscribed to the demo topics
     * @throws MqttException if the client cannot be created, connected or subscribed
     */
    @Bean
    public MqttClient mqttClient(OnMessageCallback onMessageCallback) throws MqttException {
        // NOTE(review): broker address, credentials and client id are hard-coded
        // for this demo; externalise them before using this outside a sample.
        String topic = "/fb1973f8-5590-4336-9384-54f07ad79c57/status/base";
        String topic1 = "test1";
        String topic2 = "test2";
        String topic3 = "test3";
        String broker = "tcp://120.76.138.18:1883";
        String username = "fix_panzhihua_developer";
        String password = "developer";
        String clientId = "testClientId";

        // MQTT connection options.
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        // Clean session: per the Paho documentation, no session state is kept
        // between connections when this is true (i.e. the session is NOT retained).
        connOpts.setCleanSession(true);

        MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
        // Register the callback before connecting.
        client.setCallback(onMessageCallback);
        try {
            client.connect(connOpts); // establish the connection
            // Several topics can be subscribed at once; incoming messages are
            // delivered through OnMessageCallback.
            client.subscribe(new String[]{topic, topic1, topic2, topic3});
        } catch (MqttException e) {
            releaseAfterFailure(client, e);
            throw e;
        }
        return client;
    }

    /**
     * Best-effort release of a client whose start-up failed; any cleanup error is
     * attached to {@code failure} as suppressed so the original cause stays primary.
     */
    private static void releaseAfterFailure(MqttClient client, MqttException failure) {
        try {
            if (client.isConnected()) {
                client.disconnectForcibly();
            }
            client.close();
        } catch (MqttException cleanupError) {
            failure.addSuppressed(cleanupError);
        }
    }
}
  • Mqtt控制器
package com.demo.mqtt;

import org.springframework.stereotype.Service;

/**
 * Spring service used by the MQTT demo to print the topic of a received message.
 *
 * @author lc
 * @since 2022/6/7
 */
@Service
public class MqttController {

    /**
     * Writes the given topic to standard output, followed by a line break.
     *
     * @param topic the topic name to print; {@code null} is printed as "null"
     */
    public void printTopic(String topic) {
        final String line = String.valueOf(topic);
        System.out.println(line);
    }
}

  • 消息回调
package com.demo.mqtt;

import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
public class OnMessageCallback implements MqttCallback {
    @Autowired
    private MqttController mqttController;

    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttController.printTopic(topic);
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}