自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Uber永久定位系統(tǒng)實(shí)時(shí)數(shù)據(jù)分析過程實(shí)踐!

大數(shù)據(jù)
本文,我們將討論在數(shù)據(jù)處理管道中使用Spark Structured Streaming對(duì)Uber事件數(shù)據(jù)進(jìn)行聚類分析,以檢測(cè)和可視化用戶位置實(shí)踐。(注:本文所用數(shù)據(jù)并非Uber內(nèi)部實(shí)際用戶數(shù)據(jù),文末附具體代碼或者示例獲取渠道)

根據(jù)Gartner所言,到2020年,每個(gè)智慧城市將使用約13.9億輛聯(lián)網(wǎng)汽車,這些汽車配備物聯(lián)網(wǎng)傳感器和其他設(shè)備。城市中的車輛定位和行為模式分析將有助于優(yōu)化流量,更好的規(guī)劃決策和進(jìn)行更智能的廣告投放。例如,對(duì)GPS汽車數(shù)據(jù)分析可以允許城市基于實(shí)時(shí)交通信息來優(yōu)化交通流量。電信公司正在使用移動(dòng)電話定位數(shù)據(jù),識(shí)別和預(yù)測(cè)城市人口的位置活動(dòng)趨勢(shì)和生存區(qū)域。

本文,我們將討論在數(shù)據(jù)處理管道中使用Spark Structured Streaming對(duì)Uber事件數(shù)據(jù)進(jìn)行聚類分析,以檢測(cè)和可視化用戶位置實(shí)踐。(注:本文所用數(shù)據(jù)并非Uber內(nèi)部實(shí)際用戶數(shù)據(jù),文末附具體代碼或者示例獲取渠道)

首先,我們回顧幾個(gè)結(jié)構(gòu)化流媒體涉及的概念,然后探討端到端用例:

使用MapR-ES發(fā)布/訂閱事件流

MapR-ES是一個(gè)分布式發(fā)布/訂閱事件流系統(tǒng),讓生產(chǎn)者和消費(fèi)者能夠通過Apache Kafka API以并行和容錯(cuò)方式實(shí)時(shí)交換事件。

流表示從生產(chǎn)者到消費(fèi)者的連續(xù)事件序列,其中事件被定義為鍵值對(duì)。

 

topic是一個(gè)邏輯事件流,將事件按類別區(qū)分,并將生產(chǎn)者與消費(fèi)者分離。topic按吞吐量和可伸縮性進(jìn)行分區(qū),MapR-ES可以擴(kuò)展到非常高的吞吐量級(jí)別,使用普通硬件可以輕松實(shí)現(xiàn)每秒傳輸數(shù)百萬條消息。

 

你可以將分區(qū)視為事件日志:將新事件附加到末尾,并為其分配一個(gè)稱為偏移的順序ID號(hào)。

 


與隊(duì)列一樣,事件按接收順序傳遞。

 

但是,與隊(duì)列不同,消息在讀取時(shí)不會(huì)被刪除,它們保留在其他消費(fèi)者可用分區(qū)。消息一旦發(fā)布,就不可變且永久保留。

 

讀取消息時(shí)不刪除消息保證了大規(guī)模讀取時(shí)的高性能,滿足不同消費(fèi)者針對(duì)不同目的(例如具有多語(yǔ)言持久性的多個(gè)視圖)處理相同消息的需求。

 


Spark數(shù)據(jù)集,DataFrame,SQL

Spark數(shù)據(jù)集是分布在集群多個(gè)節(jié)點(diǎn)上類對(duì)象的分布式集合,可以使用map,flatMap,filter或Spark SQL來操縱數(shù)據(jù)集。DataFrame是Row對(duì)象的數(shù)據(jù)集,表示包含行和列的數(shù)據(jù)表。

 


Spark結(jié)構(gòu)化流

結(jié)構(gòu)化流是一種基于Spark SQL引擎的可擴(kuò)展、可容錯(cuò)的流處理引擎。通過Structured Streaming,你可以將發(fā)布到Kafka的數(shù)據(jù)視為無界DataFrame,并使用與批處理相同的DataFrame,Dataset和SQL API處理此數(shù)據(jù)。

 

隨著流數(shù)據(jù)的不斷傳播,Spark SQL引擎會(huì)逐步持續(xù)處理并更新最終結(jié)果。

 

事件的流處理對(duì)實(shí)時(shí)ETL、過濾、轉(zhuǎn)換、創(chuàng)建計(jì)數(shù)器、聚合、關(guān)聯(lián)值、豐富其他數(shù)據(jù)源或機(jī)器學(xué)習(xí)、持久化文件或數(shù)據(jù)庫(kù)以及發(fā)布到管道的不同topic非常有用。

 


Spark結(jié)構(gòu)化流示例代碼

下面是Uber事件數(shù)據(jù)聚類分析用例的數(shù)據(jù)處理管道,用于檢測(cè)位置。

 


使用Kafka API將行車位置數(shù)據(jù)發(fā)布到MapR-ES topic

訂閱該topic的Spark Streaming應(yīng)用程序:

  • 輸入U(xiǎn)ber行車數(shù)據(jù)流;
  • 使用已部署的機(jī)器學(xué)習(xí)模型、集群ID和位置豐富行程數(shù)據(jù);

在MapR-DB JSON中存儲(chǔ)轉(zhuǎn)換和豐富數(shù)據(jù)。

 

用例數(shù)據(jù)示例

示例數(shù)據(jù)集是Uber旅行數(shù)據(jù),傳入數(shù)據(jù)是CSV格式,下面顯示了一個(gè)示例,topic依次為:

日期/時(shí)間,緯度,經(jīng)度,位置(base),反向時(shí)間戳

2014-08-06T05:29:00.000-07:00,40.7276,-74.0033,B02682,9223370505593280605

我們使用集群ID和位置豐富此數(shù)據(jù),然后將其轉(zhuǎn)換為以下JSON對(duì)象:

 

  1.  
  2. "_id":0_922337050559328,  
  3. "dt":"2014-08-01 08:51:00" 
  4. "lat":40.6858,  
  5. "lon":-73.9923,  
  6. "base":"B02682" 
  7. "cid":0,  
  8. "clat":40.67462874550765,  
  9. "clon":-73.98667466026531  

 

加載K-Means模型

Spark KMeansModel類用于加載k-means模型,該模型安裝在歷史uber行程數(shù)據(jù)上,然后保存到MapR-XD集群。接下來,創(chuàng)建集群中心ID和位置數(shù)據(jù)集,以便稍后與Uber旅行位置連接。

 

集群中心下方顯示在Zeppelin notebook中的Google地圖上:

 

從Kafka的topic中讀取數(shù)據(jù)

為了從Kafka讀取,我們必須首先指定流格式,topic和偏移選項(xiàng)。有關(guān)配置參數(shù)的詳細(xì)信息,請(qǐng)參閱MapR Streams文檔。

 

這將返回具有以下架構(gòu)的DataFrame:

 

下一步是將二進(jìn)制值列解析并轉(zhuǎn)換為Uber對(duì)象的數(shù)據(jù)集。

將消息值解析為Uber對(duì)象的數(shù)據(jù)集

Scala Uber案例類定義與CSV記錄對(duì)應(yīng)的架構(gòu),parseUber函數(shù)將逗號(hào)分隔值字符串解析為Uber對(duì)象。

 

在下面的代碼中,我們使用parseUber函數(shù)注冊(cè)一個(gè)用戶自定義函數(shù)(UDF)來反序列化消息值字符串。我們?cè)趲в衐f1列值的String Cast的select表達(dá)式中使用UDF,該值返回Uber對(duì)象的DataFrame。

 


使用集群中心ID和位置豐富的Uber對(duì)象數(shù)據(jù)集

VectorAssembler用于轉(zhuǎn)換并返回一個(gè)新的DataFrame,其中包含向量列中的緯度和經(jīng)度要素列。

 

 

k-means模型用于通過模型轉(zhuǎn)換方法從特征中獲取聚類,該方法返回具有聚類ID(標(biāo)記為預(yù)測(cè))的DataFrame。生成的數(shù)據(jù)集與先前創(chuàng)建的集群中心數(shù)據(jù)集(ccdf)連接,以創(chuàng)建UberC對(duì)象的數(shù)據(jù)集,其中包含與集群中心ID和位置相結(jié)合的行程信息。

 

 

最后的數(shù)據(jù)集轉(zhuǎn)換是將唯一ID添加到對(duì)象以存儲(chǔ)在MapR-DB JSON中。createUberwId函數(shù)創(chuàng)建一個(gè)唯一的ID,包含集群ID和反向時(shí)間戳。由于MapR-DB按id對(duì)行進(jìn)行分區(qū)和排序,因此行將按簇的ID新舊時(shí)間進(jìn)行排序。 此函數(shù)與map一起使用以創(chuàng)建UberwId對(duì)象的數(shù)據(jù)集。

 

 


寫入內(nèi)存接收器

 

接下來,為了進(jìn)行調(diào)試,我們可以開始接收數(shù)據(jù)并將數(shù)據(jù)作為內(nèi)存表存儲(chǔ)在內(nèi)存中,然后進(jìn)行查詢。

 

以下是來自 %sqlselect * from uber limit 10 的示例輸出:

 

現(xiàn)在我們可以查詢流數(shù)據(jù),詢問哪段時(shí)間和集群內(nèi)的搭乘次數(shù)最多?(輸出顯示在Zeppelin notebook中)

  1. %sql 

SELECT hour(uber.dt) as hr,cid, count(cid) as ct FROM uber group By hour(uber.dt), cid

 

Spark Streaming寫入MapR-DB

 

用于Apache Spark的MapR-DB連接器使用戶可以將MapR-DB用作Spark結(jié)構(gòu)化流或Spark Streaming的接收器。

 

當(dāng)你處理大量流數(shù)據(jù)時(shí),其中一個(gè)挑戰(zhàn)是存儲(chǔ)位置。對(duì)于此應(yīng)用程序,可以選擇MapR-DB JSON(一種高性能NoSQL數(shù)據(jù)庫(kù)),因?yàn)樗哂蠮SON的可伸縮性和靈活易用性。

JSON模式的靈活性

MapR-DB支持JSON文檔作為本機(jī)數(shù)據(jù)存儲(chǔ)。MapR-DB使用JSON文檔輕松存儲(chǔ),查詢和構(gòu)建應(yīng)用程序。Spark連接器可以輕松地在JSON數(shù)據(jù)和MapR-DB之間構(gòu)建實(shí)時(shí)或批處理管道,并在管道中利用Spark。

 

使用MapR-DB,表按集群的鍵范圍自動(dòng)分區(qū),提供可擴(kuò)展行和快速讀寫能力。在此用例中,行鍵_id由集群ID和反向時(shí)間戳組成,因此表將自動(dòng)分區(qū),并按最新的集群ID進(jìn)行排序。

 

Spark MapR-DB Connector利用Spark DataSource API。連接器體系結(jié)構(gòu)在每個(gè)Spark Executor中都有一個(gè)連接對(duì)象,允許使用MapR-DB(分區(qū))進(jìn)行分布式并行寫入,讀取或掃描。

 

寫入MapR-DB接收器

要將Spark Stream寫入MapR-DB,請(qǐng)使用tablePath,idFieldPath,createTable,bulkMode和sampleSize參數(shù)指定格式。以下示例將cdf DataFrame寫到MapR-DB并啟動(dòng)流。

 

 

使用Spark SQL查詢MapR-DB JSON

Spark MapR-DB Connector允許用戶使用Spark數(shù)據(jù)集在MapR-DB之上執(zhí)行復(fù)雜的SQL查詢和更新,同時(shí)應(yīng)用投影和過濾器下推,自定義分區(qū)和數(shù)據(jù)位置等關(guān)鍵技術(shù)。

 

將數(shù)據(jù)從MapR-DB加載到Spark數(shù)據(jù)集中

要將MapR-DB JSON表中的數(shù)據(jù)加載到Apache Spark數(shù)據(jù)集,我們可在SparkSession對(duì)象上調(diào)用loadFromMapRDB方法,提供tableName,schema和case類。這將返回UberwId對(duì)象的數(shù)據(jù)集:

 

 

使用Spark SQL探索和查詢Uber數(shù)據(jù)

現(xiàn)在,我們可以查詢連續(xù)流入MapR-DB的數(shù)據(jù),使用Spark DataFrames特定于域的語(yǔ)言或使用Spark SQL來詢問。

顯示第一行(注意行如何按_id分區(qū)和排序,_id由集群ID和反向時(shí)間戳組成,反向時(shí)間戳首先排序最近的行)。

  1. df.show 

 

每個(gè)集群發(fā)生多少次搭乘?

  1. df.groupBy("cid").count().orderBy(desc"count")).show 

 

或者使用Spark SQL:

  1. %sql SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC 

 

使用Zeppelin notebook中的Angular和Google Maps腳本,我們可以在地圖上顯示集群中心標(biāo)記和最新的5000個(gè)旅行的位置,如下可看出最受歡迎的位置,比如位于曼哈頓的0、3、9。

 

集群0最高搭乘次數(shù)出現(xiàn)在哪個(gè)小時(shí)?

 

  1. df.filter($"\_id" <= "1" 
  2. .select(hour($"dt").alias("hour"), $"cid" 
  3. .groupBy("hour","cid").agg(count("cid" 
  4. .alias("count"))show 

 

一天中的哪個(gè)小時(shí)和哪個(gè)集群的搭乘次數(shù)最多?

  1. %sql SELECT hour(uber.dt), cid, count(cid) FROM uber GROUP BY hour(uber.dt), cid 

 

按日期時(shí)間顯示uber行程的集群計(jì)數(shù)

  1. %sql select cid, dt, count(cid) as count from uber group by dt, cid order by dt, cid limit 100 

 

總結(jié)

本文涉及的知識(shí)點(diǎn)有Spark結(jié)構(gòu)化流應(yīng)用程序中的Spark Machine Learning模型、Spark結(jié)構(gòu)化流與MapR-ES使用Kafka API攝取消息、SparkStructured Streaming持久化保存到MapR-DB,以持續(xù)快速地進(jìn)行SQL分析等。此外,上述討論過的用例體系結(jié)構(gòu)所有組件都可與MapR數(shù)據(jù)平臺(tái)在同一集群上運(yùn)行。

 

代碼:

你可以從此處下載代碼和數(shù)據(jù)以運(yùn)行這些示例:https://github.com/caroljmcdonald/mapr-spark-structuredstreaming-uber

機(jī)器學(xué)習(xí)notebook的Zeppelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberML.json

Spark結(jié)構(gòu)化流notebook的Zeppelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberStructuredStreaming.json

SparkSQL notebook的Zenpelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberSQLMapR-DB.json

此代碼包含在MapR 6.0.1沙箱上運(yùn)行的說明,這是一個(gè)獨(dú)立的VM以及教程和演示應(yīng)用程序,可讓用戶快速使用MapR和Spark。

責(zé)任編輯:未麗燕 來源: it168網(wǎng)站
相關(guān)推薦

2014-01-07 09:25:31

Amazon云計(jì)算Kinesis

2022-07-26 11:42:31

科大訊飛Flink數(shù)據(jù)倉(cāng)庫(kù)

2024-02-19 00:06:06

數(shù)據(jù)分析系統(tǒng)Doris

2012-08-24 08:51:27

IBMdW

2012-08-28 10:52:58

IBMdW

2023-10-13 07:25:50

2021-10-25 17:57:08

數(shù)據(jù)技術(shù)架構(gòu)

2021-07-22 18:29:58

AI

2023-08-29 10:20:00

2022-05-23 13:30:48

數(shù)據(jù)胡實(shí)踐

2023-05-25 08:24:46

Kafka大數(shù)據(jù)

2022-07-14 23:27:57

數(shù)據(jù)分析數(shù)據(jù)驅(qū)動(dòng)可變數(shù)據(jù)

2016-08-10 01:00:21

2016-04-22 17:05:30

2024-08-19 10:06:44

2019-08-19 14:24:39

數(shù)據(jù)分析Spark操作

2011-06-07 17:01:44

2023-10-24 12:21:58

人工智能邊緣計(jì)算

2019-06-19 16:01:14

Spark數(shù)據(jù)分析SparkSQL

2021-09-13 13:46:29

Apache HudiB 站數(shù)據(jù)湖
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)