MqttManager.java 3.88 KB
package com.sincere.att.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * mqtt 消息发送服务
 */
@Slf4j
public class MqttManager {

    private static String accessKey;

    private static String sign;

    public static MqttClient mqttClient;

    public static String groupId;

    public static String topic;

    private static int qosLevel;

    public void init() {
        final String brokerUrl = "tcp://post-cn-4590mq2hr03.mqtt.aliyuncs.com:1883";
        groupId = "GID_HFJSIURFHAQO110";
        topic = "Topic_Quene_Test";
        qosLevel = 1;
        final Boolean cleanSession = false;
        String clientId = groupId + "@@@9ED96FB6D72C1698";
        accessKey = "UimvLVp0Wj90P88u";
        String secretKey = "TE4rZenITG27tiQqHx9qINjx71Nws7";
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        if (null == mqttClient) {
            try {
                mqttClient = new MqttClient(brokerUrl, clientId, memoryPersistence);
            } catch (MqttException e) {
                e.printStackTrace();
            }
            MqttConnectOptions connOpts = new MqttConnectOptions();
            //cal the sign as password,sign=BASE64(MAC.SHA1(groupId,secretKey))
            try {
                sign = Tools.macSignature(clientId.split("@@@")[0], secretKey);
            } catch (InvalidKeyException e) {
                e.printStackTrace();
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            connOpts.setUserName(accessKey);
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(cleanSession);
            connOpts.setKeepAliveInterval(90);
            connOpts.setAutomaticReconnect(true);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    log.info("connect success");
                }

                @Override
                public void connectionLost(Throwable throwable) {
                    log.info("connect lost:" + throwable.toString());
                    throwable.printStackTrace();
                    init();//初始化
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    log.info("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    //this notice make sense when qos >0
//                    System.out.println("send msg succeed");
                }
            });
            try {
                mqttClient.connect(connOpts);
            } catch (MqttException e) {
                log.warn("mqtt:" + e.toString());
                e.printStackTrace();
            }
        }
    }

    public void sendMq(String deviceId, String content) {

        String recvClientId = groupId + "@@@" + deviceId;
        final String p2pSendTopic = topic + "/p2p/" + recvClientId;

        MqttMessage message = null;
        try {
            message = new MqttMessage(content.getBytes("GB2312"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        message.setQos(qosLevel);
        log.info("发送内容:" + p2pSendTopic + " msg:---------" + content);
        if (null != mqttClient) {
            try {
                mqttClient.publish(p2pSendTopic, message);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }


}