MyRunnerableInt.java 10.4 KB
package com.example.mypulsar;

import com.alibaba.fastjson.JSON;
import com.example.mypulsar.bean.DeviceBean;
import com.example.mypulsar.bean.TuYaReceiverBean;
import com.example.mypulsar.bean.TuYaReceiverBeanStatus;
import com.example.mypulsar.dao.DeviceDao;
import com.example.mypulsar.message.MessageVO;
import com.example.mypulsar.mq.AESBase64Utils;
import com.example.mypulsar.mq.MessageHandlerTask;
import com.example.mypulsar.mq.MqConsumer;
import com.example.mypulsar.utils.PulsarConsumerPoolFactory;
import com.example.mypulsar.utils.ThreadPoolFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import sun.rmi.runtime.Log;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.function.BiConsumer;

@Component
public class MyRunnerableInt implements ApplicationRunner {

    private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);

    @Autowired
    DeviceDao deviceDao;

    @Override
    public void run(ApplicationArguments args) throws Exception {

//        System.out.println("running");
//    initPool();
        initMqConsumer();
    }

    /**
     * 第一种方式
     */
    private void initPool() {
        PulsarConsumerPoolFactory.getConsumerPool().forEach(new BiConsumer<Integer, Consumer>() {
            @Override
            public void accept(Integer consumerNum, Consumer consumer) {
                ThreadPoolFactory.getCustomThreadPool().submit(new MessageHandlerTask(consumerNum, consumer));
            }
        });
    }

    String url = "pulsar+ssl://mqe.tuyacn.com:7285/";
    String accessId = "thddfrm97kydyq9wrc8u";
    String accessKey = "tqcuypeytayths9pudcn937wfupugc3t";

    /**
     * 第二种方式
     */
    private void initMqConsumer() {

        MqConsumer mqConsumer = MqConsumer.build().serviceUrl(url).accessId(accessId).accessKey(accessKey)
                .maxRedeliverCount(3).messageListener(message -> {
                            String jsonMessage = new String(message.getData());
                            MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
                            String data = AESBase64Utils.decrypt(vo.getData(), accessKey.substring(8, 24));
                            TuYaReceiverBean tuYaReceiverBean = JSON.parseObject(data, TuYaReceiverBean.class);
                            detealData(tuYaReceiverBean);
                        }

                );
        try {
            mqConsumer.start();
        } catch (Exception e) {
            logger.error("start error:" + e.toString());
            e.printStackTrace();
        }
    }

    /**
     * 处理接收的数据
     *
     * @param tuYaReceiverBean
     */
    private void detealData(TuYaReceiverBean tuYaReceiverBean) {
        System.out.println("tuYaReceiverBean:" + tuYaReceiverBean.toString());
        //数据对象
        DeviceBean deviceBean = new DeviceBean();
        deviceBean.setDataId(tuYaReceiverBean.getDataId());
        String bizCode = tuYaReceiverBean.getBizCode();
        String devId = tuYaReceiverBean.getDevId();
        String productKey = tuYaReceiverBean.getProductKey();

        deviceBean.setBizCode(bizCode);
        deviceBean.setDevId(devId);
        deviceBean.setProductKey(productKey);
        deviceBean.setProductKey(tuYaReceiverBean.getProductKey());

        if (bizCode == null) {//数据上报事件
            deviceBean.setBizCode("");
            String dataId = tuYaReceiverBean.getDataId();
            deviceBean.setDev_status(1);
            List<TuYaReceiverBeanStatus> status = tuYaReceiverBean.getStatus();
            for (TuYaReceiverBeanStatus statue :
                    status) {
                initStatue(statue, devId, productKey, dataId, deviceBean);
            }

        } else {//其他事件上报
            deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH::mm:ss").format(new Date()));
            switch (bizCode) {
                case "offline"://设备掉线
                    log("设备掉线:" + devId);
                    deviceBean.setDev_type(1);
                    deviceBean.setDev_status(0);
                    updateDevStatus(deviceBean);
                    updateStatus(devId, "0");
                    break;
                case "online"://设备上线
                    log("设备上线:" + devId);
                    deviceBean.setDev_status(1);
                    deviceBean.setDev_type(2);
                    updateDevStatus(deviceBean);
                    break;
                case "nameUpdate"://设备名称修改

                    break;
                case "dpNameUpdate"://功能点修改

                    break;
                case "bindUser"://绑定用户

                    break;
                case "delete"://删除设备

                    break;
            }
        }

    }

    private void updateStatus(String devId, String isConnection) {

//        String clintId = deviceDao.selectAttendance(devId);
//        if (clintId.equals(devId)) {
            deviceDao.updateStatus(devId, isConnection);
//        } else {
//            deviceDao.insert(devId, "-2", isConnection);
//        }

    }

    private void updateDevStatus(DeviceBean deviceBean) {
        System.out.println("插入数据:" + deviceBean.toString());
        deviceBean.setDpId("");
        deviceBean.setName("");
        deviceBean.setUid("");
        if (deviceBean.getValue() == null) deviceBean.setValue("");
        if (deviceBean.getBizCode() == null) deviceBean.setBizCode("");
        if (deviceBean.getDataId() == null) deviceBean.setDataId("");
        if (deviceBean.getCode() == null) deviceBean.setCode("");
        if (deviceBean.getProductKey() == null) deviceBean.setProductKey("");

        int index = deviceDao.addDevice(deviceBean.getDataId(), deviceBean.getDevId(), deviceBean.getProductKey(), deviceBean.getCode(), deviceBean.getValue()
                , deviceBean.getBizCode(), deviceBean.getTime(), deviceBean.getName(), deviceBean.getDpId(), deviceBean.getUid(), deviceBean.getDev_status(), deviceBean.getDev_type());
        updateStatus(deviceBean.getDevId(), "1");
    }

    /**
     * 数据上报事件处理
     *
     * @param statue
     * @param devId
     * @param productKey
     * @param dataId
     * @param deviceBean
     */
    private void initStatue(TuYaReceiverBeanStatus statue, String devId, String productKey, String dataId, DeviceBean deviceBean) {
        String code = statue.getCode();
        deviceBean.setCode(code);
        deviceBean.setProductKey(productKey);
        deviceBean.setDataId(dataId);
        deviceBean.setDev_type(3);
        long t = statue.getT();
        Date date = new Date();
        date.setTime(t);
        deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
        String value = statue.getValue();
        deviceBean.setValue(value);
        switch (code) {
            case "va_temperature"://温度
                calOpenOrCloseDevWithtemper(deviceBean);//计算开启或关闭设备
                log("温度:" + value);
                break;
            case "va_humidity"://湿度
                log("湿度:" + value);
                break;
            case "temper_alarm"://防拆报警

                break;
            case "battery_percentage"://电池电量百分比
                log("电池电量百分比:" + value);
                break;
            case "battery_value"://电池电量值

                break;
            case "battery"://门磁电池电量值
                log("电池电量值:" + value);
                break;
            case "battery_state"://电池电量状态 {“range”:[“low”,“middle”,“high”]}
                log("电池电量状态:" + value);
                break;
            case "doorcontact_state"://门磁状态
                value = (value.equals("true") ? "开门" : "关门");
                log("门磁状态:" + (value.equals("true") ? "开门" : "关门"));
                break;
            case "switch"://门磁状态
                value = (value.equals("true") ? "开门" : "关门");
                log("开关状态:" + (value.equals("true") ? "开门" : "关门"));
                break;
            case "va_battery"://电池电量值

                break;
            case "switch_1"://开关1

                break;
            case "switch_2"://开关2

                break;
            case "switch_3"://开关3

                break;
            case "switch_4"://开关4

                break;
            case "switch_5"://开关5

                break;
            case "switch_6"://开关6

                break;
            case "study_code":

                break;
            case "control":

                break;
            case "pir"://人体感应
                value = (value.equals("pir") ? "有人" : "无人");
                log("人体感应:" + (value.equals("pir") ? "有人" : "无人"));
                calOpenOrCloseDevWithPir(deviceBean);
                break;
            case "cur_voltage"://当前电压
//                value = Integer.parseInt(value) / 10 + "V";
                log(devId + " 当前电压:" + value);
                break;
            case "cur_power"://当前功率
//                value = value.equals("0")?"0":Integer.parseInt(value) / 10 + "W";
//                log(devId + " 当前功率:" + Integer.parseInt(value) / 10 + "W");
                break;
            case "cur_current"://当前电流
//                value =  value.equals("0")?"0":Integer.parseInt(value) / 10 + "A";
//                log(devId + " 当前电流:" + Integer.parseInt(value) / 10 + "A");
                break;
        }
        updateDevStatus(deviceBean);
    }

    /**
     * 根据是否有人控制设备
     * @param deviceBean
     */
    private void calOpenOrCloseDevWithPir(DeviceBean deviceBean) {



    }


    /**
     * 根据温度变化控制设备
     * @param deviceBean
     */
    private void calOpenOrCloseDevWithtemper(DeviceBean deviceBean) {



    }

    private void log(String content) {
        System.out.println("接收到数据:" + content);
    }
}