您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

SpringBoot怎么整合Pulsar

2024/3/22 20:40:10发布20次查看
<!-- Section 1: add the pom.xml dependencies -->
<!-- Maven element names are case-sensitive: groupId / artifactId, not groupid / artifactid. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
二、pulsar 参数类import lombok.data;import org.springframework.boot.context.properties.configurationproperties;import org.springframework.stereotype.component;import java.util.map;/** * @author: huangyibo * @date: 2022/5/28 2:32 * @description: pulsar 参数类 */@component@configurationproperties(prefix = "tdmq.pulsar")@datapublic class pulsarproperties { /** * 接入地址 */ private string serviceurl; /** * 命名空间tdc */ private string tdcnamespace; /** * 角色tdc的token */ private string tdctoken; /** * 集群name */ private string cluster; /** * topicmap */ private map<string, string> topicmap; /** * 订阅 */ private map<string, string> submap; /** * 开关 on:consumer可用 ||||| off:consumer断路 */ private string onoff;}
三、pulsar 配置类import org.apache.pulsar.client.api.authenticationfactory;import org.apache.pulsar.client.api.pulsarclient;import org.apache.pulsar.client.api.pulsarclientexception;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.context.properties.enableconfigurationproperties;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;/** * @author: huangyibo * @date: 2022/5/28 2:33 * @description: pulsar 配置类 */@configuration@enableconfigurationproperties(pulsarproperties.class)public class pulsarconfig { @autowired pulsarproperties pulsarproperties; @bean public pulsarclient getpulsarclient() { try { return pulsarclient.builder() .authentication(authenticationfactory.token(pulsarproperties.gettdctoken())) .serviceurl(pulsarproperties.getserviceurl()) .build(); } catch (pulsarclientexception e) { system.out.println(e); throw new runtimeexception("初始化pulsar client失败"); } }}
// Section 4: listeners for the different consumed payload types

// ---- UserMessageListener.java ----
import com.yibo.pulsar.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * Listener for messages whose payload is a {@link User}.
 *
 * @author huangyibo
 * @since 2022/5/28
 */
@Component
@Slf4j
public class UserMessageListener implements MessageListener<User> {

    /**
     * Prints the received user and acknowledges the message; on any failure the
     * error is logged and the message is negatively acknowledged.
     *
     * @param consumer the consumer that received the message
     * @param msg      the received message
     */
    @Override
    public void received(Consumer<User> consumer, Message<User> msg) {
        try {
            User user = msg.getValue();
            System.out.println(user);
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Record the failure before the negative ack instead of dropping it silently.
            log.error("Failed to process message {}", msg.getMessageId(), e);
            consumer.negativeAcknowledge(msg);
        }
    }
}

// ---- StringMessageListener.java ----
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
 * Listener for messages whose payload is a {@link String}.
 *
 * @author huangyibo
 * @since 2022/5/28
 */
@Component
@Slf4j
public class StringMessageListener implements MessageListener<String> {

    /**
     * Prints the received text and acknowledges the message; on any failure the
     * error is logged and the message is negatively acknowledged.
     *
     * @param consumer the consumer that received the message
     * @param msg      the received message
     */
    @Override
    public void received(Consumer<String> consumer, Message<String> msg) {
        try {
            System.out.println(msg.getValue());
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Record the failure before the negative ack instead of dropping it silently.
            log.error("Failed to process message {}", msg.getMessageId(), e);
            consumer.negativeAcknowledge(msg);
        }
    }
}
五、pulsar的核心服务类import com.yibo.pulsar.common.listener.stringmessagelistener;import com.yibo.pulsar.common.listener.usermessagelistener;import com.yibo.pulsar.pojo.user;import org.apache.pulsar.client.api.*;import org.apache.pulsar.client.impl.schema.avroschema;import org.springframework.beans.factory.annotation.autowired;import org.springframework.context.annotation.bean;import org.springframework.stereotype.component;import java.util.concurrent.timeunit;/** * @author: huangyibo * @date: 2022/5/28 2:35 * @description: pulsar的核心服务类 */@componentpublic class pulsarcommon { @autowired private pulsarproperties pulsarproperties; @autowired private pulsarclient client; @autowired private usermessagelistener usermessagelistener; @autowired private stringmessagelistener stringmessagelistener; /** * 创建一个生产者 * @param topic topic name * @param schema schema方式 * @param <t> 泛型 * @return producer生产者 */ public <t> producer<t> createproducer(string topic, schema<t> schema) { try { return client.newproducer(schema) .topic(pulsarproperties.getcluster() + "/" + pulsarproperties.gettdcnamespace() + "/" + topic) .batchingmaxpublishdelay(10, timeunit.milliseconds) .sendtimeout(10, timeunit.seconds) .blockifqueuefull(true) .create(); } catch (pulsarclientexception e) { throw new runtimeexception("初始化pulsar producer失败"); } } /** * * @param topic topic name * @param subscription sub name * @param messagelistener messagelistener的自定义实现类 * @param schema schema消费方式 * @param <t> 泛型 * @return consumer消费者 */ public <t> consumer<t> createconsumer(string topic, string subscription, messagelistener<t> messagelistener, schema<t> schema) { try { return client.newconsumer(schema) .topic(pulsarproperties.getcluster() + "/" + pulsarproperties.gettdcnamespace() + "/" + topic) .subscriptionname(subscription) .acktimeout(10, timeunit.seconds) .subscriptiontype(subscriptiontype.shared) .messagelistener(messagelistener) .subscribe(); } catch (pulsarclientexception e) { throw new runtimeexception("初始化pulsar 
consumer失败"); } } /** * 异步发送一条消息 * @param message 消息体 * @param producer 生产者实例 * @param <t> 消息泛型 */ public <t> void sendasyncmessage(t message, producer<t> producer) { producer.sendasync(message).thenaccept(msgid -> { }); } /** * 同步发送一条消息 * @param message 消息体 * @param producer 生产者实例 * @param <t> 泛型 * @throws pulsarclientexception */ public <t> void sendsyncmessage(t message, producer<t> producer) throws pulsarclientexception { messageid send = producer.send(message); system.out.println(); system.out.println(); system.out.println(); system.out.println(); system.out.println(send); } //-----------consumer----------- @bean(name = "comment-publish-topic-consumer") public consumer<string> getcommentpublishtopicconsumer() { return this.createconsumer(pulsarproperties.gettopicmap().get("comment-publish-topic"), pulsarproperties.getsubmap().get("comment-publish-topic-test"), stringmessagelistener, schema.string); } @bean(name = "reply-publish-topic-consumer") public consumer<user> getreplypublishtopicconsumer() { return this.createconsumer(pulsarproperties.gettopicmap().get("reply-publish-topic"), pulsarproperties.getsubmap().get("reply-publish-topic-test"), usermessagelistener, avroschema.of(user.class)); } //-----------producer----------- @bean(name = "comment-publish-topic-producer") public producer<string> getcommentpublishtopicproducer() { return this.createproducer(pulsarproperties.gettopicmap().get("comment-publish-topic"),schema.string); } @bean(name = "reply-publish-topic-producer") public producer<user> getreplypublishtopicproducer() { return this.createproducer(pulsarproperties.gettopicmap().get("reply-publish-topic"), avroschema.of(user.class)); }}
六、Pulsar 整合 Spring Cloud
后来发现如上代码会导致一个 bug:在更新 Nacos 配置之后,consumer 会挂掉。
经排查发现,这是由 @RefreshScope 注解导致的:此注解会销毁 bean,Pulsar 的 consumer 和 producer 都会被销毁。producer 会在下一次调用时完成重启,而 consumer 不会重启,因为没有任何代码去调用它。那么怎么解决呢?
解决办法是监听配置刷新事件,并在事件触发后重新获取相关 bean,使它们重新初始化:
import lombok.extern.slf4j.slf4j;import org.springframework.beans.factory.annotation.autowired;import org.springframework.context.applicationcontext;import org.springframework.context.applicationevent;import org.springframework.context.applicationlistener;import org.springframework.stereotype.component;/** * @author: huangyibo * @date: 2022/5/28 2:34 * @description: */@component@slf4jpublic class refreshpulsarlistener implements applicationlistener { @autowired applicationcontext applicationcontext; @override public void onapplicationevent(applicationevent event) { if (event.getsource().equals("__refreshall__")) { log.info("nacos配置中心配置修改 重启pulsar===================================="); log.info("重启pulsarclient,{}", applicationcontext.getbean("getpulsarclient")); log.info("重启pulsarconsumer,{}", applicationcontext.getbean("comment-publish-topic-consumer")); log.info("重启pulsarconsumer,{}", applicationcontext.getbean("reply-publish-topic-consumer")); } }}
以上就是 Spring Boot 怎么整合 Pulsar 的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product