ApplicationRunnerImpl.java 6.89 KB
package com.shunzhi.mqtt2kanban;

import com.shunzhi.mqtt2kanban.service.UserSeerviceImp;
import com.shunzhi.mqtt2kanban.utils.Tools;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

@Component
public class ApplicationRunnerImpl implements ApplicationRunner {

    private static String accessKey;

    private static String sign;

    public static MqttClient mqttClient;

    public static String groupId;

    public static String topic;

    private static int qosLevel;

    public static boolean isConnect;

    private Set<String> set = new HashSet<String>();

    @Autowired
    UserSeerviceImp userSeerviceImp;

    @Override
    public void run(ApplicationArguments args) throws Exception {

        initMq();

    }

    /**
     * 初始化MQ
     */
    private void initMq() {
        final String brokerUrl = "tcp://post-cn-4590mq2hr03.mqtt.aliyuncs.com:1883";
        groupId = "GID_HFJSIURFHAQO110";
        topic = "Topic_Quene_Test";
        qosLevel = 1;
        final Boolean cleanSession = false;
        String clientId = groupId + "@@@9ED96FB6D72C1698";
        accessKey = "UimvLVp0Wj90P88u";
        String secretKey = "TE4rZenITG27tiQqHx9qINjx71Nws7";
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        if (null == mqttClient) {
            try {
                mqttClient = new MqttClient(brokerUrl, clientId, memoryPersistence);
            } catch (MqttException e) {
                e.printStackTrace();
            }
            MqttConnectOptions connOpts = new MqttConnectOptions();
            //cal the sign as password,sign=BASE64(MAC.SHA1(groupId,secretKey))
            try {
                sign = Tools.macSignature(clientId.split("@@@")[0], secretKey);
            } catch (InvalidKeyException e) {
                e.printStackTrace();
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            connOpts.setUserName(accessKey);
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(cleanSession);
            connOpts.setKeepAliveInterval(90);
            connOpts.setAutomaticReconnect(true);
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    isConnect = true;
                    System.out.println("connect success");
                }

                @Override
                public void connectionLost(Throwable throwable) {
                    isConnect = false;
                    System.out.println("connect lost:" + throwable.toString());
                    throwable.printStackTrace();
                    initMq();//初始化
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    //this notice make sense when qos >0
//                    System.out.println("send msg succeed");
                }
            });
            try {
                mqttClient.connect(connOpts);
                sendMq();
            } catch (MqttException e) {
                System.out.println("mqtt:" + e.toString());
                e.printStackTrace();
            }
            /*while (true){
                sendMessageTest("528C8E6CD4A3C659","zy105387",0);
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    System.out.println("connect success:"+e);
                    e.printStackTrace();
                }
            }*/
        }
    }

    private void sendMq() {
        String recvClientId = groupId + "@@@09EA8EB5142F2964";
        final String p2pSendTopic = topic + "/p2p/" + recvClientId;
        String content = "{\"cmd\":\"37\",\"clientId\":\"\",\"data\": \"6:00-22:00\"}";
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qosLevel);
        System.out.println("发送内容:" + p2pSendTopic);
        if (null != mqttClient) {
            try {
                mqttClient.publish(p2pSendTopic, message);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

    }


    /**
     * @param equipmentSN 控制器SN号
     * @param consumerNO  userid
     * @param inOrOut     进出状态
     */
    public void sendMessage(String equipmentSN, String consumerNO, int inOrOut) {
        try {
            String recvClientId = groupId + "@@@";
            if (null != Mqtt2kanbanApplication.deviceList) {
                for (int i = 0; i < Mqtt2kanbanApplication.deviceList.size(); i++) {
                    String devices = Mqtt2kanbanApplication.deviceList.get(i);
                    if (devices.contains(equipmentSN))
                        recvClientId += devices.split(",")[0];

                }
                final String p2pSendTopic = topic + "/p2p/" + recvClientId;
                String data = "{\"userId\":\"" + consumerNO + "\",\"inOrOut\":" + inOrOut + "}";
                String content = "{\"cmd\":\"34\",\"clientId\":\"\",\"data\":" + data + "}";
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qosLevel);
                System.out.println("发送内容:" + p2pSendTopic);
                if (null != mqttClient)
                    mqttClient.publish(p2pSendTopic, message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public void sendMessageTest(String equipmentSN, String consumerNO, int inOrOut) {
        try {
            String recvClientId = groupId + "@@@"+equipmentSN;
                final String p2pSendTopic = topic + "/p2p/" + recvClientId;
                String data = "{\"userId\":\"" + consumerNO + "\",\"inOrOut\":" + inOrOut + "}";
                String content = "{\"cmd\":\"34\",\"clientId\":\"\",\"data\":" + data + "}";
                MqttMessage message = new MqttMessage(content.getBytes());
                message.setQos(qosLevel);
                System.out.println("发送内容:" + p2pSendTopic+" "+message);
                if (null != mqttClient)
                    mqttClient.publish(p2pSendTopic, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}