| package doumeemes.config.rocketmq; | 
| import com.aliyun.openservices.ons.api.MessageListener; | 
| import com.aliyun.openservices.ons.api.PropertyKeyConst; | 
| import com.aliyun.openservices.ons.api.bean.ConsumerBean; | 
| import com.aliyun.openservices.ons.api.bean.Subscription; | 
| import doumeemes.config.rocketmq.listener.StatisticsMqReceiveListener; | 
| import doumeemes.core.utils.Constants; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.context.annotation.Bean; | 
| import org.springframework.context.annotation.Configuration; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import java.util.HashMap; | 
| import java.util.Map; | 
| import java.util.Properties; | 
|   | 
| //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 | 
| //@Configuration | 
| public class ConsumerClient { | 
|   | 
|     @Autowired | 
|     private MqConfig mqConfig; | 
|   | 
|     @Autowired | 
|     private StatisticsMqReceiveListener messageListener; | 
|   | 
|     @Bean(initMethod = "start", destroyMethod = "shutdown") | 
|     public ConsumerBean buildConsumer() { | 
|         ConsumerBean consumerBean = new ConsumerBean(); | 
|         //配置文件 | 
|         Properties properties = mqConfig.getMqPropertie(); | 
|         properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); | 
|         //将消费者线程数固定为20个 20为默认值 | 
|         properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); | 
|         consumerBean.setProperties(properties); | 
|         //订阅关系 | 
|         Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); | 
|         Subscription subscription = new Subscription(); | 
|         subscription.setTopic(mqConfig.getTopic()); | 
|         subscription.setExpression(Constants.MQ_TAG.statistics); | 
|         subscriptionTable.put(subscription, messageListener); | 
|         //订阅多个topic如上面设置 | 
|   | 
|         consumerBean.setSubscriptionTable(subscriptionTable); | 
|         return consumerBean; | 
|     } | 
|   | 
| } |