// MyRunnerableInt.java 15.5 KB
package com.example.mypulsar;

import com.alibaba.fastjson.JSON;
import com.example.mypulsar.bean.*;
import com.example.mypulsar.dao.DeviceDao;
import com.example.mypulsar.message.MessageVO;
import com.example.mypulsar.mq.AESBase64Utils;
import com.example.mypulsar.mq.MessageHandlerTask;
import com.example.mypulsar.mq.MqConsumer;
import com.example.mypulsar.mqtt.MqttConsumer;
import com.example.mypulsar.task.SchduledTasks;
import com.example.mypulsar.utils.ControlUtils;
import com.example.mypulsar.utils.JsonUtils;
import com.example.mypulsar.utils.PulsarConsumerPoolFactory;
import com.example.mypulsar.utils.ThreadPoolFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.BiConsumer;

@Slf4j
@Component
public class MyRunnerableInt implements ApplicationRunner {

    /**
     * Explicit SLF4J logger for this runner.
     * Bound to this class (it was previously bound to {@code MqConsumer}) so that
     * records written through it are attributed to {@code MyRunnerableInt}.
     */
    private static final Logger logger = LoggerFactory.getLogger(MyRunnerableInt.class);

    // DAO injected by Spring. This class uses it to look up device/attendance rows,
    // to append device history records and to update a device's connection status.
    @Autowired
    DeviceDao deviceDao;

    /**
     * Start-up hook invoked with the application arguments; it only starts the
     * message-queue consumer.
     *
     * @param args application arguments (not used here)
     * @throws Exception declared by the {@link ApplicationRunner} contract
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.initMqConsumer();
    }

    /**
     * Approach 1: submit one {@link MessageHandlerTask} per pooled consumer.
     * <p>
     * Every (consumer number, consumer) pair exposed by
     * {@code PulsarConsumerPoolFactory.getConsumerPool()} is wrapped in a task and
     * submitted to {@code ThreadPoolFactory.getCustomThreadPool()}.
     * This method is not called from {@link #run(ApplicationArguments)}.
     */
    private void initPool() {
        BiConsumer<Integer, Consumer> submitHandlerTask = (consumerNum, consumer) ->
                ThreadPoolFactory.getCustomThreadPool().submit(new MessageHandlerTask(consumerNum, consumer));
        PulsarConsumerPoolFactory.getConsumerPool().forEach(submitHandlerTask);
    }

    // Connection settings handed to MqConsumer.build() in initMqConsumer().
    // Characters 8..23 of accessKey are additionally used there as the decryption key.
    // NOTE(review): the access id/key are hard-coded in source — consider moving them to
    // external configuration and rotating these values.
    String url = "pulsar+ssl://mqe.tuyacn.com:7285/";
    String accessId = "thddfrm97kydyq9wrc8u";
    String accessKey = "tqcuypeytayths9pudcn937wfupugc3t";

    /**
     * Approach 2: build an {@link MqConsumer} from the configured url/accessId/accessKey
     * (max redeliver count 3), register the message listener and start it.
     * <p>
     * For each message the listener:
     * <ol>
     *   <li>decodes the payload bytes as UTF-8 JSON into a {@link MessageVO};</li>
     *   <li>decrypts {@code vo.getData()} with characters 8..23 of {@code accessKey}
     *       (per the original author's note: AES in ECB mode with the middle 16 characters);</li>
     *   <li>parses the decrypted JSON into a {@link TuYaReceiverBean} and passes it to
     *       {@link #detealData(TuYaReceiverBean)}.</li>
     * </ol>
     * A failure while starting the consumer is logged and not rethrown.
     */
    private void initMqConsumer() {
        MqConsumer mqConsumer = MqConsumer.build()
                .serviceUrl(url)
                .accessId(accessId)
                .accessKey(accessKey)
                .maxRedeliverCount(3)
                .messageListener(message -> {
                    // Decode with an explicit charset: new String(byte[]) would use the
                    // platform default and corrupt non-ASCII text on non-UTF-8 hosts.
                    String jsonMessage = new String(message.getData(), StandardCharsets.UTF_8);
                    MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
                    // Decryption key = characters 8..23 (16 chars) of accessKey.
                    String data = AESBase64Utils.decrypt(vo.getData(), accessKey.substring(8, 24));
                    TuYaReceiverBean tuYaReceiverBean = JSON.parseObject(data, TuYaReceiverBean.class);
                    log.info("接收消息,消息内容:{}", JSON.toJSONString(tuYaReceiverBean));
                    // Process the reported data.
                    detealData(tuYaReceiverBean);
                });
        try {
            mqConsumer.start();
        } catch (Exception e) {
            log.error("start error:", e);
        }
    }

    /**
     * Processes one decrypted message.
     * <p>
     * When the message has no {@code bizCode} it is treated as a data-point report: the
     * device is marked online on the record and every entry of {@code getStatus()} is
     * passed to {@code initStatue}. Otherwise the message is a device event: the record is
     * time-stamped with the current time, "offline"/"online" events are persisted, and the
     * record is forwarded through {@code sendMsg} when the device's school id is 1.
     * <p>
     * NOTE(review): this method forwards for school id 1 while {@code initStatue} forwards
     * for school id 16 — confirm that the difference is intentional.
     *
     * @param tuYaReceiverBean the decrypted message; must not be null
     */
    private void detealData(TuYaReceiverBean tuYaReceiverBean) {
        // Globally unique message id
        String dataId = tuYaReceiverBean.getDataId();
        // Event code; empty for data-point reports
        String bizCode = tuYaReceiverBean.getBizCode();
        // Device id
        String devId = tuYaReceiverBean.getDevId();
        // Product key from the developer platform
        String productKey = tuYaReceiverBean.getProductKey();

        // Record that will be persisted / forwarded
        DeviceBean deviceBean = new DeviceBean();
        deviceBean.setDataId(dataId);
        deviceBean.setBizCode(bizCode);
        deviceBean.setDevId(devId);
        deviceBean.setProductKey(productKey);

        // Device name and school id come from the attendance row, if one exists.
        Wl_Attendace attendace = deviceDao.selectAttendance(devId);
        String deviceName = attendace == null ? "" : attendace.getName();
        Integer schoolId = attendace == null ? null : attendace.getSchool_id();

        // Data-point report (temperature, presence, humidity, ...)
        if (StringUtils.isEmpty(bizCode)) {
            // Online status: 1 = online, 0 = offline
            deviceBean.setDev_status(1);
            // Example: "status":[{"code":"pir","t":1612283849564,"value":"pir"}]
            List<TuYaReceiverBeanStatus> status = tuYaReceiverBean.getStatus();
            if (status == null) {
                // Neither an event code nor a status list: nothing to process.
                log.warn("message {} from device {} has neither bizCode nor status", dataId, devId);
                return;
            }
            for (TuYaReceiverBeanStatus statue : status) {
                initStatue(statue, deviceBean, schoolId);
            }
            return;
        }

        // Device event. Same timestamp pattern as the rest of this class
        // (the pattern previously contained a stray second colon after HH).
        deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        switch (bizCode) {
            case "offline": // device went offline
                log.info("设备: {},已掉线", deviceName);
                deviceBean.setDev_type(1);
                deviceBean.setDev_status(0);
                updateDevStatus(deviceBean);
                // updateDevStatus finishes by marking the device connected ("1"),
                // so the connection flag is reset to "0" afterwards.
                updateStatus(devId, "0");
                break;
            case "online": // device came online
                log.info("设备: {},已上线", deviceName);
                deviceBean.setDev_status(1);
                deviceBean.setDev_type(2);
                updateDevStatus(deviceBean);
                break;
            case "nameUpdate":   // device renamed
            case "dpNameUpdate": // data point renamed
            case "bindUser":     // user bound
            case "delete":       // device deleted
            default:
                // No persistence for these events.
                break;
        }
        if (schoolId != null && schoolId.intValue() == 1) {
            // Forward the record to the mini-program.
            sendMsg(deviceBean);
        }
    }

    /**
     * Stores a device's connection flag together with the current time
     * (formatted as {@code yyyy-MM-dd HH:mm:ss}) via {@code deviceDao.updateStatus}.
     *
     * @param devId        device id
     * @param isConnection connection flag; callers in this class pass "1" or "0"
     */
    private void updateStatus(String devId, String isConnection) {
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String updatedAt = timestampFormat.format(new Date());
        deviceDao.updateStatus(devId, isConnection, updatedAt);
    }

    /**
     * Appends a history record for the device and marks the device as connected.
     * <p>
     * {@code dpId}, {@code name} and {@code uid} are always blanked; the remaining text
     * fields are replaced by an empty string when null, so {@code deviceDao.addDevice}
     * never receives null for them. The method always ends with
     * {@code updateStatus(devId, "1")}.
     *
     * @param deviceBean record to persist; mutated in place
     */
    private void updateDevStatus(DeviceBean deviceBean) {
        // Always stored blank.
        deviceBean.setUid("");
        deviceBean.setName("");
        deviceBean.setDpId("");

        // Null text fields become empty strings.
        if (null == deviceBean.getDataId()) {
            deviceBean.setDataId("");
        }
        if (null == deviceBean.getProductKey()) {
            deviceBean.setProductKey("");
        }
        if (null == deviceBean.getCode()) {
            deviceBean.setCode("");
        }
        if (null == deviceBean.getValue()) {
            deviceBean.setValue("");
        }
        if (null == deviceBean.getBizCode()) {
            deviceBean.setBizCode("");
        }

        log("设备上线,更新历史记录: " + JSON.toJSONString(deviceBean));
        deviceDao.addDevice(
                deviceBean.getDataId(),
                deviceBean.getDevId(),
                deviceBean.getProductKey(),
                deviceBean.getCode(),
                deviceBean.getValue(),
                deviceBean.getBizCode(),
                deviceBean.getTime(),
                deviceBean.getName(),
                deviceBean.getDpId(),
                deviceBean.getUid(),
                deviceBean.getDev_status(),
                deviceBean.getDev_type());
        updateStatus(deviceBean.getDevId(), "1");
    }


    // MQTT client used by sendMsg(); created and initialised there on first use.
    private MqttConsumer mqttConsumer;
    // Key material for sendMsg(): characters 8..23 are passed to AESBase64Utils.encrypt.
    // NOTE(review): this name differs from the `accessKey` field only by letter case, and the
    // value is a hard-coded secret — consider renaming it and moving it to configuration.
    private String accesskey = "T6dtGFMEs35U4la176032PCis5q6em3h";
    /**
     * Handles one data-point report of a device.
     * <p>
     * The record is marked as message type 3, time-stamped from the report's {@code t}
     * (interpreted as epoch milliseconds) and filled with the report's code and value.
     * Temperature and humidity values are divided by 100 (integer division) before being
     * stored; a presence ("pir") report is stored as "有人"/"无人" and, after a one-second
     * pause, triggers {@code calOpenOrCloseDevWithPir}. For all other codes the raw value
     * is stored and the switch only produces log output. Finally the record is persisted
     * via {@code updateDevStatus} and, for school id 16, forwarded via {@code sendMsg}.
     * <p>
     * Example reports: {@code {"code":"pir","t":1612283849564,"value":"pir"}},
     * {@code {"code":"va_temperature","t":1612283809697,"value":"1879"}}.
     *
     * @param statue     the reported data point (code, timestamp, value)
     * @param deviceBean record being built for this message; mutated and persisted
     * @param schoolId   school id of the device, or null when unknown
     * @throws NumberFormatException if a numeric data point carries a non-numeric value
     */
    private void initStatue(TuYaReceiverBeanStatus statue, DeviceBean deviceBean, Integer schoolId) {
        // Device message type: 1 = offline, 2 = online, 3 = other event
        deviceBean.setDev_type(3);
        // Event time
        long t = statue.getT();
        deviceBean.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(t)));
        // Event value (stored raw unless a case below overrides it)
        String value = statue.getValue();
        deviceBean.setValue(value);
        // Event code, e.g. va_temperature, va_humidity, pir
        String code = statue.getCode();
        deviceBean.setCode(code);
        switch (code) {
            case "va_temperature": // temperature
                deviceBean.setValue(Integer.parseInt(value) / 100 + "");
                log.info("温度:{}", deviceBean.getValue());
                // Remember the latest temperature per device.
                ControlUtils.tempDevices.put(deviceBean.getDevId(), Integer.valueOf(deviceBean.getValue()));
                break;
            case "va_humidity": // humidity
                deviceBean.setValue(Integer.parseInt(value) / 100 + "");
                log.info("湿度:{}", deviceBean.getValue());
                break;
            case "pir": // presence sensor
                log.info("人体感应:{}", value);
                deviceBean.setValue(value.equals("pir") ? "有人" : "无人");
                try {
                    // NOTE(review): blocks the calling thread for one second before the
                    // control logic runs; the reason is not visible in this file.
                    Thread.sleep(1000);
                    // Control the air-conditioning devices depending on presence.
                    calOpenOrCloseDevWithPir(deviceBean);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                    log.warn("interrupted before presence-based control for device {}", deviceBean.getDevId(), e);
                }
                break;
            case "battery": // door sensor battery level
                log("电池电量值:" + value);
                break;
            case "battery_state": // battery state, one of low / middle / high
                log("电池电量状态:" + value);
                break;
            case "doorcontact_state": // door contact state
                // Translate once and log the translated label. (Previously the translated
                // text was compared with "true" again, so "关门" was logged in every case.)
                // NOTE(review): the record keeps the raw "true"/"false" value — confirm
                // whether the label was meant to be stored instead.
                value = value.equals("true") ? "开门" : "关门";
                log("门磁状态:" + value);
                break;
            case "switch": // switch state
                value = value.equals("true") ? "开门" : "关门";
                log("开关状态:" + value);
                break;
            case "cur_voltage": // current voltage; scaled value is only logged
                value = Integer.parseInt(value) / 10 + "";
                log(deviceBean.getDevId() + " 当前电压:" + value + "v");
                break;
            case "cur_power": // current power; scaled value is only logged
                value = Integer.parseInt(value) / 10 + "";
                log(deviceBean.getDevId() + " 当前功率:" + value + "W");
                break;
            case "cur_current": // current; scaled value is only logged
                value = Integer.parseInt(value) / 10 + "";
                log(deviceBean.getDevId() + " 当前电流:" + value + "A");
                break;
            case "battery_percentage": // battery percentage
                log.info("电池电量百分比:{}", value);
                break;
            case "battery_value":      // battery level
            case "smoke_sensor_state": // smoke alarm
            case "temper_alarm":       // tamper alarm
            case "router_mgr":         // gateway routing
            default:
                // Stored as reported; nothing else to do.
                break;
        }
        updateDevStatus(deviceBean);
        if (schoolId != null && schoolId.intValue() == 16) {
            // Forward the record to the mini-program.
            sendMsg(deviceBean);
        }
    }

    /**
     * Serialises the record to JSON, encrypts it with characters 8..23 of
     * {@code accesskey} and publishes the result on topic {@code Topic_Quene_Test}.
     * <p>
     * If encryption fails the error is logged and nothing is published (an empty payload
     * was published in that case before). The MQTT client is created and initialised on
     * first use.
     *
     * @param deviceBean record to forward
     */
    private void sendMsg(DeviceBean deviceBean) {
        String json = JsonUtils.nonDefaultMapper().toJson(deviceBean);
        String data;
        try {
            data = AESBase64Utils.encrypt(json, accesskey.substring(8, 24));
        } catch (Exception e) {
            log.error("加密失败: ", e);
            // Without a ciphertext there is nothing meaningful to send.
            return;
        }
        if (mqttConsumer == null) {
            mqttConsumer = new MqttConsumer();
            mqttConsumer.init();
        }
        mqttConsumer.publish("Topic_Quene_Test", data);
    }

    /**
     * Runs the presence-based device control for the reporting device.
     * <p>
     * Looks up the control entries linked to the device id; when there are none this is
     * logged and nothing else happens, otherwise the entries and the record are handed to
     * {@code ControlUtils.getInstance(deviceDao).conTemper}.
     *
     * @param deviceBean record of the presence report
     */
    private void calOpenOrCloseDevWithPir(DeviceBean deviceBean) {
        List<CalDevContrl> linkedControls = deviceDao.getCalDevContrlWidthDevId(deviceBean.getDevId());
        if (!CollectionUtils.isEmpty(linkedControls)) {
            ControlUtils.getInstance(deviceDao).conTemper(linkedControls, deviceBean);
            return;
        }
        log.info("未关联人感控制设备, 设备ID: "+ deviceBean.getDevId());
    }

    /**
     * Records the current time for the infrared remote associated with a device.
     * <p>
     * Takes the first control entry linked to {@code clintId}, resolves its associated
     * devices to a controller via {@code getCalDevBeCon} and stores {@code new Date()} in
     * {@code SchduledTasks.closeDevMap} under that controller's device id. Does nothing
     * when no control entry exists; logs and returns when no controller is found.
     *
     * @param clintId id of the reporting device
     */
    private void setClintTime(String clintId) {
        // Control entries of the presence sensor
        List<CalDevContrl> pirControls = deviceDao.getCalDevContrlWidthDevId(clintId);
        if (CollectionUtils.isEmpty(pirControls)) {
            return;
        }
        // Infrared remote associated with the first entry
        CalDevBeContrl irRemote = getCalDevBeCon(pirControls.get(0).getAssDevice());
        if (irRemote == null) {
            log.info("未查询到控制红外遥控器");
            return;
        }
        SchduledTasks.closeDevMap.put(irRemote.getConDevId(), new Date());
    }

    /**
     * Resolves a comma-separated list of associated device ids to the first controller
     * that can be found.
     * <p>
     * For every associated id the linked control entries are loaded; each entry's
     * {@code devBeId} (itself a comma-separated id list, with "0" meaning "none") is looked
     * up id by id. The search stops at the first lookup that yields a controller, so no
     * further queries are issued once a result is available.
     *
     * @param assDevice comma-separated associated device ids; may be null
     * @return the first controller found, or null when there is none
     */
    private CalDevBeContrl getCalDevBeCon(String assDevice) {
        if (assDevice == null) {
            return null;
        }
        for (String deviceId : assDevice.split(",")) {
            // Control entries of the temperature sensor
            List<CalDevContrl> tempControls = deviceDao.getTempCalWitdDevid(deviceId);
            if (CollectionUtils.isEmpty(tempControls)) {
                continue;
            }
            for (CalDevContrl tempControl : tempControls) {
                String devBeIds = tempControl.getDevBeId();
                if (devBeIds == null || "0".equals(devBeIds)) {
                    continue;
                }
                for (String devBeId : devBeIds.split(",")) {
                    // Infrared device
                    CalDevBeContrl controller = deviceDao.getCalDevBeControl(devBeId);
                    if (controller != null) {
                        return controller;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Stores the record's value, parsed as an integer, in {@code ControlUtils.tempDevices}
     * under the record's device id. No device control is triggered here.
     *
     * @param deviceBean record whose value holds an integer temperature
     */
    private void calOpenOrCloseDevWithtemper(DeviceBean deviceBean) {
        String deviceId = deviceBean.getDevId();
        Integer temperature = Integer.valueOf(deviceBean.getValue());
        ControlUtils.tempDevices.put(deviceId, temperature);
    }

    /**
     * Logs received data at INFO level through the class logger instead of writing to
     * standard output, so the output follows the application's logging configuration.
     *
     * @param content text appended after the fixed prefix
     */
    private void log(String content) {
        log.info("接收到数据:{}", content);
    }
}