MessageHandlerTask.java 2.21 KB
package com.example.mypulsar.mq;

import com.alibaba.fastjson.JSON;
import com.example.mypulsar.message.MessageVO;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 */
public class MessageHandlerTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerTask.class);

    private Consumer consumer;
    private Integer consumerNum;

    public MessageHandlerTask(Integer consumerNum, Consumer consumer) {
        this.consumerNum = consumerNum;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        do {
            try {
                Message message = consumer.receive();
                Long start = System.currentTimeMillis();
                // TODO: 2019/9/9 注释日志
                /*System.out.println("----------------consumerNum:" + consumerNum + ";consumerName:" + consumer.getConsumerName() + "--------------------");
                System.out.println("Message Received:" + new String(message.getData()) + ",seq="
                        + message.getSequenceId() + ",time=" + message.getPublishTime() + ",consumed time="
                        + System.currentTimeMillis() + ",partition="
                        + ((TopicMessageIdImpl) message.getMessageId()).getTopicPartitionName());*/
                String jsonMessage = new String(message.getData());
                MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
//                System.out.println("Decrypt Message Data:" + AESBase64Utils.decrypt(vo.getData(), MqConfigs.accessKey.substring(8, 24)));

                Long businessCost = System.currentTimeMillis() - start;
                Long ackStart = System.currentTimeMillis();
                consumer.acknowledge(message);
//                System.out.println("Cost Time: messageKey=" + message.getKey() + "; business process message cost = " + businessCost + " ms; msg ack cost = " + (System.currentTimeMillis() - ackStart) + " ms");
            } catch (Throwable t) {
//                System.out.println("error:" + t);
            }
        } while (true);
    }
}