自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何使用Kafka Connect創(chuàng)建用于處理實(shí)時(shí)數(shù)據(jù)的開源數(shù)據(jù)管道?

譯文
開發(fā) 前端 Kafka
本文介紹了如何使用完全開源的技術(shù)創(chuàng)建實(shí)時(shí)數(shù)據(jù)管道,這類開源技術(shù)包括 Kafka Connect、Apache Kafka和Kibana 等。

[[413839]]

【51CTO.com快譯】Kafka Connect是一種特別強(qiáng)大的開源數(shù)據(jù)流工具;有了它,將Kafka與其他數(shù)據(jù)技術(shù)結(jié)合使用非常輕松。作為一種分布式技術(shù),Kafka Connect提供了特別高的可用性和獨(dú)立于Kafka集群的彈性擴(kuò)展。Kafka Connect使用源或sink連接件發(fā)送進(jìn)出Kafka主題的數(shù)據(jù),無需代碼即可與多種非Kafka技術(shù)實(shí)現(xiàn)整合。

圖1

可靠的開源Kafka連接件可供許多流行的數(shù)據(jù)技術(shù)使用,您還有機(jī)會(huì)編寫自己的連接件。本文介紹了一個(gè)真實(shí)的實(shí)際數(shù)據(jù)用例,即如何使用Kafka Connect將來自Kafka的實(shí)時(shí)流數(shù)據(jù)與Elasticsearch(以啟用索引Kafka記錄的可擴(kuò)展搜索)和Kibana(以便可視化那些結(jié)果)整合起來。

圖2

針對(duì)表明Kafka和Kafka Connect優(yōu)點(diǎn)的一個(gè)用例,我受到CDC新冠疫情數(shù)據(jù)跟蹤器的啟發(fā)?;贙afka的跟蹤器從多個(gè)位置、以多種格式并使用多種協(xié)議收集實(shí)時(shí)新冠病毒檢測數(shù)據(jù),并將這些事件處理成易于使用的可視化結(jié)果。跟蹤器還有必要的數(shù)據(jù)治理機(jī)制,以確保結(jié)果快速到達(dá),并值得信任。

我開始尋找一個(gè)同樣復(fù)雜且引人注目的用例——但理想情況下,不像新冠疫情那樣令人擔(dān)憂。最終,我發(fā)現(xiàn)了一個(gè)有趣的領(lǐng)域:月潮,包括公開可用的流REST API和采用簡單JSON格式的豐富數(shù)據(jù)。

月潮數(shù)據(jù)

潮汐遵循太陰日,這是一個(gè)24小時(shí)50分鐘的周期;在此期間,地球完全自轉(zhuǎn)到軌道衛(wèi)星下方的同一點(diǎn)。每個(gè)太陰日有月球引力引起的兩個(gè)高潮和兩個(gè)低潮:

圖3. 來自美國國家海洋和大氣管理局

美國國家海洋和大氣管理局(NOAA)提供了一個(gè)REST API,可以從全球潮汐站輕松獲取詳細(xì)的傳感器數(shù)據(jù)。

圖4

比如說,下列REST調(diào)用指定了潮汐站ID、數(shù)據(jù)類型(我選擇了海平面)和數(shù)據(jù)(平均海平面),并請(qǐng)求一個(gè)采用公制單位的最近結(jié)果:

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

該調(diào)用返回JSON結(jié)果,含有潮汐站的經(jīng)緯度、時(shí)間和水位值。請(qǐng)注意,您必須記住您調(diào)用的是什么,以便了解所返回結(jié)果的數(shù)據(jù)類型、數(shù)據(jù)和單位!

  1. {"metadata": { 
  2.    "id":"8724580"
  3.    "name":"Key West"
  4.    "lat":"24.5508”, 
  5.    "lon":"-81.8081"}, 
  6.  "data":[{ 
  7.    "t":"2020-09-24 04:18"
  8.    "v":"0.597"
  9.       "s":"0.005""f":"1,0,0,0""q":"p"}]} 

啟動(dòng)數(shù)據(jù)管道(使用REST源連接件)

要開始創(chuàng)建Kafka Connect流數(shù)據(jù)管道,我們必須先準(zhǔn)備Kafka集群和Kafka Connect集群。

圖5

接下來,我們引入一個(gè)REST連接件,比如這個(gè)可用的開源連接件。我們會(huì)將其部署到AWS S3存儲(chǔ)桶(如果需要,參照這些說明)。 然后我們將要求Kafka Connect集群使用S3存儲(chǔ)桶,對(duì)它同步以便在集群中可見,配置連接件,最后讓它運(yùn)行起來。這種“BYOC”(自帶連接件)方法確保您有無數(shù)的方法來尋找滿足特定要求的連接件。

 

圖6

下列示例演示使用“curl”命令將完全開源的Kafka Connect部署環(huán)境配置成可使用REST API。請(qǐng)注意,您需要更改URL、名稱和密碼以匹配您自己的部署:

  1. curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' 
  2.     "name""source_rest_tide_1"
  3.     "config": { 
  4.       "key.converter":"org.apache.kafka.connect.storage.StringConverter"
  5.       "value.converter":"org.apache.kafka.connect.storage.StringConverter"
  6.       "connector.class""com.tm.kafka.connect.rest.RestSourceConnector"
  7.       "tasks.max""1"
  8.       "rest.source.poll.interval.ms""600000"
  9.       "rest.source.method""GET"
  10.       "rest.source.url""https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json"
  11.       "rest.source.headers""Content-Type:application/json,Accept:application/json"
  12.       "rest.source.topic.selector""com.tm.kafka.connect.rest.selector.SimpleTopicSelector"
  13.       "rest.source.destination.topics""tides-topic" 
  14.     } 

該代碼創(chuàng)建的連接件任務(wù)以10分鐘為間隔輪詢REST API,并將結(jié)果寫入到“tides-topic”Kafka主題。通過隨機(jī)選擇五個(gè)潮汐傳感器以這種方式收集數(shù)據(jù),潮汐數(shù)據(jù)現(xiàn)在通過五個(gè)配置和五個(gè)連接件填充了潮汐主題。

圖7

結(jié)束管道(使用Elasticsearch sink連接件)

為了將該潮汐數(shù)據(jù)放在某個(gè)地方,我們將在管道末端引入Elasticsearch集群和Kibana。 我們將配置一個(gè)開源Elasticsearch sink連接件,以便向Elasticsearch發(fā)送數(shù)據(jù)。

圖8

以下示例配置使用sink名稱、類、Elasticsearch索引和我們的Kafka主題。如果索引尚未存在,會(huì)創(chuàng)建一個(gè)有默認(rèn)映射的索引。

  1. curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d ' 
  2.   "name" : "elastic-sink-tides"
  3.   "config" : 
  4.   { 
  5.     "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector"
  6.     "tasks.max" : 3, 
  7.     "topics" : "tides"
  8.     "connect.elastic.hosts" : ”ip", 
  9.     "connect.elastic.port" : 9201, 
  10.     "connect.elastic.kcql" : "INSERT INTO tides-index SELECT * FROM tides-topic"
  11.     "connect.elastic.use.http.username" : ”elasticName", 
  12.     "connect.elastic.use.http.password" : ”elasticPassword" 
  13.   } 
  14. }' 

該管道現(xiàn)在可運(yùn)作起來。然而,由于默認(rèn)索引映射,進(jìn)入到Tides索引的所有潮汐數(shù)據(jù)是字符串。

圖9

需要自定義映射以準(zhǔn)確地繪制我們的時(shí)間序列數(shù)據(jù)。我們將為下面的潮汐索引創(chuàng)建這個(gè)自定義映射,使用JSON“t”字段用于自定義日期,“v”作為兩倍數(shù),“name”作為代表聚合的關(guān)鍵字。

  1. curl -u elasticName:elasticPassword ”elasticURL:9201/tides-index"  -X PUT -H 'Content-Type: application/json' -d' 
  2. "mappings" : { 
  3.   "properties" : { 
  4.      "data" : { 
  5.         "properties" : { 
  6.              "t" : { "type" : "date"
  7.                      "format" : "yyyy-MM-dd HH:mm" 
  8.              }, 
  9.              "v" : { "type" : "double" }, 
  10.              "f" : { "type" : "text" }, 
  11.              "q" : { "type" : "text" }, 
  12.              "s" : { "type" : "text" } 
  13.              } 
  14.        }, 
  15.        "metadata" : { 
  16.           "properties" : { 
  17.              "id" : { "type" : "text" }, 
  18.              "lat" : { "type" : "text" }, 
  19.              "long" : { "type" : "text" }, 
  20.              "name" : { "type" : ”keyword" } }}}}         }' 

每次更改Elasticsearch索引映射時(shí),通常都需要Elasticsearch“重新索引”(刪除索引并重新索引所有數(shù)據(jù))。數(shù)據(jù)既可以從現(xiàn)有的Kafka sink連接件重放,就像我們在這個(gè)用例中所做的那樣,也可以使用Elasticsearch重新索引操作來獲取。

使用Kibana可視化數(shù)據(jù)

為了可視化潮汐數(shù)據(jù),我們先用Kibana創(chuàng)建一個(gè)索引模式,將“t”配置為時(shí)間過濾器字段。然后,我們將創(chuàng)建一個(gè)可視化,選擇線圖類型。最后,我們將配置圖設(shè)置,以便y軸顯示30分鐘內(nèi)的平均潮位,x 軸顯示隨時(shí)間變化的該數(shù)據(jù)。

結(jié)果是下圖顯示了五個(gè)樣本潮汐站的潮汐變化,管道從這些潮汐站收集數(shù)據(jù):

圖10

結(jié)果

我們可以從可視化中清楚地看到潮汐的周期性,每個(gè)太陰日出現(xiàn)兩次高潮。

圖11

更令人驚訝的是,每個(gè)全球潮汐站的高潮和低潮之間的間隔不一樣。這不僅受月球的影響,還受太陽、當(dāng)?shù)氐乩?、天氣和氣候變化的影響。這個(gè)示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana幫助演示可視化的優(yōu)點(diǎn):它們通??梢越沂驹紨?shù)據(jù)無法揭示的信息!

原文標(biāo)題:How to Use Kafka Connect to Create an Open Source Data Pipeline for Processing Real-Time Data,作者:Paul Brebner

【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文譯者和出處為51CTO.com】

責(zé)任編輯:華軒 來源: 今日頭條
相關(guān)推薦

2024-01-26 08:00:00

Python數(shù)據(jù)管道

2023-10-11 14:37:21

工具開發(fā)

2023-12-13 09:00:00

2023-11-21 08:11:48

Kafka的分區(qū)策略

2015-06-16 16:49:25

AWSKinesis實(shí)時(shí)數(shù)據(jù)處理

2023-05-25 08:24:46

Kafka大數(shù)據(jù)

2022-09-22 10:53:38

實(shí)時(shí)數(shù)據(jù)ML 模型

2013-09-23 09:24:33

2015-11-09 09:58:31

大數(shù)據(jù)Lambda架構(gòu)

2023-12-11 08:00:00

架構(gòu)FlinkDruid

2022-03-07 07:18:18

Netflix機(jī)器學(xué)習(xí)架構(gòu)

2022-03-16 10:20:57

數(shù)據(jù)智慧城市傳感器

2012-05-18 10:49:36

SAP大數(shù)據(jù)HANA

2017-08-09 13:30:21

大數(shù)據(jù)Apache Kafk實(shí)時(shí)處理

2019-08-19 14:24:39

數(shù)據(jù)分析Spark操作

2018-05-14 13:51:39

RDS Binlog架構(gòu)Kafka集群

2020-04-28 11:04:51

數(shù)據(jù)架構(gòu)互聯(lián)網(wǎng)Flink

2023-09-26 09:29:08

Java數(shù)據(jù)

2023-07-20 08:00:00

可視化數(shù)據(jù)Python

2020-07-08 10:11:18

數(shù)據(jù)中心實(shí)時(shí)數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)