NiFi, a Stream Processing Engine with Character
In earlier posts I wrote about Flink, but stream processing is not just Flink, Storm, and Spark Streaming; frankly, those are the more traditional stream processing frameworks. Today I want to introduce something you may not use every day, but that is genuinely distinctive: NiFi.
Where NiFi came from
Apache NiFi is a real-time dataflow processing system that the US National Security Agency (NSA) open-sourced and donated to the Apache community in 2014; the project's original name was Niagarafiles. After NiFi went open source, some of its former NSA developers founded the startup Onyara, which carried on NiFi's development and provided commercial support. Hortonworks then acquired Onyara, folded its developers into its own team, and built the HDF (Hortonworks Data Flow) platform around the project.
What makes NiFi distinctive
Here is the official summary of key capabilities; it is worth reading carefully:
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:
- Web-based user interface
  - Seamless experience between design, control, feedback, and monitoring
- Highly configurable
  - Loss tolerant vs guaranteed delivery
  - Low latency vs high throughput
  - Dynamic prioritization
  - Flow can be modified at runtime
  - Back pressure
- Data Provenance
  - Track dataflow from beginning to end
- Designed for extension
  - Build your own processors and more
  - Enables rapid development and effective testing
- Secure
  - SSL, SSH, HTTPS, encrypted content, etc...
  - Multi-tenant authorization and internal authorization/policy management
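The "designed for extension" point is worth making concrete: a NiFi processor is just a Java class built on the processor API, packaged as a NAR archive and dropped into NiFi's lib directory. Below is a minimal sketch, not taken from the official docs: a hypothetical processor that stamps a timestamp attribute onto every FlowFile. The class name and attribute key are invented for illustration; AbstractProcessor, ProcessSession, and Relationship are the actual extension points.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical example: a processor that stamps a timestamp attribute on every FlowFile.
public class StampTimestamp extends AbstractProcessor {

    // Downstream connections attach to named relationships declared by the processor.
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that were stamped successfully")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    // The framework calls onTrigger whenever this processor has work to do.
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();       // take one FlowFile from the incoming queue
        if (flowFile == null) {
            return;                              // nothing queued right now
        }
        // FlowFiles are immutable: attribute changes return a new reference.
        flowFile = session.putAttribute(flowFile, "stamped.at",
                String.valueOf(System.currentTimeMillis()));
        session.transfer(flowFile, REL_SUCCESS); // route it to the "success" relationship
    }
}
```

Wired between, say, GetFile and PutFile, every file passing through would pick up the attribute.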
To sum up, as a stream processing engine NiFi differentiates itself in two main ways:
A rich catalog of processors
NiFi bundles processing for an enormous range of data sources. For the details of what each processor can do, browse the official documentation (https://nifi.apache.org/docs.html); the list below, taken from the 1.3.0 release, should give you a feel for just how rich the catalog is. (A small sketch for querying this catalog from a running instance follows the list.)
Processors
- AttributeRollingWindow 1.3.0
- AttributesToJSON 1.3.0
- Base64EncodeContent 1.3.0
- CaptureChangeMySQL 1.3.0
- CompareFuzzyHash 1.3.0
- CompressContent 1.3.0
- ConnectWebSocket 1.3.0
- ConsumeAMQP 1.3.0
- ConsumeEWS 1.3.0
- ConsumeIMAP 1.3.0
- ConsumeJMS 1.3.0
- ConsumeKafka 1.3.0
- ConsumeKafka_0_10 1.3.0
- ConsumeKafkaRecord_0_10 1.3.0
- ConsumeMQTT 1.3.0
- ConsumePOP3 1.3.0
- ConsumeWindowsEventLog 1.3.0
- ControlRate 1.3.0
- ConvertAvroSchema 1.3.0
- ConvertAvroToJSON 1.3.0
- ConvertAvroToORC 1.3.0
- ConvertCharacterSet 1.3.0
- ConvertCSVToAvro 1.3.0
- ConvertExcelToCSVProcessor 1.3.0
- ConvertJSONToAvro 1.3.0
- ConvertJSONToSQL 1.3.0
- ConvertRecord 1.3.0
- CreateHadoopSequenceFile 1.3.0
- DebugFlow 1.3.0
- DeleteDynamoDB 1.3.0
- DeleteGCSObject 1.3.0
- DeleteHDFS 1.3.0
- DeleteS3Object 1.3.0
- DeleteSQS 1.3.0
- DetectDuplicate 1.3.0
- DistributeLoad 1.3.0
- DuplicateFlowFile 1.3.0
- EncryptContent 1.3.0
- EnforceOrder 1.3.0
- EvaluateJsonPath 1.3.0
- EvaluateXPath 1.3.0
- EvaluateXQuery 1.3.0
- ExecuteFlumeSink 1.3.0
- ExecuteFlumeSource 1.3.0
- ExecuteProcess 1.3.0
- ExecuteScript 1.3.0
- ExecuteSQL 1.3.0
- ExecuteStreamCommand 1.3.0
- ExtractAvroMetadata 1.3.0
- ExtractCCDAAttributes 1.3.0
- ExtractEmailAttachments 1.3.0
- ExtractEmailHeaders 1.3.0
- ExtractGrok 1.3.0
- ExtractHL7Attributes 1.3.0
- ExtractImageMetadata 1.3.0
- ExtractMediaMetadata 1.3.0
- ExtractText 1.3.0
- ExtractTNEFAttachments 1.3.0
- FetchAzureBlobStorage 1.3.0
- FetchDistributedMapCache 1.3.0
- FetchElasticsearch 1.3.0
- FetchElasticsearch5 1.3.0
- FetchElasticsearchHttp 1.3.0
- FetchFile 1.3.0
- FetchFTP 1.3.0
- FetchGCSObject 1.3.0
- FetchHBaseRow 1.3.0
- FetchHDFS 1.3.0
- FetchParquet 1.3.0
- FetchS3Object 1.3.0
- FetchSFTP 1.3.0
- FuzzyHashContent 1.3.0
- GenerateFlowFile 1.3.0
- GenerateTableFetch 1.3.0
- GeoEnrichIP 1.3.0
- GetAzureEventHub 1.3.0
- GetCouchbaseKey 1.3.0
- GetDynamoDB 1.3.0
- GetFile 1.3.0
- GetFTP 1.3.0
- GetHBase 1.3.0
- GetHDFS 1.3.0
- GetHDFSEvents 1.3.0
- GetHDFSSequenceFile 1.3.0
- GetHTMLElement 1.3.0
- GetHTTP 1.3.0
- GetIgniteCache 1.3.0
- GetJMSQueue 1.3.0
- GetJMSTopic 1.3.0
- GetKafka 1.3.0
- GetMongo 1.3.0
- GetSFTP 1.3.0
- GetSNMP 1.3.0
- GetSolr 1.3.0
- GetSplunk 1.3.0
- GetSQS 1.3.0
- GetTCP 1.3.0
- GetTwitter 1.3.0
- HandleHttpRequest 1.3.0
- HandleHttpResponse 1.3.0
- HashAttribute 1.3.0
- HashContent 1.3.0
- IdentifyMimeType 1.3.0
- InferAvroSchema 1.3.0
- InvokeHTTP 1.3.0
- InvokeScriptedProcessor 1.3.0
- ISPEnrichIP 1.3.0
- JoltTransformJSON 1.3.0
- ListAzureBlobStorage 1.3.0
- ListDatabaseTables 1.3.0
- ListenBeats 1.3.0
- ListenHTTP 1.3.0
- ListenLumberjack 1.3.0
- ListenRELP 1.3.0
- ListenSMTP 1.3.0
- ListenSyslog 1.3.0
- ListenTCP 1.3.0
- ListenUDP 1.3.0
- ListenWebSocket 1.3.0
- ListFile 1.3.0
- ListFTP 1.3.0
- ListGCSBucket 1.3.0
- ListHDFS 1.3.0
- ListS3 1.3.0
- ListSFTP 1.3.0
- LogAttribute 1.3.0
- LogMessage 1.3.0
- LookupAttribute 1.3.0
- LookupRecord 1.3.0
- MergeContent 1.3.0
- ModifyBytes 1.3.0
- ModifyHTMLElement 1.3.0
- MonitorActivity 1.3.0
- Notify 1.3.0
- ParseCEF 1.3.0
- ParseEvtx 1.3.0
- ParseSyslog 1.3.0
- PartitionRecord 1.3.0
- PostHTTP 1.3.0
- PublishAMQP 1.3.0
- PublishJMS 1.3.0
- PublishKafka 1.3.0
- PublishKafka_0_10 1.3.0
- PublishKafkaRecord_0_10 1.3.0
- PublishMQTT 1.3.0
- PutAzureBlobStorage 1.3.0
- PutAzureEventHub 1.3.0
- PutCassandraQL 1.3.0
- PutCloudWatchMetric 1.3.0
- PutCouchbaseKey 1.3.0
- PutDatabaseRecord 1.3.0
- PutDistributedMapCache 1.3.0
- PutDynamoDB 1.3.0
- PutElasticsearch 1.3.0
- PutElasticsearch5 1.3.0
- PutElasticsearchHttp 1.3.0
- PutElasticsearchHttpRecord 1.3.0
- PutEmail 1.3.0
- PutFile 1.3.0
- PutFTP 1.3.0
- PutGCSObject 1.3.0
- PutHBaseCell 1.3.0
- PutHBaseJSON 1.3.0
- PutHDFS 1.3.0
- PutHiveQL 1.3.0
- PutHiveStreaming 1.3.0
- PutHTMLElement 1.3.0
- PutIgniteCache 1.3.0
- PutJMS 1.3.0
- PutKafka 1.3.0
- PutKinesisFirehose 1.3.0
- PutKinesisStream 1.3.0
- PutLambda 1.3.0
- PutMongo 1.3.0
- PutParquet 1.3.0
- PutRiemann 1.3.0
- PutS3Object 1.3.0
- PutSFTP 1.3.0
- PutSlack 1.3.0
- PutSNS 1.3.0
- PutSolrContentStream 1.3.0
- PutSplunk 1.3.0
- PutSQL 1.3.0
- PutSQS 1.3.0
- PutSyslog 1.3.0
- PutTCP 1.3.0
- PutUDP 1.3.0
- PutWebSocket 1.3.0
- QueryCassandra 1.3.0
- QueryDatabaseTable 1.3.0
- QueryDNS 1.3.0
- QueryElasticsearchHttp 1.3.0
- QueryRecord 1.3.0
- QueryWhois 1.3.0
- ReplaceText 1.3.0
- ReplaceTextWithMapping 1.3.0
- ResizeImage 1.3.0
- RouteHL7 1.3.0
- RouteOnAttribute 1.3.0
- RouteOnContent 1.3.0
- RouteText 1.3.0
- ScanAttribute 1.3.0
- ScanContent 1.3.0
- ScrollElasticsearchHttp 1.3.0
- SegmentContent 1.3.0
- SelectHiveQL 1.3.0
- SetSNMP 1.3.0
- SplitAvro 1.3.0
- SplitContent 1.3.0
- SplitJson 1.3.0
- SplitRecord 1.3.0
- SplitText 1.3.0
- SplitXml 1.3.0
- SpringContextProcessor 1.3.0
- StoreInKiteDataset 1.3.0
- TailFile 1.3.0
- TransformXml 1.3.0
- UnpackContent 1.3.0
- UpdateAttribute 1.3.0
- UpdateCounter 1.3.0
- UpdateRecord 1.3.0
- ValidateCsv 1.3.0
- ValidateXml 1.3.0
- Wait 1.3.0
- YandexTranslate 1.3.0
Controller Services
- AvroReader 1.3.0
- AvroRecordSetWriter 1.3.0
- AvroSchemaRegistry 1.3.0
- AWSCredentialsProviderControllerService 1.3.0
- CouchbaseClusterService 1.3.0
- CSVReader 1.3.0
- CSVRecordSetWriter 1.3.0
- DBCPConnectionPool 1.3.0
- DistributedMapCacheClientService 1.3.0
- DistributedMapCacheServer 1.3.0
- DistributedSetCacheClientService 1.3.0
- DistributedSetCacheServer 1.3.0
- FreeFormTextRecordSetWriter 1.3.0
- GCPCredentialsControllerService 1.3.0
- GrokReader 1.3.0
- HBase_1_1_2_ClientMapCacheService 1.3.0
- HBase_1_1_2_ClientService 1.3.0
- HiveConnectionPool 1.3.0
- HortonworksSchemaRegistry 1.3.0
- IPLookupService 1.3.0
- JettyWebSocketClient 1.3.0
- JettyWebSocketServer 1.3.0
- JMSConnectionFactoryProvider 1.3.0
- JsonPathReader 1.3.0
- JsonRecordSetWriter 1.3.0
- JsonTreeReader 1.3.0
- PropertiesFileLookupService 1.3.0
- ScriptedLookupService 1.3.0
- ScriptedReader 1.3.0
- ScriptedRecordSetWriter 1.3.0
- SimpleCsvFileLookupService 1.3.0
- SimpleKeyValueLookupService 1.3.0
- StandardHttpContextMap 1.3.0
- StandardSSLContextService 1.3.0
- XMLFileLookupService 1.3.0
Reporting Tasks
- AmbariReportingTask 1.3.0
- ControllerStatusReportingTask 1.3.0
- DataDogReportingTask 1.3.0
- MonitorDiskUsage 1.3.0
- MonitorMemory 1.3.0
- ScriptedReportingTask 1.3.0
- SiteToSiteBulletinReportingTask 1.3.0
- SiteToSiteProvenanceReportingTask 1.3.0
- SiteToSiteStatusReportingTask 1.3.0
- StandardGangliaReporter 1.3.0
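Beyond the documentation pages, a running NiFi instance can report which processor types it actually has loaded, through the same REST API that backs the UI. Here is a minimal sketch, assuming an unsecured instance on localhost:8080 (the 1.x default) and the GET /nifi-api/flow/processor-types endpoint; a secured instance would additionally need TLS and authentication.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal sketch: ask a running NiFi which processor types it has loaded.
// Assumes an unsecured instance at localhost:8080 (the 1.x default port).
public class ListProcessorTypes {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/nifi-api/flow/processor-types");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            // The response is JSON with a "processorTypes" array; dump it raw
            // here rather than pulling in a JSON parser.
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            conn.disconnect();
        }
    }
}
```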
An elegant UI
This is the part I personally find excellent: the web UI shows you, in great detail, how data is flowing through the graph.
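The figures behind those flow graphs are reachable programmatically as well. A sketch under the same assumptions as above (unsecured NiFi on localhost:8080), polling the GET /nifi-api/flow/status endpoint whose counters the UI's status bar reflects:

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

// Minimal sketch: fetch the top-level counters the NiFi UI displays
// (queued FlowFiles, active threads, and so on).
public class FlowStatus {
    public static void main(String[] args) throws Exception {
        HttpURLConnection conn = (HttpURLConnection)
                new URL("http://localhost:8080/nifi-api/flow/status").openConnection();
        conn.setRequestProperty("Accept", "application/json");
        try (InputStream body = conn.getInputStream();
             Scanner scanner = new Scanner(body, "UTF-8").useDelimiter("\\A")) {
            // The JSON payload carries a "controllerStatus" object; print it raw.
            System.out.println(scanner.hasNext() ? scanner.next() : "");
        } finally {
            conn.disconnect();
        }
    }
}
```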
NiFi's place in the Hortonworks portfolio
Because NiFi can process streaming data from so many kinds of sources, Hortonworks sees the HDF platform as a natural fit for Internet of Anything (IoAT) data processing. Data in HDF can flow in multiple directions, even point to point, and users can interact with the streams they collect; that interaction can even reach back to the data sources themselves, such as sensors and devices. In Hortonworks' framing, HDF complements HDP: the former handles data in motion, while the latter, built on Hadoop, draws insight from data at rest. Hortonworks' own marketing already pitches HDF as end-to-end streaming data processing and analytics:
Hortonworks DataFlow (HDF) provides the only end-to-end platform that collects, curates, analyzes and acts on data in real-time, on-premises or in the cloud, with a drag-and-drop visual interface. HDF is an integrated solution with Apache NiFi/MiNiFi, Apache Kafka, Apache Storm and Druid.
At a high level, the HDF streaming data analytics platform has three parts: Flow Management, Stream Processing, and Enterprise Services. NiFi provides the flow management and ingestion layer and, crucially, can be deployed all the way out to edge gateways via MiNiFi.
Closing thoughts
If your project has a similar need to pull together many data sources, NiFi is well worth a look.