package doumeemes.config.rocketmq.listener;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.aliyun.openservices.ons.api.Action;
|
import com.aliyun.openservices.ons.api.ConsumeContext;
|
import com.aliyun.openservices.ons.api.Message;
|
import com.aliyun.openservices.ons.api.MessageListener;
|
import doumeemes.dao.business.model.Workorder;
|
import doumeemes.service.ext.WorkorderExtService;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Service;
|
|
import java.nio.charset.StandardCharsets;
|
//@Service
|
public class StatisticsMqReceiveListener implements MessageListener {
|
|
@Autowired
|
private WorkorderExtService workorderExtService;
|
private static final Logger logger = LoggerFactory.getLogger(StatisticsMqReceiveListener.class);
|
|
@Override
|
public Action consume(Message message, ConsumeContext consumeContext) {
|
try {
|
//消费消息
|
String msgTag = message.getTag();
|
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
|
logger.info("接收到MQ消息 -- Topic:{},tag:{},msgId:{},Key:{},body:{}",
|
message.getTopic(), msgTag, message.getMsgID(), message.getKey(),
|
new String(message.getBody()));
|
try {
|
Workorder model = JSONObject.toJavaObject(JSONObject.parseObject(msgBody),Workorder.class);
|
if(model != null){
|
workorderExtService.statisticNum(model);
|
}
|
}catch (Exception e){
|
logger.error("------------工单及相应计划的数量统计失败---------------"+e.getMessage());
|
}
|
//获取消息重试次数
|
int reconsumeTimes = message.getReconsumeTimes();
|
logger.info("获取消息重试次数为:" + reconsumeTimes);
|
|
//消费成功,继续消费下一条消息
|
return Action.CommitMessage;
|
} catch (Exception e) {
|
logger.error("消费MQ消息失败! msgId:" + message.getMsgID() + " ---Exception:" + e);
|
//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
|
return Action.ReconsumeLater;
|
}
|
}
|
|
}
|