Stream幫你無感知切換消息中間件
哈嘍,大家好,我是了不起。
在實際的企業(yè)開發(fā)中,消息中間件是至關(guān)重要的組件之一。如常見的RabbitMQ和Kafka,這些中間件的差異性導(dǎo)致我們實際項目開發(fā)給我們造成了一定的困擾,這時候 Spring Cloud Stream 給我們提供了一種解耦合的方式。
簡介
Spring Cloud Stream 由一個中間件中立的核組成。
應(yīng)用通過 Spring Cloud Stream 插入的Input(相當(dāng)于消費者Consumer,它是從隊列中接收消息的)和Output(相當(dāng)于生產(chǎn)者Producer,它是從隊列中發(fā)送消息的。)通道與外界交流。
通道通過指定中間件的Binder實現(xiàn)與外部代理連接。
業(yè)務(wù)開發(fā)者不再關(guān)注具體消息中間件,只需關(guān)注Binder對應(yīng)用程序提供的抽象概念來使用消息中間件實現(xiàn)業(yè)務(wù)即可。
詳細(xì)介紹
核心概念
Spring Cloud Stream 為各大消息中間件產(chǎn)品提供了個性化的自動化配置實現(xiàn),引用了發(fā)布-訂閱、消費組、分區(qū)的三個核心概念。
Spring Cloud Stream 提供了很多抽象和基礎(chǔ)組件來簡化消息驅(qū)動型微服務(wù)應(yīng)用。包含以下內(nèi)容:
- Spring Cloud Stream的應(yīng)用模型
- 綁定抽象
- 持久化發(fā)布/訂閱支持
- 消費者組支持
- 分片支持(Partitioning Support)
- 可插拔API
應(yīng)用模型
Spring Cloud Stream由一個中立的中間件內(nèi)核組成。Spring Cloud Stream會注入輸入和輸出的channels,應(yīng)用程序通過這些channels與外界通信,而channels則是通過一個明確的中間件Binder與外部brokers連接。
圖片
各大消息中間件的綁定抽象
Spring Cloud Stream 提供對Kafka、Rabbit MQ、Redis、Gemfire的Binder實現(xiàn)。Spring Cloud Stream還包括了一個TestSupportBinder、TestSupportBinder預(yù)留一個未更改的channel以便于直接地、可靠地和channels通信。
分區(qū)支持
分區(qū)在有狀態(tài)處理中是一個很重要的概念,其重要性體現(xiàn)在性能和一致性上,要確保所有相關(guān)數(shù)據(jù)被一并處理,例如,在時間窗平均計算的例子中,給定傳感器測量結(jié)果應(yīng)該都由同一應(yīng)用實例進行計算。
Spring Cloud Stream支持在一個應(yīng)用程序的多個實例之間數(shù)據(jù)分區(qū),在分區(qū)的情況下,物理通信介質(zhì)(例如,topic代理)被視為多分區(qū)結(jié)構(gòu)。一個或多個生產(chǎn)者應(yīng)用程序?qū)嵗龑?shù)據(jù)發(fā)送給多個消費應(yīng)用實例,并保證共同的特性的數(shù)據(jù)由相同的消費者實例處理。
Spring Cloud Stream 提供了一個通用的抽象,用于統(tǒng)一方式進行分區(qū)處理,因此分區(qū)可以用于自帶分區(qū)的代理(如Kafka)或者不帶分區(qū)的代理(如RabbieMQ)
編程模型
Spring Cloud Stream 提供了一些預(yù)定義的注解,用于綁定輸入和輸出channels,以及如何監(jiān)聽channels。
通過@EnableBinding觸發(fā)綁定
將@EnableBinding注解添加到應(yīng)用的配置類,就可以把一個spring應(yīng)用轉(zhuǎn)換成Spring Cloud Stream應(yīng)用,@EnableBinding注解本身就包含@Configuration注解,會觸發(fā)Spring Cloud Stream 基本配置。
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
@Input 與 @Output
一個Spring Cloud Stream應(yīng)用可以有任意數(shù)目的input和output通道,后者通過@Input和@Output注解在接口中定義。
@StreamListener
定義在方法中,被修飾的方法注冊為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中屬性值對應(yīng)了監(jiān)聽的消息通道名。
Source、Sink和Processor
Spring Cloud Stream提供了三個開箱即用的預(yù)定義接口。
- Source用于有單個輸出(outbound)通道的應(yīng)用。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
- Sink用于有單個輸入(inbound)通道的應(yīng)用。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
- Processor用于單個應(yīng)用同時包含輸入和輸出通道的情況。
public interface Processor extends Source, Sink {
}
極簡實例
下面是一個非常簡單的 SpringBootApplication應(yīng)用,通過依賴Spring Cloud Stream,從Input通道監(jiān)聽消息然后返回應(yīng)答到Output通道,只要添加配置文件就可以應(yīng)用。
@SpringBootApplication
@EnableBinding(Processor.class)
public class ServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
下面解釋下這個示例中相關(guān)注解的應(yīng)用:
- @EnableBinding聲明了這個應(yīng)用程序綁定了2個通道:INPUT和OUTPUT。這2個通道是在接口Processor中定義的(Spring Cloud Stream默認(rèn)設(shè)置)。所有通道都是配置在一個具體的消息中間件或綁定器中。
- @StreamListener(Processor.INPUT)表明這里在input中提取消息,并且處理。
- @SendTo(Processor.OUTPUT)表明在output中返回消息。
其他特性
消息發(fā)送失敗的處理
消息發(fā)送失敗后悔發(fā)送到默認(rèn)的一個“topic.errors"的channel中(topic是配置的destination)。要配置消息發(fā)送失敗的處理,需要將錯誤消息的channel打開。
消費者配置如下
spring:
application:
name: spring-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 內(nèi)容格式。這里使用 JSON
producer:
errorChannelEnabled: true
在啟動類中配置錯誤消息的Channel信息
@Bean("stream-test-topic.errors")
MessageChannel testoutPutErrorChannel(){
return new PublishSubscribeChannel();
}
新建異常處理service
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class ErrorProducerService {
@ServiceActivator(inputChannel = "stream-test-topic.errors")
public void receiveProducerError(Message message){
System.out.println("receive error msg :"+message);
}
}
當(dāng)發(fā)生異常時,由于測試類中已經(jīng)將異常捕獲,處理發(fā)送異常主要是在這里進行。
總結(jié)
這篇文章根據(jù) Spring Cloud Stream 的官方文檔,對Stream做了一個整體的介紹,包括設(shè)計目標(biāo),應(yīng)用場景,業(yè)務(wù)模型以及對外開放的注解,希望大家能夠?qū)W以致用。