自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

非常簡單的SpringCloudStream集成Kafka教程!

開發(fā) 架構
本章初步介紹了Spring Cloud Stream 集成Kafka的簡單示例,實現(xiàn)了簡單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學習更多Stream的功能。

哈嘍,大家好,我是指北君。

開發(fā)中,服務與服務之間通信通常會用到消息中間件,如果我們使用了某一個MQ,那么消息中間件與我們的系統(tǒng)算是高耦合。將來有一天,要替換成另外的MQ,我們的改動就會比較大。為了解決這個問題,我們可以使用Spring Cloud Stream 來整合我們的消息中間件,降低耦合度,使服務可以更多關注自己的業(yè)務邏輯等。

今天為大家?guī)硪粋€人人可實操的SpringCloudStream集成Kafka的快速入門示例。

1.前言

SpringCloudStream是一個構建高擴展性的事件消息驅動的微服務框架。簡單點說就是幫助你操作MQ,可以與底層MQ框架解耦。將來想要替換MQ框架的時候會比較容易。

Kafka是一個分布式發(fā)布 - 訂閱消息系統(tǒng),源于LinkedIn的一個項目,2011年成為開源Apache項目。

ZooKeeper 是 Apache 軟件基金會的一個軟件項目,它為大型分布式計算提供開源的分布式配置服務、同步服務和命名注冊,Kafka的實現(xiàn)同時也依賴于zookeeper。

2.Windows搭建簡單的Kafka

2.1 啟動zookeeper

使用Kafka首先需要啟動zookeeper,windows中搭建zookeeper也很簡單。以下幾步即可完成:

  • 下載zookeeper (本文使用3.7.0版本,下載鏈接在文章末尾。)
  • 配置基本環(huán)境變量:

將conf文件夾下面的 zoo_sample.cfg 重命名zoo.cfg。并修改其工作目錄dataDir。

bin文件夾下面有zkEnv.cmd有zookeeper相關的配置,其中就包括JAVA_HOME,所以系統(tǒng)環(huán)境變量需要配置JAVA_HOME,或者直接用Java的路徑來替換。

  • 啟動,在bin目錄下運行zkServer.cmd腳本啟動zookeeper。

默認啟動端口2181為。

正常啟動如下:

2.2 搭建Kafka

本地使用kafka同樣也是如下的幾個步驟:

  • 下載Kafka(本文使用2.11版本,下載鏈接見文章末尾)。
  • 環(huán)境變量配置:

查看config文件下面的 server.properties配置文件中的zookeeper的配置。

zookeeper.connect=localhost:2181

在bin/windows文件夾下面kafka-run-class.bat文件中有JAVA_HOME的配置,同樣也可以直接改成系統(tǒng)的Java路徑

  • 在kafka根目錄下使用如下命令啟動kafka,并在zookeeper中注冊。
# .\bin\windows\kafka-server-start.bat .\config\server.properties
  • 創(chuàng)建topic,在bin\windows目錄下使用如下命令。創(chuàng)建名稱為“test”的topic。
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic test
  • 使用windows命令窗口的producer和consumer,在bin\windows目錄下使用如下命令
#test topic的消息生產(chǎn)者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
#test topic的消息消費者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
#test topic的消息消費者(從頭消費)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic

kafka啟動windows界面如下:

3 SpringCloudStream集成Kafka

3.1 引入依賴

由于我們直接使用Spring Cloud Stream 集成Kafka,官方也已經(jīng)有現(xiàn)成的starter。

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

3.2 關于kafka的配置

spring:
application:
name: shop-server
cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #MessageChannel里Input和Output的值
destination: test #目標主題 相當于kafka的topic
output:
destination: test1 #本例子創(chuàng)建了另外一個topic (test1)用于區(qū)分不同的功能區(qū)分。
default-binder: kafka #默認的binder是kafka
kafka:
binder:
zk-nodes: localhost:2181
bootstrap-servers: localhost:9092 #kafka服務地址,集群部署的時候需要配置多個,
consumer:
group-id: consumer1
producer:
key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
server:
port: 8100

3.3 消費者示例

首先需要定義SubscribableChannel 接口方法使用Input注解。

public interface Sink {
String INPUT = "input";

@Input("input")
SubscribableChannel input();
}

然后簡單的使用 StreamListener 監(jiān)聽某一通道的消息。

@Service
@EnableBinding(Sink.class)
public class MessageSinkHandler {

@StreamListener(Sink.INPUT)
public void handler(Message<String> msg){
System.out.println(" received message : "+msg);

}
}

cloud stream配置中綁定了對應的Kafka topic,如下:

cloud:
stream:
bindings:
#配置自己定義的通道與哪個中間件交互
input: #SubscribableChannel里Input值
destination: test #目標主題

我們使用Kafka console producer 生產(chǎn)消息。

kafka-console-producer.bat --broker-list localhost:9092 --topic test

同時啟動我們的示例SpringBoot項目,使用producer推送幾條消息。

我們同時啟動一個Kafka console consumer。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

消費結果如下:

Spring Boot 項目消費消息如下:

3.4 生產(chǎn)者示例

首先需要定義生產(chǎn)者MessageChannel,這里會用到Output注解

public interface KafkaSource {
String OUTPUT = "output";

@Output(KafkaSource.OUTPUT)
MessageChannel output();
}

使用MessageChannel 發(fā)送消息。

@Component
public class MessageService {

@Autowired
private KafkaSource source;

public Object sendMessage(Object msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
return msg;
}

定義一個Rest API 來觸發(fā)消息發(fā)送

@RestController
public class MessageController {

@Autowired
private MessageService messageService;

@GetMapping(value = "/sendMessage/{msg}")
public String sendMessage(@PathVariable("msg") String msg){
messageService.sendMessage("messageService send out : " + msg + LocalDateTime.now());
return "sent message";
}
}

配置中關于producer的配置如下:

cloud:
stream:
bindings:
input:
destination: test
output:
destination: test1 #目標topic

啟動SpringBoot App, 并觸發(fā)如下API call

??http://localhost:8100/sendMessage/JavaNorthProducer??

我們同時啟動一個Kafka console consumer,這里我們使用另一個test1 topic

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1

console consumer消費消息如下:

總結

本章初步介紹了Spring Cloud Stream 集成Kafka的簡單示例,實現(xiàn)了簡單的發(fā)布-訂閱功能。但是Spring Cloud Stream肯定還有更多的功能,我們后續(xù)還將繼續(xù)深入學習更多Stream的功能。

以上示例倉庫:https://github.com/javatechnorth/java-study-note/tree/master/kafka

下載鏈接:

??https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz??

??https://kafka.apache.org/downloads??

責任編輯:武曉燕 來源: Java技術指北
相關推薦

2018-08-02 15:13:35

2021-12-01 12:30:43

NiceUmiJS前端

2020-09-29 15:08:47

Go UI框架開發(fā)

2010-02-05 13:56:56

Ubuntu Linu

2010-03-11 16:22:08

Python教程

2014-07-17 11:36:27

Android Stu使用教程

2009-06-24 10:58:21

jQuery插件教程

2019-12-03 11:00:08

spring bootspring-kafkJava

2014-04-24 13:35:11

OpenGL ES2.iOSAndroid

2019-05-27 17:01:02

PHPPDO編程語言

2011-05-11 15:10:21

jQueryCSS導航欄

2011-07-07 09:01:52

HTML 5

2024-08-05 08:45:35

SpringKafkaSCRAM

2010-07-06 11:09:52

Server 2008

2024-10-31 11:49:41

Kafka管理死信隊列

2020-02-21 17:33:17

SparkKafka數(shù)據(jù)

2011-08-30 15:32:08

QtQuickQML

2009-07-06 14:43:30

JSP元素

2009-09-29 10:40:12

政府應急指揮平臺

2023-01-11 15:11:36

SpringEhcache
點贊
收藏

51CTO技術棧公眾號