package com.doumee.config.rocketmq5; import com.alibaba.fastjson.JSONObject; import com.doumee.core.utils.Constants; import com.doumee.dao.business.model.Activity; import com.doumee.service.business.ActivityService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.message.MessageView; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.List; @Component @Slf4j public class NormalMsgConsumer { @Autowired public MqConfig mqConfig; @Autowired private ActivityService activityService; private SimpleConsumer consumer ; @Async("asyncPoolTaskExecutor") public void buildConsumer() { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. StaticSessionCredentialsProvider staticSessionCredentialsProvider = new StaticSessionCredentialsProvider(mqConfig.getUsername(), mqConfig.getPassword()); String endpoints = mqConfig.getNameSrvAddr(); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(staticSessionCredentialsProvider) .build(); String consumerGroup = mqConfig.getGroupId(); Duration awaitDuration = Duration.ofSeconds(30); String topic = mqConfig.getTopic(); String tag = ""; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // In most case, you don't need to create too many consumers, singleton pattern is recommended. do { try { log.info("start rocketmq connect....."); consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for long-polling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); }catch (Exception e){ e.printStackTrace(); } }while (consumer==null); try { startService(); }catch (Exception e){ e.printStackTrace(); } } public void startService() throws Exception { if(consumer==null){ return; } // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); // Receive message, multi-threading is more recommended. do { try { final List messages = consumer.receive(maxMessageNum, invisibleDuration); log.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { if(doBusiness(message)){ consumer.ack(message); log.info("Message is acknowledged successfully, messageId={}", messageId); }else{ log.error("Message messageId={}=======业务处理失败", messageId); } } catch (Throwable t) { log.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } }catch (Exception e){ e.printStackTrace(); } } while (true); // Close the simple consumer when you don't need it anymore. // consumer.close(); } /** * 业务处理 * @param message */ private boolean doBusiness(MessageView message) { try { String tag =(String) message.getTag().get(); String body = StandardCharsets.UTF_8.decode(message.getBody()).toString(); if(StringUtils.equals(tag, Constants.MQ_TAG.activity)){ Activity model = JSONObject.toJavaObject(JSONObject.parseObject(body),Activity.class); if(model != null){ activityService.updateLooknum(model); } } return true; }catch (Exception e){ return false; } } }