Spring boot集成Kafka之spring-kafka深入探秘
前言
kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個(gè)項(xiàng)目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項(xiàng)目里快速集成kafka。除了簡單的收發(fā)消息外,Spring-kafka還提供了很多高級(jí)功能,下面我們就來一一探秘這些用法。
項(xiàng)目地址:https://github.com/spring-projects/spring-kafka
簡單集成
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>
添加配置
- spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
測(cè)試發(fā)送和接收
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate<Object, Object> template;
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- this.template.send("topic_input", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic_input")
- public void listen(String input) {
- logger.info("input value: {}" , input);
- }
- }
啟動(dòng)應(yīng)用后,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺(tái)看到有日志輸出了:input value: "kl"。基礎(chǔ)的使用就這么簡單。發(fā)送消息時(shí)注入一個(gè)KafkaTemplate,接收消息時(shí)添加一個(gè)@KafkaListener注解即可。
Spring-kafka-test嵌入式Kafka Server
不過上面的代碼能夠啟動(dòng)成功,前提是你已經(jīng)有了Kafka Server的服務(wù)環(huán)境,我們知道Kafka是由Scala + Zookeeper構(gòu)建的,可以從官網(wǎng)下載部署包在本地部署。但是,我想告訴你,為了簡化開發(fā)環(huán)節(jié)驗(yàn)證Kafka相關(guān)功能,Spring-Kafka-Test已經(jīng)封裝了Kafka-test提供了注解式的一鍵開啟Kafka Server的功能,使用起來也是超級(jí)簡單。本文后面的所有測(cè)試用例的Kafka都是使用這種嵌入式服務(wù)提供的。
引入依賴
- <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>2.2.6.RELEASE</version><scope>test</scope></dependency>
啟動(dòng)服務(wù)
下面使用Junit測(cè)試用例,直接啟動(dòng)一個(gè)Kafka Server服務(wù),包含四個(gè)Broker節(jié)點(diǎn)。
- @RunWith(SpringRunner.class)@SpringBootTest(classes = ApplicationTests.class)@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})public class ApplicationTests {@Testpublic void contextLoads()throws IOException { System.in.read(); }}
如上:只需要一個(gè)注解@EmbeddedKafka即可,就可以啟動(dòng)一個(gè)功能完整的Kafka服務(wù),是不是很酷。默認(rèn)只寫注解不加參數(shù)的情況下,是創(chuàng)建一個(gè)隨機(jī)端口的Broker,在啟動(dòng)的日志中會(huì)輸出具體的端口以及默認(rèn)的一些配置項(xiàng)。不過這些我們?cè)贙afka安裝包配置文件中的配置項(xiàng),在注解參數(shù)中都可以配置,下面詳解下@EmbeddedKafka注解中的可設(shè)置參數(shù) :
- value:broker節(jié)點(diǎn)數(shù)量
- count:同value作用一樣,也是配置的broker的節(jié)點(diǎn)數(shù)量
- controlledShutdown:控制關(guān)閉開關(guān),主要用來在Broker意外關(guān)閉時(shí)減少此Broker上Partition的不可用時(shí)間
Kafka是多Broker架構(gòu)的高可用服務(wù),一個(gè)Topic對(duì)應(yīng)多個(gè)partition,一個(gè)Partition可以有多個(gè)副本Replication,這些Replication副本保存在多個(gè)Broker,用于高可用。但是,雖然存在多個(gè)分區(qū)副本集,當(dāng)前工作副本集卻只有一個(gè),默認(rèn)就是首次分配的副本集【首選副本】為Leader,負(fù)責(zé)寫入和讀取數(shù)據(jù)。當(dāng)我們升級(jí)Broker或者更新Broker配置時(shí)需要重啟服務(wù),這個(gè)時(shí)候需要將partition轉(zhuǎn)移到可用的Broker。下面涉及到三種情況
- 直接關(guān)閉Broker:當(dāng)Broker關(guān)閉時(shí),Broker集群會(huì)重新進(jìn)行選主操作,選出一個(gè)新的Broker來作為Partition Leader,選舉時(shí)此Broker上的Partition會(huì)短時(shí)不可用
- 開啟controlledShutdown:當(dāng)Broker關(guān)閉時(shí),Broker本身會(huì)先嘗試將Leader角色轉(zhuǎn)移到其他可用的Broker上
- 使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動(dòng)觸發(fā)PartitionLeader角色轉(zhuǎn)移
- ports:端口列表,是一個(gè)數(shù)組。對(duì)應(yīng)了count參數(shù),有幾個(gè)Broker,就要對(duì)應(yīng)幾個(gè)端口號(hào)
- brokerProperties:Broker參數(shù)設(shè)置,是一個(gè)數(shù)組結(jié)構(gòu),支持如下方式進(jìn)行Broker參數(shù)設(shè)置:
- @EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
- kerPropertiesLocation:Broker參數(shù)文件設(shè)置
功能同上面的brokerProperties,只是Kafka Broker的可設(shè)置參數(shù)達(dá)182個(gè)之多,都像上面這樣配置肯定不是最優(yōu)方案,所以提供了加載本地配置文件的功能,如:
- @EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
默認(rèn)情況下,如果在使用KafkaTemplate發(fā)送消息時(shí),Topic不存在,會(huì)創(chuàng)建一個(gè)新的Topic,默認(rèn)的分區(qū)數(shù)和副本數(shù)為如下Broker參數(shù)來設(shè)定
創(chuàng)建新的Topic
- num.partitions = 1 #默認(rèn)Topic分區(qū)數(shù)
- num.replica.fetchers = 1 #默認(rèn)副本數(shù)
程序啟動(dòng)時(shí)創(chuàng)建Topic
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Configuration
- public class KafkaConfig {
- @Bean
- public KafkaAdmin admin(KafkaProperties properties){
- KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
- admin.setFatalIfBrokerNotAvailable(true);
- return admin;
- }
- @Bean
- public NewTopic topic2() {
- return new NewTopic("topic-kl", 1, (short) 1);
- }
- }
如果Kafka Broker支持(1.0.0或更高版本),則如果發(fā)現(xiàn)現(xiàn)有Topic的Partition 數(shù)少于設(shè)置的Partition 數(shù),則會(huì)新增新的Partition分區(qū)。關(guān)于KafkaAdmin有幾個(gè)常用的用法如下:
setFatalIfBrokerNotAvailable(true):默認(rèn)這個(gè)值是False的,在Broker不可用時(shí),不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業(yè)務(wù)需要顯示的將這個(gè)值設(shè)置為True
setAutoCreate(false) : 默認(rèn)值為True,也就是Kafka實(shí)例化后會(huì)自動(dòng)創(chuàng)建已經(jīng)實(shí)例化的NewTopic對(duì)象
initialize():當(dāng)setAutoCreate為false時(shí),需要我們程序顯示的調(diào)用admin的initialize()方法來初始化NewTopic對(duì)象
代碼邏輯中創(chuàng)建
有時(shí)候我們?cè)诔绦騿?dòng)時(shí)并不知道某個(gè)Topic需要多少Partition數(shù)合適,但是又不能一股腦的直接使用Broker的默認(rèn)設(shè)置,這個(gè)時(shí)候就需要使用Kafka-Client自帶的AdminClient來進(jìn)行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:
- @Autowired
- private KafkaProperties properties;
- @Test
- public void testCreateToipc(){
- AdminClient client = AdminClient.create(properties.buildAdminProperties());
- if(client !=null){
- try {
- Collection<NewTopic> newnewTopics = new ArrayList<>(1);
- newTopics.add(new NewTopic("topic-kl",1,(short) 1));
- client.createTopics(newTopics);
- }catch (Throwable e){
- e.printStackTrace();
- }finally {
- client.close();
- }
- }
- }
ps:其他的方式創(chuàng)建Topic
上面的這些創(chuàng)建Topic方式前提是你的spring boot版本到2.x以上了,因?yàn)閟pring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補(bǔ)充一種在程序中通過Kafka_2.10創(chuàng)建Topic的方式
引入依賴
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.2</version>
- </dependency>
api方式創(chuàng)建
- @Test
- public void testCreateTopic()throws Exception{
- ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
- String topicName = "topic-kl";
- int partitions = 1;
- int replication = 1;
- AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
- }
注意下ZkClient最后一個(gè)構(gòu)造入?yún)?,是一個(gè)序列化反序列化的接口實(shí)現(xiàn),博主測(cè)試如果不填的話,創(chuàng)建的Topic在ZK上的數(shù)據(jù)是有問題的,默認(rèn)的Kafka實(shí)現(xiàn)也很簡單,就是做了字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經(jīng)實(shí)現(xiàn)好的一個(gè)接口實(shí)例,是一個(gè)Scala的伴生對(duì)象,在Java中直接調(diào)用點(diǎn)MODULE$就可以得到一個(gè)實(shí)例
命令方式創(chuàng)建
- @Test
- public void testCreateTopic(){
- String [] options= new String[]{
- "--create",
- "--zookeeper","127.0.0.1:2181",
- "--replication-factor", "3",
- "--partitions", "3",
- "--topic", "topic-kl"
- };
- TopicCommand.main(options);
- }
消息發(fā)送之KafkaTemplate探秘
獲取發(fā)送結(jié)果
異步獲取
- template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
- @Override
- public void onFailure(Throwable throwable) {
- ......
- }
- @Override
- public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
- ....
- }
- });
同步獲取
- ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
- try {
- SendResult<Object,Object> result = future.get();
- }catch (Throwable e){
- e.printStackTrace();
- }
kafka事務(wù)消息
默認(rèn)情況下,Spring-kafka自動(dòng)生成的KafkaTemplate實(shí)例,是不具有事務(wù)消息發(fā)送能力的。需要使用如下配置激活事務(wù)特性。事務(wù)激活后,所有的消息發(fā)送只能在發(fā)生事務(wù)的方法內(nèi)執(zhí)行了,不然就會(huì)拋一個(gè)沒有事務(wù)交易的異常
- spring.kafka.producer.transaction-id-prefix=kafka_tx.
當(dāng)發(fā)送消息有事務(wù)要求時(shí),比如,當(dāng)所有消息發(fā)送成功才算成功,如下面的例子:假設(shè)第一條消費(fèi)發(fā)送后,在發(fā)第二條消息前出現(xiàn)了異常,那么第一條已經(jīng)發(fā)送的消息也會(huì)回滾。而且正常情況下,假設(shè)在消息一發(fā)送后休眠一段時(shí)間,在發(fā)送第二條消息,消費(fèi)端也只有在事務(wù)方法執(zhí)行完成后才會(huì)接收到消息
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.executeInTransaction(t ->{
- t.send("topic_input","kl");
- if("error".equals(input)){
- throw new RuntimeException("failed");
- }
- t.send("topic_input","ckl");
- return true;
- });
- }
當(dāng)事務(wù)特性激活時(shí),同樣,在方法上面加@Transactional注解也會(huì)生效
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) {
- template.send("topic_input", "kl");
- if ("error".equals(input)) {
- throw new RuntimeException("failed");
- }
- template.send("topic_input", "ckl");
- }
Spring-Kafka的事務(wù)消息是基于Kafka提供的事務(wù)消息功能的。而Kafka Broker默認(rèn)的配置針對(duì)的三個(gè)或以上Broker高可用服務(wù)而設(shè)置的。這邊在測(cè)試的時(shí)候?yàn)榱撕唵畏奖?,使用了嵌入式服?wù)新建了一個(gè)單Broker的Kafka服務(wù),出現(xiàn)了一些問題:如
1、事務(wù)日志副本集大于Broker數(shù)量,會(huì)拋如下異常:
- Number of alive brokers '1' does not meet the required replication factor '3'
- for the transactions state topic (configured via 'transaction.state.log.replication.factor').
- This error can be ignored if the cluster is starting up and not all brokers are up yet.
默認(rèn)Broker的配置transaction.state.log.replication.factor=3,單節(jié)點(diǎn)只能調(diào)整為1
2、副本數(shù)小于副本同步隊(duì)列數(shù)目,會(huì)拋如下異常
- Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
默認(rèn)Broker的配置transaction.state.log.min.isr=2,單節(jié)點(diǎn)只能調(diào)整為1
ReplyingKafkaTemplate獲得消息回復(fù)
ReplyingKafkaTemplate是KafkaTemplate的一個(gè)子類,除了繼承父類的方法,新增了一個(gè)方法sendAndReceive,實(shí)現(xiàn)了消息發(fā)送\回復(fù)語義
- RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
也就是我發(fā)送一條消息,能夠拿到消費(fèi)者給我返回的結(jié)果。就像傳統(tǒng)的RPC交互那樣。當(dāng)消息的發(fā)送者需要知道消息消費(fèi)者的具體的消費(fèi)情況,非常適合這個(gè)api。如,一條消息中發(fā)送一批數(shù)據(jù),需要知道消費(fèi)者成功處理了哪些數(shù)據(jù)。下面代碼演示了怎么集成以及使用ReplyingKafkaTemplate
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Bean
- public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
- ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
- repliesContainer.getContainerProperties().setGroupId("repliesGroup");
- repliesContainer.setAutoStartup(false);
- return repliesContainer;
- }
- @Bean
- public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
- return new ReplyingKafkaTemplate(pf, repliesContainer);
- }
- @Bean
- public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
- return new KafkaTemplate(pf);
- }
- @Autowired
- private ReplyingKafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
- ConsumerRecord<String, String> consumerRecord = replyFuture.get();
- System.err.println("Return value: " + consumerRecord.value());
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
Spring-kafka消息消費(fèi)用法探秘
@KafkaListener的使用
前面在簡單集成中已經(jīng)演示過了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場(chǎng)景比較多的功能點(diǎn)如下:
- 顯示的指定消費(fèi)哪些Topic和分區(qū)的消息,
- 設(shè)置每個(gè)Topic以及分區(qū)初始化的偏移量,
- 設(shè)置消費(fèi)線程并發(fā)度
- 設(shè)置消息異常處理器
- @KafkaListener(id = "webGroup", topicPartitions = {
- @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
- @TopicPartition(topic = "topic2", partitions = "0",
- partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
- },concurrency = "6",errorHandler = "myErrorHandler")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
其他的注解參數(shù)都很好理解,errorHandler需要說明下,設(shè)置這個(gè)參數(shù)需要實(shí)現(xiàn)一個(gè)接口KafkaListenerErrorHandler。而且注解里的配置,是你自定義實(shí)現(xiàn)實(shí)例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應(yīng)該存在這樣一個(gè)實(shí)例:
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/31
- */
- @Service("myErrorHandler")
- public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
- Logger logger =LoggerFactory.getLogger(getClass());
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
- logger.info(message.getPayload().toString());
- return null;
- }
- @Override
- public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
- logger.info(message.getPayload().toString());
- return null;
- }
- }
手動(dòng)Ack模式
手動(dòng)ACK模式,由業(yè)務(wù)邏輯控制提交偏移量。比如程序在消費(fèi)時(shí),有這種語義,特別異常情況下不確認(rèn)ack,也就是不提交偏移量,那么你只能使用手動(dòng)Ack模式來做了。開啟手動(dòng)首先需要關(guān)閉自動(dòng)提交,然后設(shè)置下consumer的消費(fèi)模式
- spring.kafka.consumer.enable-auto-commit=false
- spring.kafka.listener.ack-mode=manual
上面的設(shè)置好后,在消費(fèi)時(shí),只需要在@KafkaListener監(jiān)聽方法的入?yún)⒓尤階cknowledgment 即可,執(zhí)行到ack.acknowledge()代表提交了偏移量
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input, Acknowledgment ack) {
- logger.info("input value: {}", input);
- if ("kl".equals(input)) {
- ack.acknowledge();
- }
- return "successful";
- }
@KafkaListener注解監(jiān)聽器生命周期
@KafkaListener注解的監(jiān)聽器的生命周期是可以控制的,默認(rèn)情況下,@KafkaListener的參數(shù)autoStartup = "true"。也就是自動(dòng)啟動(dòng)消費(fèi),但是也可以同過KafkaListenerEndpointRegistry來干預(yù)他的生命周期。KafkaListenerEndpointRegistry有三個(gè)動(dòng)作方法分別如:start(),pause(),resume()/啟動(dòng),停止,繼續(xù)。如下代碼詳細(xì)演示了這種功能。
- /**
- * @author: kl @kailing.pub
- * @date: 2019/5/30
- */
- @SpringBootApplication
- @RestController
- public class Application {
- private final Logger logger = LoggerFactory.getLogger(Application.class);
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- @Autowired
- private KafkaTemplate template;
- @GetMapping("/send/{input}")
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendFoo(@PathVariable String input) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
- template.send(record);
- }
- @Autowired
- private KafkaListenerEndpointRegistry registry;
- @GetMapping("/stop/{listenerID}")
- public void stop(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).pause();
- }
- @GetMapping("/resume/{listenerID}")
- public void resume(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).resume();
- }
- @GetMapping("/start/{listenerID}")
- public void start(@PathVariable String listenerID){
- registry.getListenerContainer(listenerID).start();
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return "successful";
- }
- }
在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項(xiàng)目啟動(dòng)好后,分別執(zhí)行如下url,就可以看到效果了。
先發(fā)送一條消息:http://localhost:8081/send/ckl。因?yàn)閍utoStartup = "false",所以并不會(huì)看到有消息進(jìn)入監(jiān)聽器。
接著啟動(dòng)監(jiān)聽器:http://localhost:8081/start/webGroup??梢钥吹接幸粭l消息進(jìn)來了。
暫停和繼續(xù)消費(fèi)的效果使用類似方法就可以測(cè)試出來了。
SendTo消息轉(zhuǎn)發(fā)
前面的消息發(fā)送響應(yīng)應(yīng)用里面已經(jīng)見過@SendTo,其實(shí)除了做發(fā)送響應(yīng)語義外,@SendTo注解還可以帶一個(gè)參數(shù),指定轉(zhuǎn)發(fā)的Topic隊(duì)列。常見的場(chǎng)景如,一個(gè)消息需要做多重加工,不同的加工耗費(fèi)的cup等資源不一致,那么就可以通過跨不同Topic和部署在不同主機(jī)上的consumer來解決了。如:
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- @SendTo("topic-ckl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- return input + "hello!";
- }
- @KafkaListener(id = "webGroup1", topics = "topic-ckl")
- public void listen2(String input) {
- logger.info("input value: {}", input);
- }
消息重試和死信隊(duì)列的應(yīng)用
除了上面談到的通過手動(dòng)Ack模式來控制消息偏移量外,其實(shí)Spring-kafka內(nèi)部還封裝了可重試消費(fèi)消息的語義,也就是可以設(shè)置為當(dāng)消費(fèi)數(shù)據(jù)出現(xiàn)異常時(shí),重試這個(gè)消息。而且可以設(shè)置重試達(dá)到多少次后,讓消息進(jìn)入預(yù)定好的Topic。也就是死信隊(duì)列里。下面代碼演示了這種效果:
- @Autowired
- private KafkaTemplate template;
- @Bean
- public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
- ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
- ConsumerFactory<Object, Object> kafkaConsumerFactory,
- KafkaTemplate<Object, Object> template) {
- ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
- configurer.configure(factory, kafkaConsumerFactory);
- //最大重試三次
- factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
- return factory;
- }
- @GetMapping("/send/{input}")
- public void sendFoo(@PathVariable String input) {
- template.send("topic-kl", input);
- }
- @KafkaListener(id = "webGroup", topics = "topic-kl")
- public String listen(String input) {
- logger.info("input value: {}", input);
- throw new RuntimeException("dlt");
- }
- @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
- public void dltListen(String input) {
- logger.info("Received from DLT: " + input);
- }
上面應(yīng)用,在topic-kl監(jiān)聽到消息會(huì),會(huì)觸發(fā)運(yùn)行時(shí)異常,然后監(jiān)聽器會(huì)嘗試三次調(diào)用,當(dāng)?shù)竭_(dá)最大的重試次數(shù)后。消息就會(huì)被丟掉重試死信隊(duì)列里面去。死信隊(duì)列的Topic的規(guī)則是,業(yè)務(wù)Topic名字+“.DLT”。如上面業(yè)務(wù)Topic的name為“topic-kl”,那么對(duì)應(yīng)的死信隊(duì)列的Topic就是“topic-kl.DLT”
文末結(jié)語
最近業(yè)務(wù)上使用了kafka用到了Spring-kafka,所以系統(tǒng)性的探索了下Spring-kafka的各種用法,發(fā)現(xiàn)了很多好玩很酷的特性,比如,一個(gè)注解開啟嵌入式的Kafka服務(wù)、像RPC調(diào)用一樣的發(fā)送\響應(yīng)語義調(diào)用、事務(wù)消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點(diǎn)坑。