doum
2025-12-11 3cd92951fd2a67a02e649a870d100b3e8776ae11
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.doumee.config.rocketmq5;
 
 
import com.alibaba.fastjson.JSONObject;
import com.doumee.core.utils.Constants;
import com.doumee.dao.business.model.Activity;
import com.doumee.service.business.ActivityService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
 
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
 
@Component
@Slf4j
public class NormalMsgConsumer {
    @Autowired
    public  MqConfig mqConfig;
 
    @Autowired
    private ActivityService activityService;
 
    private  SimpleConsumer consumer ;
 
    @Async("asyncPoolTaskExecutor")
    public void buildConsumer()  {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();
 
            // Credential provider is optional for client configuration.
            StaticSessionCredentialsProvider staticSessionCredentialsProvider =
                    new StaticSessionCredentialsProvider(mqConfig.getUsername(), mqConfig.getPassword());
            String endpoints = mqConfig.getNameSrvAddr();
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    .setCredentialProvider(staticSessionCredentialsProvider)
                    .build();
            String consumerGroup = mqConfig.getGroupId();
            Duration awaitDuration = Duration.ofSeconds(30);
            String topic = mqConfig.getTopic();
            String tag = "";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            // In most case, you don't need to create too many consumers, singleton pattern is recommended.
            do {
                try {
                    log.info("start rocketmq connect.....");
                    consumer = provider.newSimpleConsumerBuilder()
                            .setClientConfiguration(clientConfiguration)
                            // Set the consumer group name.
                            .setConsumerGroup(consumerGroup)
                            // set await duration for long-polling.
                            .setAwaitDuration(awaitDuration)
                            // Set the subscription for the consumer.
                            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                            .build();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }while (consumer==null);
            try {
                startService();
            }catch (Exception e){
                e.printStackTrace();
            }
    }
 
 
    public  void startService() throws  Exception {
        if(consumer==null){
            return;
        }
        // Max message num for each long polling.
        int maxMessageNum = 16;
        // Set message invisible duration after it is received.
        Duration invisibleDuration = Duration.ofSeconds(15);
        // Receive message, multi-threading is more recommended.
        do {
            try {
                final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
                log.info("Received {} message(s)", messages.size());
                for (MessageView message : messages) {
                    final MessageId messageId = message.getMessageId();
                    try {
                        if(doBusiness(message)){
                            consumer.ack(message);
                            log.info("Message is acknowledged successfully, messageId={}", messageId);
                        }else{
                            log.error("Message messageId={}=======业务处理失败", messageId);
                        }
                    } catch (Throwable t) {
                        log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        } while (true);
        // Close the simple consumer when you don't need it anymore.
        // consumer.close();
    }
 
    /**
     * 业务处理
     * @param message
     */
    private boolean doBusiness(MessageView message) {
        try {
            String tag =(String) message.getTag().get();
            String body = StandardCharsets.UTF_8.decode(message.getBody()).toString();
            if(StringUtils.equals(tag, Constants.MQ_TAG.activity)){
                Activity model = JSONObject.toJavaObject(JSONObject.parseObject(body),Activity.class);
                if(model != null){
                    activityService.updateLooknum(model);
                }
            }
            return true;
        }catch (Exception e){
            return false;
        }
 
    }
 
}