package com.doumee.config.rocketmq5;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.doumee.core.utils.Constants;
|
import com.doumee.dao.business.model.Activity;
|
import com.doumee.service.business.ActivityService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.apache.rocketmq.client.apis.*;
|
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
|
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
|
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
|
import org.apache.rocketmq.client.apis.message.MessageId;
|
import org.apache.rocketmq.client.apis.message.MessageView;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import java.nio.charset.StandardCharsets;
|
import java.time.Duration;
|
import java.util.Collections;
|
import java.util.List;
|
|
@Component
|
@Slf4j
|
public class NormalMsgConsumer {
|
@Autowired
|
public MqConfig mqConfig;
|
|
@Autowired
|
private ActivityService activityService;
|
|
private SimpleConsumer consumer ;
|
|
@Async("asyncPoolTaskExecutor")
|
public void buildConsumer() {
|
final ClientServiceProvider provider = ClientServiceProvider.loadService();
|
|
// Credential provider is optional for client configuration.
|
StaticSessionCredentialsProvider staticSessionCredentialsProvider =
|
new StaticSessionCredentialsProvider(mqConfig.getUsername(), mqConfig.getPassword());
|
String endpoints = mqConfig.getNameSrvAddr();
|
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
|
.setEndpoints(endpoints)
|
.setCredentialProvider(staticSessionCredentialsProvider)
|
.build();
|
String consumerGroup = mqConfig.getGroupId();
|
Duration awaitDuration = Duration.ofSeconds(30);
|
String topic = mqConfig.getTopic();
|
String tag = "";
|
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
|
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
|
do {
|
try {
|
log.info("start rocketmq connect.....");
|
consumer = provider.newSimpleConsumerBuilder()
|
.setClientConfiguration(clientConfiguration)
|
// Set the consumer group name.
|
.setConsumerGroup(consumerGroup)
|
// set await duration for long-polling.
|
.setAwaitDuration(awaitDuration)
|
// Set the subscription for the consumer.
|
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
|
.build();
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}while (consumer==null);
|
try {
|
startService();
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
|
|
public void startService() throws Exception {
|
if(consumer==null){
|
return;
|
}
|
// Max message num for each long polling.
|
int maxMessageNum = 16;
|
// Set message invisible duration after it is received.
|
Duration invisibleDuration = Duration.ofSeconds(15);
|
// Receive message, multi-threading is more recommended.
|
do {
|
try {
|
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
|
log.info("Received {} message(s)", messages.size());
|
for (MessageView message : messages) {
|
final MessageId messageId = message.getMessageId();
|
try {
|
if(doBusiness(message)){
|
consumer.ack(message);
|
log.info("Message is acknowledged successfully, messageId={}", messageId);
|
}else{
|
log.error("Message messageId={}=======业务处理失败", messageId);
|
}
|
} catch (Throwable t) {
|
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
|
}
|
}
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
} while (true);
|
// Close the simple consumer when you don't need it anymore.
|
// consumer.close();
|
}
|
|
/**
|
* 业务处理
|
* @param message
|
*/
|
private boolean doBusiness(MessageView message) {
|
try {
|
String tag =(String) message.getTag().get();
|
String body = StandardCharsets.UTF_8.decode(message.getBody()).toString();
|
if(StringUtils.equals(tag, Constants.MQ_TAG.activity)){
|
Activity model = JSONObject.toJavaObject(JSONObject.parseObject(body),Activity.class);
|
if(model != null){
|
activityService.updateLooknum(model);
|
}
|
}
|
return true;
|
}catch (Exception e){
|
return false;
|
}
|
|
}
|
|
}
|