MessageHandlerTask.java
2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.example.mypulsar.mq;
import com.alibaba.fastjson.JSON;
import com.example.mypulsar.message.MessageVO;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Runnable that repeatedly pulls a message from a Pulsar {@link Consumer}, parses its payload as a
 * JSON-encoded {@link MessageVO} and then acknowledges it.
 *
 * <p>The loop runs until the executing thread is interrupted. A failure while receiving or
 * processing a message is logged and the loop carries on with the next message; a message whose
 * processing failed is not acknowledged.
 */
public class MessageHandlerTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandlerTask.class);

    /** Pause after a failed receive so that a permanently failing consumer does not spin the CPU. */
    private static final long RECEIVE_FAILURE_BACKOFF_MILLIS = 1000L;

    private final Consumer<?> consumer;
    private final Integer consumerNum;

    /**
     * @param consumerNum label identifying this task; only used in log output
     * @param consumer    consumer to receive messages from and acknowledge them on
     */
    public MessageHandlerTask(Integer consumerNum, Consumer<?> consumer) {
        this.consumerNum = consumerNum;
        this.consumer = consumer;
    }

    /** Receives and processes messages until the current thread is interrupted. */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Message<?> message = receiveOrNull();
            if (message != null) {
                process(message);
            }
        }
        logger.info("consumerNum={} stopping: thread was interrupted", consumerNum);
    }

    /**
     * Waits for the next message.
     *
     * @return the received message, or {@code null} if receiving failed or was interrupted
     */
    private Message<?> receiveOrNull() {
        try {
            return consumer.receive();
        } catch (Exception e) {
            if (!restoreInterruptIfCause(e)) {
                logger.error("consumerNum={} failed to receive message", consumerNum, e);
                pauseAfterReceiveFailure();
            }
            return null;
        }
    }

    /** Parses the payload of {@code message} and acknowledges it; failures are logged, not rethrown. */
    private void process(Message<?> message) {
        try {
            long start = System.currentTimeMillis();
            // Decode explicitly as UTF-8 (the standard JSON encoding) instead of the platform
            // default. NOTE(review): confirm the producer also encodes payloads as UTF-8.
            String jsonMessage = new String(message.getData(), StandardCharsets.UTF_8);
            // The parsed object is not used yet; parsing still validates the payload, so a
            // malformed message throws here and is left unacknowledged.
            JSON.parseObject(jsonMessage, MessageVO.class);
            long businessCost = System.currentTimeMillis() - start;

            long ackStart = System.currentTimeMillis();
            consumer.acknowledge(message);
            long ackCost = System.currentTimeMillis() - ackStart;

            logger.debug("consumerNum={} messageId={} messageKey={} business cost={} ms, ack cost={} ms",
                    consumerNum, message.getMessageId(), message.getKey(), businessCost, ackCost);
        } catch (Exception e) {
            if (!restoreInterruptIfCause(e)) {
                logger.error("consumerNum={} failed to process message {}",
                        consumerNum, message.getMessageId(), e);
            }
        }
    }

    /**
     * Re-asserts the thread's interrupt flag when {@code failure} is, or was caused by, an
     * {@link InterruptedException}, so that the loop in {@link #run()} terminates.
     *
     * @return {@code true} if the failure was an interruption
     */
    private static boolean restoreInterruptIfCause(Throwable failure) {
        for (Throwable cause = failure; cause != null; cause = cause.getCause()) {
            if (cause instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                return true;
            }
        }
        return false;
    }

    /** Sleeps for the receive back-off period, keeping the interrupt flag set if interrupted. */
    private static void pauseAfterReceiveFailure() {
        try {
            Thread.sleep(RECEIVE_FAILURE_BACKOFF_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}