非常簡單的SpringCloudStream集成Kafka教程!
哈嘍,大家好,我是指北君。
開發(fā)中,服務與服務之間通信通常會用到消息中間件,如果我們使用了某一個MQ,那么消息中間件與我們的系統(tǒng)算是高耦合。將來有一天,要替換成另外的MQ,我們的改動就會比較大。為了解決這個問題,我們可以使用Spring Cloud Stream 來整合我們的消息中間件,降低耦合度,使服務可以更多關注自己的業(yè)務邏輯等。
今天為大家?guī)硪粋€人人可實操的SpringCloudStream集成Kafka的快速入門示例。
1.前言
SpringCloudStream是一個構建高擴展性的事件消息驅動的微服務框架。簡單點說就是幫助你操作MQ,可以與底層MQ框架解耦。將來想要替換MQ框架的時候會比較容易。
Kafka是一個分布式發(fā)布 - 訂閱消息系統(tǒng),源于LinkedIn的一個項目,2011年成為開源Apache項目。
ZooKeeper 是 Apache 軟件基金會的一個軟件項目,它為大型分布式計算提供開源的分布式配置服務、同步服務和命名注冊,Kafka的實現(xiàn)同時也依賴于zookeeper。
2.Windows搭建簡單的Kafka
2.1 啟動zookeeper
使用Kafka首先需要啟動zookeeper,windows中搭建zookeeper也很簡單。以下幾步即可完成:
- 下載zookeeper (本文使用3.7.0版本,下載鏈接在文章末尾。)
- 配置基本環(huán)境變量:
將conf文件夾下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目錄dataDir。
bin文件夾下面有zkEnv.cmd有zookeeper相關的配置,其中就包括JAVA_HOME,所以系統(tǒng)環(huán)境變量需要配置JAVA_HOME,或者直接用Java的路徑來替換。
- 啟動,在bin目錄下運行zkServer.cmd腳本啟動zookeeper。
默認啟動端口2181為。
正常啟動如下:
2.2 搭建Kafka
本地使用kafka同樣也是如下的幾個步驟:
- 下載Kafka(本文使用2.11版本,下載鏈接見文章末尾)。
- 環(huán)境變量配置:
查看config文件下面的 server.properties配置文件中的zookeeper的配置。
zookeeper.connect=localhost:2181
在bin/windows文件夾下面kafka-run-class.bat文件中有JAVA_HOME的配置,同樣也可以直接改成系統(tǒng)的Java路徑。
- 在kafka根目錄下使用如下命令啟動kafka,并在zookeeper中注冊。
# .\bin\windows\kafka-server-start.bat .\config\server.properties
- 創(chuàng)建topic,在bin\windows目錄下使用如下命令。創(chuàng)建名稱為“test”的topic。
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test
- 使用windows命令窗口的producer和consumer,在bin\windows目錄下使用如下命令。
#test topic的消息生產(chǎn)者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
#test topic的消息消費者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
#test topic的消息消費者(從頭消費)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic
kafka啟動windows界面如下:
3 SpringCloudStream集成Kafka
3.1 引入依賴
由于我們直接使用Spring Cloud Stream 集成Kafka,官方也已經(jīng)有現(xiàn)成的starter。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
3.2 關于kafka的配置
spring:
application:
name: shop-server
cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #MessageChannel里Input和Output的值
destination: test #目標主題 相當于kafka的topic
output:
destination: test1 #本例子創(chuàng)建了另外一個topic (test1)用于區(qū)分不同的功能區(qū)分。
default-binder: kafka #默認的binder是kafka
kafka:
binder:
zk-nodes: localhost:2181
bootstrap-servers: localhost:9092 #kafka服務地址,集群部署的時候需要配置多個,
consumer:
group-id: consumer1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
server:
port: 8100
3.3 消費者示例
首先需要定義SubscribableChannel 接口方法使用Input注解。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
然后簡單的使用 StreamListener 監(jiān)聽某一通道的消息。
@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {
@StreamListener(Sink.INPUT)
public void handler(Message<String> msg){
System.out.println(" received message : "+msg);
}
}
cloud stream配置中綁定了對應的Kafka topic,如下:
cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #SubscribableChannel里Input值
destination: test #目標主題
我們使用Kafka console producer 生產(chǎn)消息。
kafka-console-producer.bat --broker-list localhost:9092 --topic test
同時啟動我們的示例SpringBoot項目,使用producer推送幾條消息。
我們同時啟動一個Kafka console consumer。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
消費結果如下:
Spring Boot 項目消費消息如下:
3.4 生產(chǎn)者示例
首先需要定義生產(chǎn)者MessageChannel,這里會用到Output注解。
public interface KafkaSource {
String OUTPUT = "output";
@Output(KafkaSource.OUTPUT)
MessageChannel output();
}
使用MessageChannel 發(fā)送消息。
@Component
public class MessageService {
@Autowired
private KafkaSource source;
public Object sendMessage(Object msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return msg;
}
定義一個Rest API 來觸發(fā)消息發(fā)送。
@RestController
public class MessageController {
@Autowired
private MessageService messageService;
@GetMapping(value = "/sendMessage/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
return "sent message";
}
}
配置中關于producer的配置如下:
cloud:
stream:
bindings:
input:
destination: test
output:
destination: test1 #目標topic
啟動SpringBoot App, 并觸發(fā)如下API call。
??http://localhost:8100/sendMessage/JavaNorthProducer??
我們同時啟動一個Kafka console consumer,這里我們使用另一個test1 topic。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1
console consumer消費消息如下:
總結
本章初步介紹了Spring Cloud Stream 集成Kafka的簡單示例,實現(xiàn)了簡單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學習更多Stream的功能。
以上示例倉庫:https://github.com/javatechnorth/java-study-note/tree/master/kafka
下載鏈接:
??https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz??