package com.sincere.att.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.io.UnsupportedEncodingException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; /** * mqtt 消息发送服务 */ @Slf4j public class MqttManager { private static String accessKey; private static String sign; public static MqttClient mqttClient; public static String groupId; public static String topic; private static int qosLevel; public void init() { final String brokerUrl = "tcp://post-cn-4590mq2hr03.mqtt.aliyuncs.com:1883"; groupId = "GID_HFJSIURFHAQO110"; topic = "Topic_Quene_Test"; qosLevel = 1; final Boolean cleanSession = false; String clientId = groupId + "@@@9ED96FB6D72C1698"; accessKey = "UimvLVp0Wj90P88u"; String secretKey = "TE4rZenITG27tiQqHx9qINjx71Nws7"; final MemoryPersistence memoryPersistence = new MemoryPersistence(); if (null == mqttClient) { try { mqttClient = new MqttClient(brokerUrl, clientId, memoryPersistence); } catch (MqttException e) { e.printStackTrace(); } MqttConnectOptions connOpts = new MqttConnectOptions(); //cal the sign as password,sign=BASE64(MAC.SHA1(groupId,secretKey)) try { sign = Tools.macSignature(clientId.split("@@@")[0], secretKey); } catch (InvalidKeyException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } connOpts.setUserName(accessKey); connOpts.setPassword(sign.toCharArray()); connOpts.setCleanSession(cleanSession); connOpts.setKeepAliveInterval(90); connOpts.setAutomaticReconnect(true); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("connect success"); } @Override public void connectionLost(Throwable throwable) { log.info("connect lost:" + throwable.toString()); throwable.printStackTrace(); init();//初始化 } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { log.info("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //this notice make sense when qos >0 // System.out.println("send msg succeed"); } }); try { mqttClient.connect(connOpts); } catch (MqttException e) { log.warn("mqtt:" + e.toString()); e.printStackTrace(); } } } public void sendMq(String deviceId, String content) { String recvClientId = groupId + "@@@" + deviceId; final String p2pSendTopic = topic + "/p2p/" + recvClientId; MqttMessage message = null; try { message = new MqttMessage(content.getBytes("GB2312")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } message.setQos(qosLevel); log.info("发送内容:" + p2pSendTopic + " msg:---------" + content); if (null != mqttClient) { try { mqttClient.publish(p2pSendTopic, message); } catch (MqttException e) { e.printStackTrace(); } } } }