package doumeemes.config.rocketmq.listener; import com.alibaba.fastjson.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import doumeemes.dao.business.model.Workorder; import doumeemes.service.ext.WorkorderExtService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; //@Service public class StatisticsMqReceiveListener implements MessageListener { @Autowired private WorkorderExtService workorderExtService; private static final Logger logger = LoggerFactory.getLogger(StatisticsMqReceiveListener.class); @Override public Action consume(Message message, ConsumeContext consumeContext) { try { //消费消息 String msgTag = message.getTag(); String msgBody = new String(message.getBody(), StandardCharsets.UTF_8); logger.info("接收到MQ消息 -- Topic:{},tag:{},msgId:{},Key:{},body:{}", message.getTopic(), msgTag, message.getMsgID(), message.getKey(), new String(message.getBody())); try { Workorder model = JSONObject.toJavaObject(JSONObject.parseObject(msgBody),Workorder.class); if(model != null){ workorderExtService.statisticNum(model); } }catch (Exception e){ logger.error("------------工单及相应计划的数量统计失败---------------"+e.getMessage()); } //获取消息重试次数 int reconsumeTimes = message.getReconsumeTimes(); logger.info("获取消息重试次数为:" + reconsumeTimes); //消费成功,继续消费下一条消息 return Action.CommitMessage; } catch (Exception e) { logger.error("消费MQ消息失败! msgId:" + message.getMsgID() + " ---Exception:" + e); //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息 return Action.ReconsumeLater; } } }