应用场景——消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为"永久"或启用主题的日志压缩功能即可。
流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
下面看一下 Spring Boot 整合 Kafka 工具类的详细代码。
pom.xml
<dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-lang3</artifactid> <version>3.12.0</version> </dependency> <dependency> <groupid>org.apache.kafka</groupid> <artifactid>kafka-clients</artifactid> <version>2.6.3</version> </dependency> <dependency> <groupid>fastjson</groupid> <artifactid>fastjson</artifactid> <version>1.2.83</version> </dependency>
工具类
package com.bbl.demo.utils;import org.apache.commons.lang3.exception.exceptionutils;import org.apache.kafka.clients.admin.*;import org.apache.kafka.clients.consumer.consumerconfig;import org.apache.kafka.clients.consumer.consumerrecord;import org.apache.kafka.clients.consumer.consumerrecords;import org.apache.kafka.clients.consumer.kafkaconsumer;import org.apache.kafka.clients.producer.kafkaproducer;import org.apache.kafka.clients.producer.producerrecord;import org.apache.kafka.common.kafkafuture;import org.apache.kafka.common.errors.topicexistsexception;import org.apache.kafka.common.errors.unknowntopicorpartitionexception;import com.alibaba.fastjson.jsonobject;import java.time.duration;import java.util.*;import java.util.concurrent.executionexception;public class kafkautils { private static adminclient admin; /** * 私有静态方法,创建kafka生产者 * @author o * @return kafkaproducer */ private static kafkaproducer<string, string> createproducer() { properties props = new properties(); //声明kafka的地址 props.put(consumerconfig.bootstrap_servers_config,"node01:9092,node02:9092,node03:9092"); //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功 props.put("acks", "all"); //重试次数 props.put("retries", integer.max_value); //批处理的字节数 props.put("batch.size", 16384); //批处理的延迟时间,当批次数据未满之时等待的时间 props.put("linger.ms", 1); //用来约束kafkaproducer能够使用的内存缓冲的大小的,默认值32mb props.put("buffer.memory", 33554432); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.bytearrayserializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.bytearrayserializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); return new kafkaproducer<string, string>(props); } /** * 私有静态方法,创建kafka消费者 * @author o * @return kafkaconsumer */ private static kafkaconsumer<string, string> createconsumer() { properties props = new 
properties(); //声明kafka的地址 props.put(consumerconfig.bootstrap_servers_config,"node01:9092,node02:9092,node03:9092"); //每个消费者分配独立的消费者组编号 props.put("group.id", "111"); //如果value合法,则自动提交偏移量 props.put("enable.auto.commit", "true"); //设置多久一次更新被消费消息的偏移量 props.put("auto.commit.interval.ms", "1000"); //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息 props.put("session.timeout.ms", "30000"); //自动重置offset props.put("auto.offset.reset","earliest"); // properties.put("value.serializer", // "org.apache.kafka.common.serialization.bytearrayserializer"); // properties.put("key.serializer", // "org.apache.kafka.common.serialization.bytearrayserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); return new kafkaconsumer<string, string>(props); } /** * 私有静态方法,创建kafka集群管理员对象 * @author o */ public static void createadmin(string servers){ properties props = new properties(); props.put(adminclientconfig.bootstrap_servers_config,servers); admin = adminclient.create(props); } /** * 私有静态方法,创建kafka集群管理员对象 * @author o * @return adminclient */ private static void createadmin(){ createadmin("node01:9092,node02:9092,node03:9092"); } /** * 传入kafka约定的topic,json格式字符串,发送给kafka集群 * @author o * @param topic * @param jsonmessage */ public static void sendmessage(string topic, string jsonmessage) { kafkaproducer<string, string> producer = createproducer(); producer.send(new producerrecord<string, string>(topic, jsonmessage)); producer.close(); } /** * 传入kafka约定的topic消费数据,用于测试,数据最终会输出到控制台上 * @author o * @param topic */ public static void consume(string topic) { kafkaconsumer<string, string> consumer = createconsumer(); consumer.subscribe(arrays.aslist(topic)); while (true) { consumerrecords<string, string> records = consumer.poll(duration.ofseconds(100)); for (consumerrecord<string, string> record : records){ system.out.printf("offset = %d, key = %s, value = 
%s",record.offset(), record.key(), record.value()); system.out.println(); } } } /** * 传入kafka约定的topic数组,消费数据 * @author o * @param topics */ public static void consume(string ... topics) { kafkaconsumer<string, string> consumer = createconsumer(); consumer.subscribe(arrays.aslist(topics)); while (true) { consumerrecords<string, string> records = consumer.poll(duration.ofseconds(100)); for (consumerrecord<string, string> record : records){ system.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value()); system.out.println(); } } } /** * 传入kafka约定的topic,json格式字符串数组,发送给kafka集群 * 用于批量发送消息,性能较高。 * @author o * @param topic * @param jsonmessages * @throws interruptedexception */ public static void sendmessage(string topic, string... jsonmessages) throws interruptedexception { kafkaproducer<string, string> producer = createproducer(); for (string jsonmessage : jsonmessages) { producer.send(new producerrecord<string, string>(topic, jsonmessage)); } producer.close(); } /** * 传入kafka约定的topic,map集合,内部转为json发送给kafka集群 <br> * 用于批量发送消息,性能较高。 * @author o * @param topic * @param mapmessagetojsonforarray */ public static void sendmessage(string topic, list<map<object, object>> mapmessagetojsonforarray) { kafkaproducer<string, string> producer = createproducer(); for (map<object, object> mapmessagetojson : mapmessagetojsonforarray) { string array = jsonobject.tojson(mapmessagetojson).tostring(); producer.send(new producerrecord<string, string>(topic, array)); } producer.close(); } /** * 传入kafka约定的topic,map,内部转为json发送给kafka集群 * @author o * @param topic * @param mapmessagetojson */ public static void sendmessage(string topic, map<object, object> mapmessagetojson) { kafkaproducer<string, string> producer = createproducer(); string array = jsonobject.tojson(mapmessagetojson).tostring(); producer.send(new producerrecord<string, string>(topic, array)); producer.close(); } /** * 创建主题 * @author o * @param name 主题的名称 * @param numpartitions 主题的分区数 * @param 
replicationfactor 主题的每个分区的副本因子 */ public static void createtopic(string name,int numpartitions,int replicationfactor){ if(admin == null) { createadmin(); } map<string, string> configs = new hashmap<>(); createtopicsresult result = admin.createtopics(arrays.aslist(new newtopic(name, numpartitions, (short) replicationfactor).configs(configs))); //以下内容用于判断创建主题的结果 for (map.entry<string, kafkafuture<void>> entry : result.values().entryset()) { try { entry.getvalue().get(); system.out.println("topic "+entry.getkey()+" created"); } catch (interruptedexception | executionexception e) { if (exceptionutils.getrootcause(e) instanceof topicexistsexception) { system.out.println("topic "+entry.getkey()+" existed"); } } } } /** * 删除主题 * @author o * @param names 主题的名称 */ public static void deletetopic(string name,string ... names){ if(admin == null) { createadmin(); } map<string, string> configs = new hashmap<>(); collection<string> topics = arrays.aslist(names); topics.add(name); deletetopicsresult result = admin.deletetopics(topics); //以下内容用于判断删除主题的结果 for (map.entry<string, kafkafuture<void>> entry : result.values().entryset()) { try { entry.getvalue().get(); system.out.println("topic "+entry.getkey()+" deleted"); } catch (interruptedexception | executionexception e) { if (exceptionutils.getrootcause(e) instanceof unknowntopicorpartitionexception) { system.out.println("topic "+entry.getkey()+" not exist"); } } } } /** * 查看主题详情 * @author o * @param names 主题的名称 */ public static void describetopic(string name,string ... 
names){ if(admin == null) { createadmin(); } map<string, string> configs = new hashmap<>(); collection<string> topics = arrays.aslist(names); topics.add(name); describetopicsresult result = admin.describetopics(topics); //以下内容用于显示主题详情的结果 for (map.entry<string, kafkafuture<topicdescription>> entry : result.values().entryset()) { try { entry.getvalue().get(); system.out.println("topic "+entry.getkey()+" describe"); system.out.println("\t name: "+entry.getvalue().get().name()); system.out.println("\t partitions: "); entry.getvalue().get().partitions().stream().foreach(p-> { system.out.println("\t\t index: "+p.partition()); system.out.println("\t\t\t leader: "+p.leader()); system.out.println("\t\t\t replicas: "+p.replicas()); system.out.println("\t\t\t isr: "+p.isr()); }); system.out.println("\t internal: "+entry.getvalue().get().isinternal()); } catch (interruptedexception | executionexception e) { if (exceptionutils.getrootcause(e) instanceof unknowntopicorpartitionexception) { system.out.println("topic "+entry.getkey()+" not exist"); } } } } /** * 查看主题列表 * @author o * @return set<string> topiclist */ public static set<string> listtopic(){ if(admin == null) { createadmin(); } listtopicsresult result = admin.listtopics(); try { result.names().get().stream().map(x->x+"\t").foreach(system.out::print); return result.names().get(); } catch (interruptedexception | executionexception e) { e.printstacktrace(); return null; } } public static void main(string[] args) { system.out.println(listtopic()); }}
以上就是 Spring Boot 整合 Kafka 工具类的详细内容。
