jiangping
2023-08-23 deaa638544da41dbc8b46f158e600e74ebbb26e3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package doumeemes.config.rocketmq.listener;
 
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import doumeemes.dao.business.model.Workorder;
import doumeemes.service.ext.WorkorderExtService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
 
import java.nio.charset.StandardCharsets;
//@Service
public class StatisticsMqReceiveListener implements MessageListener {
 
    @Autowired
    private WorkorderExtService workorderExtService;
    private static final Logger logger = LoggerFactory.getLogger(StatisticsMqReceiveListener.class);
 
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            //消费消息
            String msgTag = message.getTag();
            String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("接收到MQ消息 -- Topic:{},tag:{},msgId:{},Key:{},body:{}",
                    message.getTopic(), msgTag, message.getMsgID(), message.getKey(),
                    new String(message.getBody()));
            try {
                Workorder model = JSONObject.toJavaObject(JSONObject.parseObject(msgBody),Workorder.class);
                if(model != null){
                    workorderExtService.statisticNum(model);
                }
            }catch (Exception e){
                logger.error("------------工单及相应计划的数量统计失败---------------"+e.getMessage());
            }
            //获取消息重试次数
            int reconsumeTimes = message.getReconsumeTimes();
            logger.info("获取消息重试次数为:" + reconsumeTimes);
 
            //消费成功,继续消费下一条消息
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error("消费MQ消息失败! msgId:" + message.getMsgID() + " ---Exception:" + e);
            //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
            return Action.ReconsumeLater;
        }
    }
 
}