package doumeemes.config.rocketmq; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import doumeemes.config.rocketmq.listener.StatisticsMqReceiveListener; import doumeemes.core.utils.Constants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Properties; //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 //@Configuration public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private StatisticsMqReceiveListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅关系 Map subscriptionTable = new HashMap(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.setExpression(Constants.MQ_TAG.statistics); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }