引入依赖 <!-- rocketmq spring boot starter--> <dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.0.4</version> </dependency>
消费者代码@rocketmqmessagelistener(consumergroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorexpression = "${rocketmq.selectorexpression}")public class consumer implements rocketmqlistener<string> { @override public void onmessage(string s) { system.out.println("消费到的数据为:"+s); }}
问题排查rocketmqmessagelistener整个注解默认selectorexpression为*,表示接收当前topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorexpression}表达式时会发现所有数据全被过滤了,跟踪源码(listenercontainerconfiguration.java)发现在创建listener时selectorexpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。
@override public void aftersingletonsinstantiated() { // 获取所有所有使用了rocketmqmessagelistener注解的bean map<string, object> beans = this.applicationcontext.getbeanswithannotation(rocketmqmessagelistener.class); if (objects.nonnull(beans)) { // 循环注册容器 beans.foreach(this::registercontainer); } } private void registercontainer(string beanname, object bean) { class<?> clazz = aopproxyutils.ultimatetargetclass(bean); // 校验当前bean是否实现了rocketmqlistener接口 if (!rocketmqlistener.class.isassignablefrom(bean.getclass())) { throw new illegalstateexception(clazz + " is not instance of " + rocketmqlistener.class.getname()); } // 获取bean上的annotation rocketmqmessagelistener annotation = clazz.getannotation(rocketmqmessagelistener.class); // 解析group及topic,可支持表达式 string consumergroup = this.environment.resolveplaceholders(annotation.consumergroup()); string topic = this.environment.resolveplaceholders(annotation.topic()); boolean listenerenabled = (boolean)rocketmqproperties.getconsumer().getlisteners().getordefault(consumergroup, collections.empty_map) .getordefault(topic, true); if (!listenerenabled) { log.debug( "consumer listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumergroup, topic); return; } validate(annotation); string containerbeanname = string.format("%s_%s", defaultrocketmqlistenercontainer.class.getname(), counter.incrementandget()); genericapplicationcontext genericapplicationcontext = (genericapplicationcontext)applicationcontext; // 注册bean的,调用createrocketmqlistenercontainer genericapplicationcontext.registerbean(containerbeanname, defaultrocketmqlistenercontainer.class, () -> createrocketmqlistenercontainer(containerbeanname, bean, annotation)); defaultrocketmqlistenercontainer container = genericapplicationcontext.getbean(containerbeanname, defaultrocketmqlistenercontainer.class); if (!container.isrunning()) { try { container.start(); } catch (exception e) { log.error("started container failed. {}", container, e); throw new runtimeexception(e); } } log.info("register the listener to container, listenerbeanname:{}, containerbeanname:{}", beanname, containerbeanname); } private defaultrocketmqlistenercontainer createrocketmqlistenercontainer(string name, object bean, rocketmqmessagelistener annotation) { defaultrocketmqlistenercontainer container = new defaultrocketmqlistenercontainer(); container.setrocketmqmessagelistener(annotation); string nameserver = environment.resolveplaceholders(annotation.nameserver()); nameserver = stringutils.isempty(nameserver) ? rocketmqproperties.getnameserver() : nameserver; string accesschannel = environment.resolveplaceholders(annotation.accesschannel()); container.setnameserver(nameserver); if (!stringutils.isempty(accesschannel)) { container.setaccesschannel(accesschannel.valueof(accesschannel)); } container.settopic(environment.resolveplaceholders(annotation.topic())); // 此处已经根据表达式将数据取出 string tags = environment.resolveplaceholders(annotation.selectorexpression()); if (!stringutils.isempty(tags)) { container.setselectorexpression(tags); } container.setconsumergroup(environment.resolveplaceholders(annotation.consumergroup())); // 此处将selectorexpression的数据覆盖成了表达式 container.setrocketmqmessagelistener(annotation); container.setrocketmqlistener((rocketmqlistener)bean); container.setobjectmapper(objectmapper); container.setmessageconverter(rocketmqmessageconverter.getmessageconverter()); container.setname(name); // review me, use the same clientid or multiple? return container; }
问题解决因为listenercontainerconfiguration类是实现了smartinitializingsingleton接口的aftersingletonsinstantiated方法,我们可以通过反射对selectorexpression的数据在listenercontainerconfiguration进行初始化前进行解析并赋值回去。
/** * 在springboot初始化后,rocketmq容器初始化前利用反射动态改变数据**/@configurationpublic class changeselectorexpressionbeforemqinit implements initializingbean { @autowired private applicationcontext applicationcontext; @autowired private standardenvironment environment; @override public void afterpropertiesset() throws exception { map<string,object> beans =applicationcontext.getbeanswithannotation(rocketmqmessagelistener.class); for (object bean : beans.values()){ class<?> clazz = aopproxyutils.ultimatetargetclass(bean); if (!rocketmqlistener.class.isassignablefrom(bean.getclass())) { continue; } rocketmqmessagelistener annotation = clazz.getannotation(rocketmqmessagelistener.class); invocationhandler invocationhandler = proxy.getinvocationhandler(annotation); field field = invocationhandler.getclass().getdeclaredfield("membervalues"); field.setaccessible(true); map<string, object> membervalues = (map<string, object>) field.get(invocationhandler); for (map.entry<string,object> entry: membervalues.entryset()) { if(objects.nonnull(entry)){ membervalues.put(entry.getkey(),environment.resolveplaceholders(string.valueof(entry.getvalue()))); } } } }}
初次之外,在2.1.0版本的依赖包中已经修复了此bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。
以上就是springboot整合rocketmq遇到的坑怎么解决的详细内容。
