// MyRunnerableInt.java 14.3 KB
package com.example.mypulsar;

import com.alibaba.fastjson.JSON;
import com.example.mypulsar.bean.*;
import com.example.mypulsar.dao.DeviceDao;
import com.example.mypulsar.message.MessageVO;
import com.example.mypulsar.mq.AESBase64Utils;
import com.example.mypulsar.mq.MessageHandlerTask;
import com.example.mypulsar.mq.MqConsumer;
import com.example.mypulsar.task.SchduledTasks;
import com.example.mypulsar.utils.ControlUtils;
import com.example.mypulsar.utils.PulsarConsumerPoolFactory;
import com.example.mypulsar.utils.ThreadPoolFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import sun.rmi.runtime.Log;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.BiConsumer;

@Slf4j
@Component
public class MyRunnerableInt implements ApplicationRunner {

    private static final Logger logger = LoggerFactory.getLogger(MqConsumer.class);

    @Autowired
    DeviceDao deviceDao;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        initMqConsumer();
    }

    /**
     * 第一种方式
     */
    private void initPool() {
        PulsarConsumerPoolFactory.getConsumerPool().forEach(new BiConsumer<Integer, Consumer>() {
            @Override
            public void accept(Integer consumerNum, Consumer consumer) {
                ThreadPoolFactory.getCustomThreadPool().submit(new MessageHandlerTask(consumerNum, consumer));
            }
        });
    }

    String url = "pulsar+ssl://mqe.tuyacn.com:7285/";
    String accessId = "thddfrm97kydyq9wrc8u";
    String accessKey = "tqcuypeytayths9pudcn937wfupugc3t";

    /**
     * 第二种方式
     */
    private void initMqConsumer() {
        MqConsumer mqConsumer = MqConsumer.build().serviceUrl(url).accessId(accessId).accessKey(accessKey)
                .maxRedeliverCount(3).messageListener(message -> {
                            //消息内容
                            String jsonMessage = new String(message.getData());
                            MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
                            String data = AESBase64Utils.decrypt(vo.getData(), accessKey.substring(8, 24));
                            TuYaReceiverBean tuYaReceiverBean = JSON.parseObject(data, TuYaReceiverBean.class);
                            log.info("接收消息,消息内容:"+ JSON.toJSONString(tuYaReceiverBean));
                            //处理数据上报
                            detealData(tuYaReceiverBean);
                        }

                );
        try {
            mqConsumer.start();
        } catch (Exception e) {
            logger.error("start error:",e);
        }
    }

    /**
     * 处理接收的数据
     *
     * @param tuYaReceiverBean
     */
    private void detealData(TuYaReceiverBean tuYaReceiverBean) {
        //涂鸦全局唯一ID
        String dataId = tuYaReceiverBean.getDataId();
        //其他事件
        String bizCode = tuYaReceiverBean.getBizCode();
        //设备ID
        String devId = tuYaReceiverBean.getDevId();
        //开发者平台产品Key
        String productKey = tuYaReceiverBean.getProductKey();

        //数据对象
        DeviceBean deviceBean = new DeviceBean();
        deviceBean.setDataId(dataId);
        deviceBean.setBizCode(bizCode);
        deviceBean.setDevId(devId);
        deviceBean.setProductKey(productKey);
        //温度、人感、湿度数据上报事件
        if (bizCode == null) {
            deviceBean.setBizCode("");
            //在线状态:1在线0离线
            deviceBean.setDev_status(1);
            //设备上报事件:code:时间类型; 格式:"status":[{"code":"pir","t":1612283849564,"value":"pir"}]
            List<TuYaReceiverBeanStatus> status = tuYaReceiverBean.getStatus();
            for (TuYaReceiverBeanStatus statue : status) {
                //事件上报处理
                initStatue(statue, deviceBean);
            }
        } else {//其他事件上报
            deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH::mm:ss").format(new Date()));
            String name = deviceDao.selectAttendance(devId);
            switch (bizCode) {
                case "offline"://设备掉线
                    log.info("设备: {},已掉线",name);
                    deviceBean.setDev_type(1);
                    deviceBean.setDev_status(0);
                    updateDevStatus(deviceBean);
                    updateStatus(devId, "0");
                    break;
                case "online"://设备上线
                    log.info("设备: {},已上线",name);
                    deviceBean.setDev_status(1);
                    deviceBean.setDev_type(2);
                    updateDevStatus(deviceBean);
                    break;
                case "nameUpdate"://设备名称修改
                    break;
                case "dpNameUpdate"://功能点修改
                    break;
                case "bindUser"://绑定用户
                    break;
                case "delete"://删除设备
                    break;
            }
        }

    }

    private void updateStatus(String devId, String isConnection) {
//        String clintId = deviceDao.selectAttendance(devId);
//        if (clintId.equals(devId)) {
        deviceDao.updateStatus(devId, isConnection, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//        } else {
//            deviceDao.insert(devId, "-2", isConnection);
//        }

    }

    private void updateDevStatus(DeviceBean deviceBean) {
        deviceBean.setDpId("");
        deviceBean.setName("");
        deviceBean.setUid("");
        if (deviceBean.getValue() == null) deviceBean.setValue("");
        if (deviceBean.getBizCode() == null) deviceBean.setBizCode("");
        if (deviceBean.getDataId() == null) deviceBean.setDataId("");
        if (deviceBean.getCode() == null) deviceBean.setCode("");
        if (deviceBean.getProductKey() == null) deviceBean.setProductKey("");
        log("设备上线,更新历史记录: "+JSON.toJSONString(deviceBean));
        int index = deviceDao.addDevice(deviceBean.getDataId(), deviceBean.getDevId(), deviceBean.getProductKey(), deviceBean.getCode(), deviceBean.getValue()
                , deviceBean.getBizCode(), deviceBean.getTime(), deviceBean.getName(), deviceBean.getDpId(), deviceBean.getUid(), deviceBean.getDev_status(), deviceBean.getDev_type());
        updateStatus(deviceBean.getDevId(), "1");
    }

    /**
     * 数据上报事件处理
     *
     * @param statue
     * @param deviceBean
     */
    private void initStatue(TuYaReceiverBeanStatus statue,DeviceBean deviceBean) {
        //设备消息类型:1:掉线,2:上线,3:其他事件
        deviceBean.setDev_type(3);
        //事件时间
        long t = statue.getT();
        Date date = new Date();
        date.setTime(t);
        deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
        //事件value值
        String value = statue.getValue();
        deviceBean.setValue(value);
        //事件类型:温度va_temperature、湿度va_humidity、人感pir
        // 格式:"status":[{"code":"pir","t":1612283849564,"value":"pir"}],"status":[{"code":"va_temperature","t":1612283809697,"value":"1879"}]
        String code = statue.getCode();
        deviceBean.setCode(code);
        switch (code) {
            case "va_temperature"://温度
                deviceBean.setValue(Integer.parseInt(value) / 100 + "");
                //计算开启或关闭设备
//                calOpenOrCloseDevWithtemper(deviceBean);
                log.info("温度:" + deviceBean.getValue());
                //存储 对应设备对应的温度记录
                ControlUtils.tempDevices.put(deviceBean.getDevId(),Integer.valueOf(deviceBean.getValue()));
                break;
            case "va_humidity"://湿度
                deviceBean.setValue(Integer.parseInt(value) / 100 + "");
                log.info("湿度:" +  deviceBean.getValue());
                break;
            case "pir":
                //人体感应
                log.info("人体感应:" + value);
                if(value.equals("pir")){
                    deviceBean.setValue("有人");
                }else{
                    deviceBean.setValue("无人");
                }
                setClintTime(deviceBean.getDevId());
                try {
                    Thread.sleep(1000);
                    //根据场地是否有人控制空调设备
                    calOpenOrCloseDevWithPir(deviceBean);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            case "temper_alarm"://防拆报警
                break;
            case "battery_percentage"://电池电量百分比
                log.info("电池电量百分比:" + value);
                break;
            case "battery_value"://电池电量值
                break;
            case "battery"://门磁电池电量值
                log("电池电量值:" + value);
                break;
            case "battery_state"://电池电量状态 {“range”:[“low”,“middle”,“high”]}
                log("电池电量状态:" + value);
                break;
            case "doorcontact_state"://门磁状态
                value = (value.equals("true") ? "开门" : "关门");
                log("门磁状态:" + (value.equals("true") ? "开门" : "关门"));
                break;
            case "switch"://门磁状态
                value = (value.equals("true") ? "开门" : "关门");
                log("开关状态:" + (value.equals("true") ? "开门" : "关门"));
                break;
            case "cur_voltage"://当前电压
//                value = Integer.parseInt(value) / 10 + "V";
                log(deviceBean.getDevId() + " 当前电压:" + value);
                break;
            case "cur_power"://当前功率
//                value = value.equals("0")?"0":Integer.parseInt(value) / 10 + "W";
//                log(devId + " 当前功率:" + Integer.parseInt(value) / 10 + "W");
                break;
            case "cur_current"://当前电流
//                value =  value.equals("0")?"0":Integer.parseInt(value) / 10 + "A";
//                log(devId + " 当前电流:" + Integer.parseInt(value) / 10 + "A");
                break;
            case "router_mgr"://网关路由
                break;
        }
        updateDevStatus(deviceBean);
    }

    /**
     * 根据是否有人控制设备
     *
     * @param deviceBean
     */
    private void calOpenOrCloseDevWithPir(DeviceBean deviceBean) {
//        if (deviceBean.getValue().equals("有人")){
//            deviceBean.setValue("1");
//        } else{
//            deviceBean.setValue("0");
//        }
        //获取人感设备控制
        List<CalDevContrl> calDevContrls = deviceDao.getCalDevContrlWidthDevId(deviceBean.getDevId());
        if (CollectionUtils.isEmpty(calDevContrls)) {
            log.info("未关联人感控制设备, 设备ID: "+ deviceBean.getDevId());
            return;
        }
        ControlUtils.getInstance(deviceDao).conTemper(calDevContrls, deviceBean);
     }

    /**
     * 设置更新上报时间
     * @param clintId
     */
     private void setClintTime(String clintId){
         List<CalDevContrl> calDevContrls = deviceDao.getCalDevContrlWidthDevId(clintId);
         if(!CollectionUtils.isEmpty(calDevContrls)){
             CalDevContrl calDevContrl = calDevContrls.get(0);
             String assDevice = calDevContrl.getAssDevice();
             //红外遥控器
             CalDevBeContrl  hwClint = getCalDevBeCon(assDevice);
             if(hwClint==null) {
                 log.info("未查询到控制红外遥控器");
                 return;
             }
             //红外遥控设备id
             String conDevId = hwClint.getConDevId();
             SchduledTasks.closeDevMap.put(conDevId,new Date());
         }
     }

    private CalDevBeContrl getCalDevBeCon(String assDevice) {
        String[] assDeviceIds = assDevice.split(",");
        List<CalDevBeContrl> devBeContrlList = new ArrayList<>();
        for (int i = 0; i < assDeviceIds.length; i++) {
            String deviceId = assDeviceIds[i];
            //获取温感设备
            List<CalDevContrl> calDevContrl = deviceDao.getTempCalWitdDevid(deviceId);
            if(!CollectionUtils.isEmpty(calDevContrl)){
                for (int j = 0; j < calDevContrl.size(); j++) {
                    //温感设备
                    CalDevContrl calDevContrl1 = calDevContrl.get(j);
                    if (!calDevContrl1.getDevBeId().equals("0")) {
                        String[] calDevBeIdStr = calDevContrl1.getDevBeId().split(",");
                        if (calDevBeIdStr != null) {
                            for (int k = 0; k < calDevBeIdStr.length; k++) {
                                //红外设备
                                CalDevBeContrl calDevBeContrl = deviceDao.getCalDevBeControl(calDevBeIdStr[k]);
                                devBeContrlList.add(calDevBeContrl);
                            }
                        }
                    }
                }
            }
        }
        if(devBeContrlList.size()>0){
            return devBeContrlList.get(0);
        }
        return null;
    }

    /**
     * 根据温度变化控制设备
     *
     * @param deviceBean
     */
    private void calOpenOrCloseDevWithtemper(DeviceBean deviceBean) {

        ControlUtils.tempDevices.put(deviceBean.getDevId(), Integer.valueOf(deviceBean.getValue()));

//        List<CalDevContrl> calDevContrls = deviceDao.getCalDevContrlWidthDevId(deviceBean.getDevId());
//        if (null != calDevContrls && calDevContrls.size() > 0)
//            ControlUtils.getInstance(deviceDao).conTemper(calDevContrls, deviceBean);

    }

    private void log(String content) {
        System.out.println("接收到数据:" + content);
    }
}