大數(shù)據(jù)傳輸方法淺析
前言
近年來(lái),隨著社會(huì)服務(wù)信息化的高速發(fā)展,在互聯(lián)網(wǎng)、物聯(lián)網(wǎng)、金融、物流、電磁等各方面數(shù)據(jù)都呈現(xiàn)指數(shù)級(jí)的增長(zhǎng)。大數(shù)據(jù)的傳輸是大數(shù)據(jù)處理基本流程的重要一環(huán),高性能的數(shù)據(jù)傳輸可以為后續(xù)數(shù)據(jù)分析特別是實(shí)時(shí)分析提供保障。本文簡(jiǎn)要介紹了主流的大數(shù)據(jù)傳輸方法和多源異構(gòu)數(shù)據(jù)傳輸?shù)脑O(shè)計(jì)方案,為大家提供參考。
1、大數(shù)據(jù)傳輸相關(guān)背景
2003年起,Google公司相繼發(fā)表了Google FS、MapReduce、BigTable等3個(gè)系統(tǒng)(框架)的論文,說(shuō)明了這3個(gè)產(chǎn)品的詳細(xì)設(shè)計(jì)方法,為后來(lái)全球的大數(shù)據(jù)發(fā)展奠定了基礎(chǔ)。由于數(shù)據(jù)量和效率的問(wèn)題,傳統(tǒng)的單機(jī)存儲(chǔ)與計(jì)算已經(jīng)不適應(yīng)時(shí)代的發(fā)展,多節(jié)點(diǎn)的分布式存儲(chǔ)逐漸取而代之,這種方法可以在多個(gè)廉價(jià)的節(jié)點(diǎn)上同時(shí)存儲(chǔ)和并行計(jì)算,并且提供了很好的容錯(cuò)能力。
隨著大數(shù)據(jù)技術(shù)的不斷發(fā)展,更多高性能的處理框架走上了歷史舞臺(tái),形成了大數(shù)據(jù)生態(tài)系統(tǒng)。例如分布式存儲(chǔ)有HDFS、Hbase、hive等,分布式計(jì)算有MapReduce、Spark、Storm等,而作為該生態(tài)系統(tǒng)的重要組成部分,數(shù)據(jù)傳輸模塊必不可少,現(xiàn)在比較流行的有Kafka、Logstash、Sqoop等。
在數(shù)據(jù)傳輸?shù)倪^(guò)程中,不論是類(lèi)似將文件導(dǎo)入數(shù)據(jù)庫(kù)的離線數(shù)據(jù)傳輸,還是類(lèi)似實(shí)時(shí)采集數(shù)據(jù)傳輸?shù)綌?shù)據(jù)庫(kù)進(jìn)行計(jì)算的實(shí)時(shí)傳輸,我們都希望具有高速優(yōu)質(zhì)的傳輸效率,同時(shí),還要求數(shù)據(jù)傳輸達(dá)到良好的安全性、穩(wěn)定性、可靠性。另一方面,對(duì)于實(shí)時(shí)性要求比較高的,例如金融股票、數(shù)據(jù)可視化等方面需要獲得快速的響應(yīng),而對(duì)于傳入數(shù)據(jù)倉(cāng)庫(kù)保存的可以有一定延遲。
基于最基本的用戶(hù)需求,大數(shù)據(jù)傳輸機(jī)制應(yīng)當(dāng)遵循以下原則:
(1)模型安全性。大數(shù)據(jù)計(jì)算一般是由幾十個(gè)甚至上百個(gè)節(jié)點(diǎn)組成的,在獲取數(shù)據(jù)的時(shí)候,節(jié)點(diǎn)與數(shù)據(jù)源之間,節(jié)點(diǎn)與節(jié)點(diǎn)之間,都會(huì)有占有較大的I/O使用率,數(shù)據(jù)傳輸之間必須滿足必要的安全性。對(duì)于保密要求較高的數(shù)據(jù),更要建立全面的數(shù)據(jù)保護(hù)措施,以防數(shù)據(jù)泄露。
(2)傳輸可靠性。隨著計(jì)算存儲(chǔ)設(shè)備和數(shù)據(jù)傳輸通道的不斷升級(jí),數(shù)據(jù)的傳輸速度和效率逐漸提高。在獲取數(shù)據(jù)源的時(shí)候,數(shù)據(jù)管道必須提供一個(gè)可靠的傳輸,以達(dá)到至少交付一次的保證。
(3)網(wǎng)絡(luò)自適應(yīng)性。用戶(hù)和分析設(shè)備可以根據(jù)自身的需求,適應(yīng)數(shù)據(jù)傳輸?shù)姆?wù),最大化對(duì)接數(shù)據(jù)格式,達(dá)到良好的對(duì)接效果。
2、主流傳輸方法
目前在大數(shù)據(jù)的廣泛應(yīng)用中,Kafka、Logstash、Sqoop等都是傳輸數(shù)據(jù)的重要途徑,這里簡(jiǎn)要介紹傳輸原理。
2.1Kafka
Kafka最初由Linkedin公司開(kāi)發(fā),是一個(gè)分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng),常見(jiàn)可以用于web/nginx日志、訪問(wèn)日志,消息服務(wù)等等,Linkedin于2010年將該系統(tǒng)貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源項(xiàng)目。
Kafka主要設(shè)計(jì)特點(diǎn)如下:
- 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問(wèn)性能。
- 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸。
- 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)part內(nèi)的消息順序傳輸。
- 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
- Scale out:支持在線水平擴(kuò)展。
圖1 kafka的架構(gòu)
圖1展示了一個(gè)典型的kafka集群的架構(gòu),每個(gè)集群中都包含若干個(gè)生產(chǎn)者(producer),這些生產(chǎn)者可以是來(lái)自數(shù)據(jù)采集設(shè)備的硬件數(shù)據(jù)源,亦可以是服務(wù)器產(chǎn)生的日志信息等等;每個(gè)集群中都有若干的服務(wù)代理(broker),每個(gè)服務(wù)代理一般安裝在一個(gè)節(jié)點(diǎn)服務(wù)器上,kafka支持平行擴(kuò)展,集群中服務(wù)代理的數(shù)量越多,吞吐量也會(huì)越高。生產(chǎn)者生產(chǎn)的數(shù)據(jù)可以向一個(gè)指定的topic中寫(xiě)入,消費(fèi)者可以根據(jù)自己的需求,向指定的topic中拉取數(shù)據(jù)。
為了進(jìn)一步提高數(shù)據(jù)傳輸?shù)耐掏侣剩琸afka將每個(gè)topic分為若干個(gè)part,每個(gè)part下面都會(huì)存儲(chǔ)對(duì)應(yīng)的數(shù)據(jù)和索引文件。當(dāng)創(chuàng)建topic時(shí),可以指定part的數(shù)量,part數(shù)量越多,系統(tǒng)的吞吐量就會(huì)越大,但是也會(huì)占用更多的資源。kafka收到生產(chǎn)者發(fā)送的數(shù)據(jù)后,就跟根據(jù)一定的均衡策略,將數(shù)據(jù)存放到某一個(gè)part下,等待消費(fèi)者來(lái)消費(fèi)數(shù)據(jù)。
除此之外,kafka還為數(shù)據(jù)建立了副本,當(dāng)數(shù)據(jù)節(jié)點(diǎn)發(fā)生意外時(shí),其他的副本通過(guò)一定的機(jī)制擔(dān)起主part的作用,從而使系統(tǒng)具有高可用性。kafka提供了至少一次的交付保證,生產(chǎn)者發(fā)送數(shù)據(jù)到節(jié)點(diǎn),節(jié)點(diǎn)會(huì)反饋該消息是否存儲(chǔ),若未收到確認(rèn)信息,生產(chǎn)者則會(huì)重復(fù)發(fā)送該信息;同樣的,消費(fèi)者消費(fèi)數(shù)據(jù)發(fā)送收到的反饋,節(jié)點(diǎn)記錄被消費(fèi)的位置,下次消費(fèi)則從該位置開(kāi)始。這些機(jī)制都保證了至少一次的可靠交付。
在安全性方面,kafka使用了SSL或者SASL驗(yàn)證來(lái)自客戶(hù)端(生產(chǎn)者和消費(fèi)者)以及其他broker和工具到broker的鏈接身份,在傳輸?shù)倪^(guò)程中也可以選擇對(duì)數(shù)據(jù)進(jìn)行加密,對(duì)客戶(hù)端的讀寫(xiě)授權(quán),雖然可能會(huì)導(dǎo)致集群性能下降,但對(duì)于保密性較高的數(shù)據(jù)來(lái)說(shuō),是可以接受的。
2.2Logstash
Logstash 是免費(fèi)且開(kāi)放的服務(wù)器端數(shù)據(jù)處理管道,能夠從多個(gè)來(lái)源采集數(shù)據(jù),與此同時(shí)這根管道還可以讓你根據(jù)自己的需求在中間加上濾網(wǎng)轉(zhuǎn)換過(guò)濾數(shù)據(jù),然后將數(shù)據(jù)發(fā)送到用戶(hù)指定的數(shù)據(jù)庫(kù)中。
圖2 Logstash數(shù)據(jù)傳輸
圖3 Logstash 結(jié)構(gòu)
Logstash將數(shù)據(jù)流中每一條數(shù)據(jù)稱(chēng)之為一個(gè)event,處理流水線有三個(gè)主要角色完成:inputs –> filters –> outputs,原始數(shù)據(jù)進(jìn)入logstash后在內(nèi)部流轉(zhuǎn)并不是以原始數(shù)據(jù)的形式流轉(zhuǎn),在input處被轉(zhuǎn)換為event,在output event處被轉(zhuǎn)換為目標(biāo)格式的數(shù)據(jù)。
當(dāng)有一個(gè)輸入數(shù)據(jù)時(shí),input會(huì)從文件中取出數(shù)據(jù),然后通過(guò)json codec將數(shù)據(jù)轉(zhuǎn)換成logstash event。這條event會(huì)通過(guò)queue流入某一條pipline處理線程中,首先會(huì)存放在batcher中。當(dāng)batcher達(dá)到處理數(shù)據(jù)的條件(如一定時(shí)間或event一定規(guī)模)后,batcher會(huì)把數(shù)據(jù)發(fā)送到filter中,filter對(duì)event數(shù)據(jù)進(jìn)行處理后轉(zhuǎn)到output,output就把數(shù)據(jù)輸出到指定的輸出位置。輸出后還會(huì)返回ACK給queue,包含已經(jīng)處理的event,queue會(huì)將已處理的event進(jìn)行標(biāo)記。
假如 Logstash 節(jié)點(diǎn)發(fā)生故障,Logstash 會(huì)通過(guò)持久化隊(duì)列來(lái)保證至少將運(yùn)行中的事件送達(dá)一次。那些未被正常處理的消息會(huì)被送往死信隊(duì)列 (dead letter queue) 以便做進(jìn)一步處理。由于具備了這種吸收吞吐量的能力,無(wú)需采用額外的隊(duì)列層,Logstash 就能平穩(wěn)度過(guò)高峰期。此外,還能充分確保采集管道的安全性。
3、多源異構(gòu)數(shù)據(jù)傳輸設(shè)計(jì)
在數(shù)據(jù)不斷壯大的過(guò)程中,我們往往會(huì)根據(jù)自身的需求,收集不同類(lèi)型的數(shù)據(jù),存儲(chǔ)在不同的數(shù)據(jù)庫(kù)中,使用數(shù)據(jù)時(shí)也會(huì)從不同的數(shù)據(jù)源讀取數(shù)據(jù)進(jìn)行分析和處理。這些不同的存儲(chǔ)方式、不同的采集的系統(tǒng)、不同的數(shù)據(jù)格式,從簡(jiǎn)單的文件數(shù)據(jù)庫(kù)到復(fù)雜的網(wǎng)絡(luò)數(shù)據(jù)庫(kù),共同構(gòu)成了異構(gòu)數(shù)據(jù)源。為了將數(shù)據(jù)統(tǒng)一處理,根據(jù)可視化等現(xiàn)實(shí)需求,就需要將各個(gè)異構(gòu)數(shù)據(jù)源通過(guò)一個(gè)引擎銜接起來(lái),為數(shù)據(jù)的大批量處理和展示提供更為標(biāo)準(zhǔn)化的讀取方式。
目前,以異構(gòu)數(shù)據(jù)批處理為目標(biāo)的應(yīng)用有springbatch、kettle、datax等,他們各自有各自的特點(diǎn):
Springbatch是spring提供的一個(gè)輕量級(jí)、全面的批處理數(shù)據(jù)處理框架,無(wú)需用戶(hù)交互即可最有效地處理大量信息的自動(dòng)化,復(fù)雜處理,并且提供了可重用的功能,這些功能對(duì)于處理大量的數(shù)據(jù)至關(guān)重要。
Kettle是一款國(guó)外開(kāi)源的ETL工具,他可以通過(guò)Spoon來(lái)允許你運(yùn)行或者轉(zhuǎn)換任務(wù),支持從不同的數(shù)據(jù)源讀取、操作和寫(xiě)入數(shù)據(jù),在規(guī)定的時(shí)間間隔內(nèi)用批處理的模式自動(dòng)運(yùn)行。
Datax一個(gè)異構(gòu)數(shù)據(jù)源離線同步工具,致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫(kù)(MySQL、Oracle等)、HDFS、hive、ODPS、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能。
下面介紹一種輕量級(jí)的ETL工具,主要作用就是從不同源獲取數(shù)據(jù),然后做統(tǒng)一的處理,最后再寫(xiě)入各種目標(biāo)源。它基本特性是:
基于Springboot開(kāi)發(fā),輕量級(jí)別、快速、簡(jiǎn)單,入門(mén)門(mén)檻低
擴(kuò)展性強(qiáng),各個(gè)模塊均是獨(dú)立的,可以以插件的形式進(jìn)行開(kāi)發(fā)
可以通過(guò)UI界面來(lái)構(gòu)建任務(wù)并操作,總體監(jiān)控平臺(tái)的數(shù)據(jù)實(shí)時(shí)情況
基于Disruptor做緩沖,同時(shí)使用redis等內(nèi)存緩存,保證高速處理任務(wù)
該ETL工具將整個(gè)系統(tǒng)分為如下模塊:Input、Reader、Transport、Convert、Writer和Output,在系統(tǒng)上層已經(jīng)定義好各個(gè)模塊的接口,開(kāi)發(fā)者根據(jù)自己的需要個(gè)性化定義自己的模塊,只需繼承上層接口即可實(shí)現(xiàn)模塊的嵌入。系統(tǒng)運(yùn)行的簡(jiǎn)化基本流程如圖4所示。
圖4 ETL工具運(yùn)行簡(jiǎn)化流程
這里所有的模塊都有一定的標(biāo)準(zhǔn)來(lái)接入系統(tǒng),然后使用各數(shù)據(jù)源提供的API來(lái)讀寫(xiě)數(shù)據(jù),例如輸入可以從文件讀取、mysql、hbase、hdfs、kafka、http等,輸出同樣支持這些數(shù)據(jù)源,最終解決異構(gòu)數(shù)據(jù)源相互傳輸數(shù)據(jù)不兼容的問(wèn)題。
系統(tǒng)在應(yīng)對(duì)緩沖和讀寫(xiě)速度上均設(shè)置可選的策略,可以基于java的調(diào)度器,綜合當(dāng)前輸入輸出的任務(wù)數(shù)量,來(lái)調(diào)整輸入輸出線程池以及線程的數(shù)量,以使數(shù)據(jù)的傳輸達(dá)到最大的性能。
4、總 結(jié)
現(xiàn)在數(shù)據(jù)采集的設(shè)備無(wú)處不在,在各種格式的數(shù)據(jù)匯入不同數(shù)據(jù)倉(cāng)庫(kù)、數(shù)據(jù)倉(cāng)庫(kù)之間互相接入數(shù)據(jù)都需要一個(gè)高效、可靠、安全的數(shù)據(jù)通道,本文介紹了大數(shù)據(jù)傳輸?shù)囊恍┍尘爸R(shí),同時(shí)簡(jiǎn)要描述了當(dāng)前主流數(shù)據(jù)傳輸工具的應(yīng)用和個(gè)性化異構(gòu)數(shù)據(jù)引擎的設(shè)計(jì)問(wèn)題。本文參考了一些文獻(xiàn)和網(wǎng)絡(luò)資源,對(duì)他們的觀點(diǎn)和技術(shù)對(duì)本文的貢獻(xiàn)表示感謝。
參考文獻(xiàn)[1] https://www.cnblogs.com/qingyunzong/p/9004509.html
[2] https://blog.csdn.net/chenleiking/article/details/73563930[3]https://gitee.com/starblues/rope/wikis/pages?sort_id=1863419&doc_id=507971