jiangping
2023-08-10 e3523883aadf423cb78647c38f0648c2143d2a89
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package doumeemes.config.rocketmq;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import doumeemes.config.rocketmq.listener.StatisticsMqReceiveListener;
import doumeemes.core.utils.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
//@Configuration
public class ConsumerClient {
 
    @Autowired
    private MqConfig mqConfig;
 
    @Autowired
    private StatisticsMqReceiveListener messageListener;
 
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(Constants.MQ_TAG.statistics);
        subscriptionTable.put(subscription, messageListener);
        //订阅多个topic如上面设置
 
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
 
}