自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

簡單易用的消息隊列框架的設(shè)計與實現(xiàn)

開發(fā) 開發(fā)工具
消息隊列在互聯(lián)網(wǎng)領(lǐng)域里得到了廣泛的應(yīng)用,它多應(yīng)用在異步處理、模塊之間的解偶和高并發(fā)的消峰等場景,消息隊列中表現(xiàn)最好的當(dāng)屬Apache開源項目Kafka,Kafka使用支持高并發(fā)的Scala語言開發(fā),利用操作系統(tǒng)的緩存原理達到高性能,并且天生具有可分區(qū),分布式的特點,而且有不同語言的客戶端,使用起來非常的方便。

[[189769]]

1 背景介紹

消息隊列在互聯(lián)網(wǎng)領(lǐng)域里得到了廣泛的應(yīng)用,它多應(yīng)用在異步處理、模塊之間的解偶和高并發(fā)的消峰等場景,消息隊列中表現(xiàn)最好的當(dāng)屬Apache開源項目Kafka,Kafka使用支持高并發(fā)的Scala語言開發(fā),利用操作系統(tǒng)的緩存原理達到高性能,并且天生具有可分區(qū),分布式的特點,而且有不同語言的客戶端,使用起來非常的方便。

Kclient是Kafka生產(chǎn)者客戶端和消費者客戶端的一個簡單易用的框架,它具有高效集成、高性能、高穩(wěn)定的高級特點。

在繼續(xù)閱讀kclient的功能特性、架構(gòu)設(shè)計和使用方法之前,讀者需要對Kafka進行基本的學(xué)習(xí)和了解。如果讀者是Kafka的初學(xué)者,而且英文還不錯,也可以直接參考Kafka官方在線文檔:Kafka 0.8.2 Documentation,如果對英文不感性趣,可以在網(wǎng)上搜索Kakfa的中文資料進行學(xué)習(xí),內(nèi)容需要涵蓋Kafka的使用向?qū)б约袄貌僮飨到y(tǒng)緩存的“空中接力”、持久化、分片機制、高可用等原理。

本文包含了背景介紹、功能特性、架構(gòu)設(shè)計、使用指南、API簡介、后臺監(jiān)控和管理、消息處理機模板項目、以及性能壓測相關(guān)章節(jié)。如果你想使用kclient快速的構(gòu)建Kafka處理機服務(wù),請參考消息處理機模板項目章節(jié); 如果你想了解kclient的其他使用方式、功能特性、監(jiān)控和管理等,請參考功能特性、使用指南、API簡介、后臺監(jiān)控和管理等章節(jié); 如果你想更深入的理解kclient的架構(gòu)設(shè)計和性能指標(biāo),請參考架構(gòu)設(shè)計和性能壓測章節(jié)。

2 功能特性

2.1 簡單易用

簡化了Kafka客戶端API的使用方法, 特別是對消費端開發(fā),消費端開發(fā)者只需要實現(xiàn)MessageHandler接口或者相關(guān)子類,在實現(xiàn)中處理消息完成業(yè)務(wù)邏輯,并且在主線程中啟動封裝的消費端服務(wù)器即可。它提供了各種常用的MessageHandler,框架自動轉(zhuǎn)換消息到領(lǐng)域?qū)ο竽P突蛘逬SON對象等數(shù)據(jù)結(jié)構(gòu),讓開發(fā)者更專注于業(yè)務(wù)處理。如果使用服務(wù)源碼注解的方式聲明消息處理機的后臺,可以將一個通用的服務(wù)方法直接轉(zhuǎn)變成具有完善功能的處理Kafka消息隊列的處理機,使用起來極其簡單,代碼看起來一目了然,在框架級別通過多種線程池技術(shù)保證了處理機的高性能。

在使用方面,它提供了多種使用方式:

  1. 直接使用Java API;
  2. 與Spring環(huán)境無縫集成;
  3. 服務(wù)源碼注解,通過注解聲明方式啟動Kafka消息隊列的處理機。

除此之外,它基于注解提供了消息處理機的模板項目,可以根據(jù)模板項目通過配置快速開發(fā)Kafka的消息處理機。

2.2 高性能

為了在不同的業(yè)務(wù)場景下實現(xiàn)高性能, 它提供不同的線程模型:

適合輕量級服務(wù)的同步線程模型;

適合IO密集型服務(wù)的異步線程模型(細(xì)分為所有消費者流共享線程池和每個流獨享線程池)。

另外,異步模型中的線程池也支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池。

2.3 高穩(wěn)定性

框架級別處理了常見的異常,計入錯誤日志,可用于錯誤手工恢復(fù)或者洗數(shù)據(jù),并實現(xiàn)了優(yōu)雅關(guān)機和重啟等功能。

3 架構(gòu)設(shè)計

3.1 線程模型

1. 同步線程模型

在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負(fù)責(zé)從Kafka隊列里消費消息,并且在同一個線程里進行業(yè)務(wù)處理。我們把這些線程稱為消費線程,把這些線程所在的線程池叫做消息消費線程池。這種模型之所以在消息消費線程池處理業(yè)務(wù),是因為它多用于處理輕量級別的業(yè)務(wù),例如:緩存查詢、本地計算等。

2. 異步線程模型

在這種線程模型中,客戶端為每一個消費者流使用一個線程,每個線程負(fù)責(zé)從Kafka隊列里消費消息,并且傳遞消費得到的消息到后端的異步線程池,在異步線程池中處理業(yè)務(wù)。我們?nèi)匀话亚懊尕?fù)責(zé)消費消息的線程池稱為消息消費線程池,把后面的異步線程池稱為異步業(yè)務(wù)線程池。這種線程模型適合重量級的業(yè)務(wù),例如:業(yè)務(wù)中有大量的IO操作、網(wǎng)絡(luò)IO操作、復(fù)雜計算、對外部系統(tǒng)的調(diào)用等。

后端的異步業(yè)務(wù)線程池又細(xì)分為所有消費者流共享線程池和每個流獨享線程池。

1)所有消費者流共享線程池

所有消費者流共享線程池對比每個流獨享線程池,創(chuàng)建更少的線程池對象,能節(jié)省些許的內(nèi)存,但是,由于多個流共享同一個線程池,在數(shù)據(jù)量較大的時候,流之間的處理可能互相影響。例如,一個業(yè)務(wù)使用2個區(qū)和兩個流,他們一一對應(yīng),通過生產(chǎn)者指定定制化的散列函數(shù)替換默認(rèn)的key-hash, 實現(xiàn)一個流(區(qū))用來處理普通用戶,另外一個流(區(qū))用來處理VIP用戶,如果兩個流共享一個線程池,當(dāng)普通用戶的消息大量產(chǎn)生的時候,VIP用戶只有少量,并且排在了隊列的后頭,就會產(chǎn)生餓死的情況。這個場景是可以使用多個topic來解決,一個普通用戶的topic,一個VIP用戶的topic,但是這樣又要多維護一個topic,客戶端發(fā)送的時候需要顯式的進行判斷topic目標(biāo),也沒有多少好處。

2)每個流獨享線程池

每個流獨享線程池使用不同的異步業(yè)務(wù)線程池來處理不同的流里面的消息,互相隔離,互相獨立,不互相影響,對于不同的流(區(qū))的優(yōu)先級不同的情況,或者消息在不同流(區(qū))不均衡的情況下表現(xiàn)會更好,當(dāng)然,創(chuàng)建多個線程池會多使用些許內(nèi)存,但是這并不是一個大問題。

另外,異步業(yè)務(wù)線程池支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池。

  1. 核心業(yè)務(wù)硬件資源有保證,核心服務(wù)有專享的資源池,或者線上流量可預(yù)測,請使用固定數(shù)量的線程池。
  2. 非核心業(yè)務(wù)一般混布,資源互相調(diào)配,線上流量不固定等情況請使用線程數(shù)量可伸縮的線程池。

3.2 異常處理

對于消息處理過程中產(chǎn)生的業(yè)務(wù)異常,當(dāng)前在業(yè)務(wù)處理的上層捕捉了Throwable, 在專用的錯誤恢復(fù)日志中記錄出錯的消息,后續(xù)可根據(jù)錯誤恢復(fù)日志人工處理錯誤消息,也可重做或者洗數(shù)據(jù)。TODO:考慮實現(xiàn)異常Listener體系結(jié)構(gòu), 對異常處理實現(xiàn)監(jiān)聽者模式,異常處理器可插拔等,默認(rèn)打印錯誤日志。

由于默認(rèn)的異常處理中,捕捉異常,在專用的錯誤回復(fù)日志中記錄錯誤,并且繼續(xù)處理下一個消息??紤]到可能上線失敗,或者上游消息格式出錯,導(dǎo)致所有消息處理都出錯,堆滿錯誤恢復(fù)日志的情況,我們需要訴諸于報警和監(jiān)控系統(tǒng)來解決。

3.3 優(yōu)雅關(guān)機

由于消費者本身是一個事件驅(qū)動的服務(wù)器,類似Tomcat,Tomcat接收HTTP請求返回HTTP響應(yīng),Consumer則接收Kafka消息,然后處理業(yè)務(wù)后返回,也可以將處理結(jié)果發(fā)送到下一個消息隊列。所以消費者本身是非常復(fù)雜的,除了線程模型,異常處理,性能,穩(wěn)定性,可用性等都是需要思考點。既然消費者是一個后臺的服務(wù)器,我們需要考慮如何優(yōu)雅的關(guān)機,也就是在消費者服務(wù)器在處理消息的時候,如果關(guān)機才能不導(dǎo)致處理的消息中斷而丟失。

優(yōu)雅關(guān)機的重點在于解決如下3個問題:

  1. 如何知道JVM要退出?
  2. 如何阻止Daemon的線程在JVM退出被殺掉而導(dǎo)致消息丟失?
  3. 如果Worker線程在阻塞,如何喚起并退出?

第一個問題:如果一個后臺程序運行在控制臺的前臺,通過Ctrl + C可以發(fā)送退出信號給JVM,也可以通過kill -2 PS_IS 或者 kill -15 PS_IS發(fā)送退出信號,但是不能發(fā)送kill -9 PS_IS, 否則進程會無條件強制退出。JVM收到退出信號后,會調(diào)用注冊的鉤子,我們通過的注冊的JVM退出鉤子進行優(yōu)雅關(guān)機。

第二個問題:線程分為Daemon線程和非Daemon線程,一個線程默認(rèn)繼承父線程的Daemon屬性,如果當(dāng)前線程池是由Daemon線程創(chuàng)建的,則Worker線程是Daemon線程。如果Worker線程是Daemon線程,我們需要在JVM退出鉤子中等待Worker線程完成當(dāng)前手頭處理的消息,再退出JVM。如果不是Daemon線程,即使JVM收到退出信號,也得等待Worker線程退出后再退出,不會丟掉正在處理的消息。

第三個問題:在Worker線程從Kafka服務(wù)器消費消息的時候,Worker線程可能處于阻塞,這時需要中斷線程以退出,沒有消息被丟掉。在Worker線程處理業(yè)務(wù)時有可能有阻塞,例如:IO,網(wǎng)絡(luò)IO,在指定退出時間內(nèi)沒有完成,我們也需要中斷線程退出,這時會產(chǎn)生一個InterruptedException, 在異常處理的默認(rèn)處理器中被捕捉,并寫入錯誤日志,Worker線程隨后退出。

4 使用指南

kclient提供了三種使用方法,對于每一種方法,按照下面的步驟可快速構(gòu)建Kafka生產(chǎn)者和消費者程序。

4.1 前置步驟

1.下載源代碼后在項目根目錄執(zhí)行如下命令安裝打包文件到你的Maven本地庫。

  1. mvn install 

2.在你的項目pom.xml文件中添加對kclient的依賴。

  1. <dependency> 
  2.     <groupId>com.robert.kafka</groupId> 
  3.     <artifactId>kclient-core</artifactId> 
  4.     <version>0.0.1</version> 
  5. </dependency> 

3.根據(jù)Kafka官方文檔搭建Kafka環(huán)境,并創(chuàng)建兩個Topic, test1和test2。

4.然后,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的項目類路徑下,通常是src/main/resources目錄。

4.2 Java API

Java API提供了最直接,最簡單的使用kclient的方法。

構(gòu)建Producer示例:

  1. KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties""test"); 
  2.  
  3. for (int i = 0; i < 10; i++) { 
  4.     Dog dog = new Dog(); 
  5.     dog.setName("Yours " + i); 
  6.     dog.setId(i); 
  7.     kafkaProducer.sendBean2Topic("test", dog); 
  8.  
  9.     System.out.format("Sending dog: %d \n", i + 1); 
  10.  
  11.     Thread.sleep(100); 

構(gòu)建Consumer示例:

  1. DogHandler mbe = new DogHandler(); 
  2.  
  3. KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties""test", 1, mbe); 
  4. try { 
  5.     kafkaConsumer.startup(); 
  6.  
  7.     try { 
  8.         System.in.read(); 
  9.     } catch (IOException e) { 
  10.         e.printStackTrace(); 
  11.     } 
  12. } finally { 
  13.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 

4.3 Spring環(huán)境集成

kclient可以與Spring環(huán)境無縫集成,你可以像使用Spring Bean一樣來使用KafkaProducer和KafkaConsumer。

構(gòu)建Producer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml"); 
  2.  
  3. KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer"); 
  4.  
  5. for (int i = 0; i < 10; i++) { 
  6.     Dog dog = new Dog(); 
  7.     dog.setName("Yours " + i); 
  8.     dog.setId(i); 
  9.     kafkaProducer.send2Topic("test", JSON.toJSONString(dog)); 
  10.  
  11.     System.out.format("Sending dog: %d \n", i + 1); 
  12.  
  13.     Thread.sleep(100); 
  1. <bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init"
  2.     <property name="propertiesFile" value="kafka-producer.properties"/> 
  3.     <property name="defaultTopic" value="test"/> 
  4. </bean> 

構(gòu)建Consumer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext( 
  2.         "kafka-consumer.xml"); 
  3.  
  4. KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer"); 
  5. try { 
  6.     kafkaConsumer.startup(); 
  7.  
  8.     try { 
  9.         System.in.read(); 
  10.     } catch (IOException e) { 
  11.         e.printStackTrace(); 
  12.     } 
  13. } finally { 
  14.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 
  1. <bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" /> 
  2.  
  3. <bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init"
  4.     <property name="propertiesFile" value="kafka-consumer.properties" /> 
  5.     <property name="topic" value="test" /> 
  6.     <property name="streamNum" value="1" /> 
  7.     <property name="handler" ref="dogHandler" /> 
  8. </bean> 

4.4 服務(wù)源碼注解

kclient提供了類似Spring聲明式的編程方法,使用注解聲明Kafka處理器方法,所有的線程模型、異常處理、服務(wù)啟動和關(guān)閉等都由后臺服務(wù)自動完成,極大程度的簡化了API的使用方法,提高了開發(fā)者的工作效率。

注解聲明Kafka消息處理器:

  1. @KafkaHandlers 
  2. public class AnnotatedDogHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

注解啟動程序:

  1. public static void main(String[] args) { 
  2.     ApplicationContext ac = new ClassPathXmlApplicationContext( 
  3.             "annotated-kafka-consumer.xml"); 
  4.  
  5.     try { 
  6.         System.in.read(); 
  7.     } catch (IOException e) { 
  8.         e.printStackTrace(); 
  9.     } 

注解Spring環(huán)境配置:

  1. <bean name="kclientBoot" class="com.robert.kafka.kclient.boot.kclientBoot" init-method="init"/> 
  2.  
  3. <context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" /> 

5 API簡介

5.1 Producer API

KafkaProducer類提供了豐富的API來發(fā)送不同類型的消息,它支持發(fā)送字符串消息,發(fā)送一個普通的Bean,以及發(fā)送JSON對象等。在這些API中可以指定發(fā)送到某個Topic,也可以不指定而使用默認(rèn)的Topic。對于發(fā)送的數(shù)據(jù),支持帶Key值的消息和不帶Key值的消息。

發(fā)送字符串消息:

  1. public void send(String message); 
  2. public void send2Topic(String topicName, String message);  
  3. public void send(String key, String message);  
  4. public void send2Topic(String topicName, String key, String message);  
  5. public void send(Collection<String> messages);  
  6. public void send2Topic(String topicName, Collection<String> messages);  
  7. public void send(Map<String, String> messages);  
  8. public void send2Topic(String topicName, Map<String, String> messages); 

發(fā)送Bean消息:

  1. public <T> void sendBean(T bean);  
  2. public <T> void sendBean2Topic(String topicName, T bean);  
  3. public <T> void sendBean(String key, T bean);  
  4. public <T> void sendBean2Topic(String topicName, String key, T bean);  
  5. public <T> void sendBeans(Collection<T> beans);  
  6. public <T> void sendBeans2Topic(String topicName, Collection<T> beans);  
  7. public <T> void sendBeans(Map<String, T> beans);  
  8. public <T> void sendBeans2Topic(String topicName, Map<String, T> beans); 

發(fā)送JSON對象消息:

  1. public void sendObject(JSONObject jsonObject);  
  2. public void sendObject2Topic(String topicName, JSONObject jsonObject);  
  3. public void sendObject(String key, JSONObject jsonObject);  
  4. public void sendObject2Topic(String topicName, String key, JSONObject jsonObject);  
  5. public void sendObjects(JSONArray jsonArray);  
  6. public void sendObjects2Topic(String topicName, JSONArray jsonArray);  
  7. public void sendObjects(Map<String, JSONObject> jsonObjects);  
  8. public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects); 

5.2 Consumer API

KafkaConsumer類提供了豐富的構(gòu)造函數(shù)用來指定Kafka消費者服務(wù)器的各項參數(shù),包括線程池策略,線程池類型,流數(shù)量等等。

使用PROPERTIES文件初始化:

  1. public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

使用PROPERTIES對象初始化:

  1. public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

5.3 消息處理器

消息處理器結(jié)構(gòu)提供了一個基本接口,并且提供了不同的抽象類實現(xiàn)不同層次的功能,讓功能得到最大化的重用,并且互相解偶,開發(fā)者可以根據(jù)需求選擇某一個抽象類來繼承和使用。

接口定義:

  1. public interface MessageHandler { 
  2.     public void execute(String message); 

安全處理異常抽象類:

  1. public abstract class SafelyMessageHandler implements MessageHandler { 
  2.     public void execute(String message) { 
  3.         try { 
  4.             doExecute(message); 
  5.         } catch (Throwable t) { 
  6.             handleException(t, message); 
  7.         } 
  8.     } 
  9.  
  10.     protected void handleException(Throwable t, String message) { 
  11.         for (ExceptionHandler excepHandler : excepHandlers) { 
  12.             if (t.getClass() == IllegalStateException.class 
  13.                     && t.getCause() != null 
  14.                     && t.getCause().getClass() == InvocationTargetException.class 
  15.                     && t.getCause().getCause() != null
  16.                 t = t.getCause().getCause(); 
  17.  
  18.             if (excepHandler.support(t)) { 
  19.                 try { 
  20.                     excepHandler.handle(t, message); 
  21.                 } catch (Exception e) { 
  22.                     log.error( 
  23.                             "Exception hanppens when the handler {} is handling the exception {} and the message {}. Please check if the exception handler is configured properly."
  24.                             excepHandler.getClass(), t.getClass(), message); 
  25.                     log.error( 
  26.                             "The stack of the new exception on exception is, "
  27.                             e); 
  28.                 } 
  29.             } 
  30.         } 
  31.     } 
  32.  
  33. protected abstract void doExecute(String message); 

面向類型的抽象類:

  1. public abstract class BeanMessageHandler<T> extends SafelyMessageHandler {...} 
  2. public abstract class BeansMessageHandler<T> extends SafelyMessageHandler {...} 
  3. public abstract class DocumentMessageHandler extends SafelyMessageHandler {...} 
  4. public abstract class ObjectMessageHandler extends SafelyMessageHandler {...} 
  5. public abstract class ObjectsMessageHandler extends SafelyMessageHandler {...} 

5.4 消息處理器注解

正如上面使用指南第三部分服務(wù)源碼注解所講述的那樣,kclient可以通過注解來聲明Kafka消息處理器,kclient提供了@KafkaHandlers、@InputConsumer、@OutputProducer和@ErrorHandler等注解。

@KafkaHandlers:

  1. @Target({ ElementType.TYPE }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. @Component 
  5. public @interface KafkaHandlers { 

@InputConsumer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface InputConsumer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String topic() default ""
  8.  
  9.     int streamNum() default 1; 
  10.  
  11.     int fixedThreadNum() default 0; 
  12.  
  13.     int minThreadNum() default 0; 
  14.  
  15.     int maxThreadNum() default 0; 

@OutputProducer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface OutputProducer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String defaultTopic() default ""

@ErrorHandler:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface ErrorHandler { 
  5.     Class<? extends Throwable> exception() default Throwable.class; 
  6.  
  7.     String topic() default ""

6 消息處理機模板項目

6.1 快速開發(fā)向?qū)?/strong>

通過下面步驟可以快速開發(fā)Kafka處理機服務(wù)。

1.從本項目下載kclient-processor項目模板,并且根據(jù)業(yè)務(wù)需要修改pom.xml后導(dǎo)入Eclipse。

2.根據(jù)業(yè)務(wù)需要更改com.robert.kclient.app.handler.AnimalsHandler類名稱,并且根據(jù)業(yè)務(wù)需要修改處理器的注解。這里,可以導(dǎo)入業(yè)務(wù)服務(wù)對消息進行處理。

  1. @KafkaHandlers 
  2. public class AnimalsHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

3.通過mvn package既可以打包包含Spring Boot功能的自啟動jar包。

4.通過java -jar kclient-processor.jar即可啟動服務(wù)。

6.2 后臺監(jiān)控和管理

kclient模板項目提供了后臺管理接口來監(jiān)控和管理消息處理服務(wù)。

1.歡迎詞 - 用來校驗服務(wù)是否啟動成功。

curl http://localhost:8080/

2.服務(wù)狀態(tài) - 顯示處理器數(shù)量。

curl http://localhost:8080/status

3.重啟服務(wù) - 重新啟動服務(wù)。

curl http://localhost:8080/restart

7 性能壓測

Benchmark應(yīng)該覆蓋推送QPS、接收處理QPS以及單線程、多線程生產(chǎn)者的用例。

用例1: 輕量級服務(wù)同步線程模型和異步線程模型的性能對比。

用例2: 重量級服務(wù)同步線程模型和異步線程模型的性能對比。

用例3: 重量級服務(wù)異步線程模型中所有消費者流共享線程池和每個流獨享線程池的性能對比。

用例4: 重量級服務(wù)異步線程模型中每個流獨享線程池的對比的確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池的性能對比。

由于筆者在發(fā)文的時候還沒有時間完成前面四種場景的壓測,暫時留給讀者親自動手,作為一個練習(xí)。

8 更多思考

盡管本文設(shè)計和實現(xiàn)的kclient項目提供了許多高級功能,并且使用起來方便,而且筆者在幾家公司里在線上進行了應(yīng)用,已經(jīng)發(fā)揮了不少的作用,但是,還有一些細(xì)節(jié)需要提高。

kclient處理器項目中管理Restful服務(wù)暫時只提供了獲得狀態(tài)的API,需要進行進一步的豐富,增加對線程池的監(jiān)控,以及消息處理性能的監(jiān)控。

當(dāng)前注解@ErrorHandler里面的exception參數(shù)為必選,完全可以使用方法第一參數(shù)進行推導(dǎo),簡化開發(fā)人員配置的工作。

模板項目還不完善,需要增加啟動和關(guān)閉腳本,這樣讀者可以直接拷貝使用。

盡管線上應(yīng)用已經(jīng)證明了kclient沒有性能問題,但是開發(fā)一款中間件系統(tǒng)是需要閉環(huán)的,需要盡快把性能壓測這塊昨晚并且形成壓測報表。

點擊《簡單易用的消息隊列框架的設(shè)計與實現(xiàn)》閱讀原文。

【本文為51CTO專欄作者“李艷鵬”的原創(chuàng)稿件,轉(zhuǎn)載可通過作者簡書號(李艷鵬)或51CTO專欄獲取聯(lián)系】

戳這里,看該作者更多好文

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2023-09-12 14:58:00

Redis

2023-11-13 08:37:33

消息中間件分布式架構(gòu)

2024-05-07 09:02:47

2023-12-30 13:47:48

Redis消息隊列機制

2022-01-21 19:22:45

RedisList命令

2015-11-10 18:04:22

FileMaker

2022-01-15 07:20:18

Redis List 消息隊列

2010-02-26 13:14:39

Java日志系統(tǒng)

2024-02-26 07:43:10

大語言模型LLM推理框架

2024-10-16 15:11:58

消息隊列系統(tǒng)設(shè)計

2024-10-25 08:41:18

消息隊列RedisList

2021-12-16 13:04:41

消息隊列緩存

2022-09-07 21:43:34

云原生存儲技術(shù)消息隊列

2025-03-12 07:55:46

2022-05-14 23:49:32

Python數(shù)據(jù)計算技巧

2022-04-03 15:44:55

Vue.js框架設(shè)計設(shè)計與實現(xiàn)

2009-08-06 16:21:09

點對點消息隊列

2016-09-18 18:27:21

KubernetesDocker

2024-03-22 12:10:39

Redis消息隊列數(shù)據(jù)庫

2019-07-24 14:49:48

SQL開源庫BI軟件
點贊
收藏

51CTO技術(shù)棧公眾號