自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用Spark Streaming SQL進行PV/UV統(tǒng)計

大數(shù)據(jù) Spark
使用Spark Streaming SQL,并結(jié)合Redis可以很方便進行PV/UV的統(tǒng)計。本文將介紹通過Streaming SQL消費Loghub中存儲的用戶訪問信息,對過去1分鐘內(nèi)的數(shù)據(jù)進行PV/UV統(tǒng)計,將結(jié)果存入Redis中。

1.背景介紹

PV/UV統(tǒng)計是流式分析一個常見的場景。通過PV可以對訪問的網(wǎng)站做流量或熱點分析,例如廣告主可以通過PV值預(yù)估投放廣告網(wǎng)頁所帶來的流量以及廣告收入。另外一些場景需要對訪問的用戶作分析,比如分析用戶的網(wǎng)頁點擊行為,此時就需要對UV做統(tǒng)計。

使用Spark Streaming SQL,并結(jié)合Redis可以很方便進行PV/UV的統(tǒng)計。本文將介紹通過Streaming SQL消費Loghub中存儲的用戶訪問信息,對過去1分鐘內(nèi)的數(shù)據(jù)進行PV/UV統(tǒng)計,將結(jié)果存入Redis中。

2.準(zhǔn)備工作

  • 創(chuàng)建E-MapReduce 3.23.0以上版本的Hadoop集群。
  • 下載并編譯E-MapReduce-SDK包

 

  1. git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git 
  2. cd aliyun-emapreduce-sdk 
  3. git checkout -b master-2.x origin/master-2.x 
  4. mvn clean package -DskipTests 

編譯完后, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

數(shù)據(jù)源

本文采用Loghub作為數(shù)據(jù)源,有關(guān)日志采集、日志解析請參考日志服務(wù)。

3.統(tǒng)計PV/UV

一般場景下需要將統(tǒng)計出的PV/UV以及相應(yīng)的統(tǒng)計時間存入Redis。其他一些業(yè)務(wù)場景中,也會只保存最新結(jié)果,用新的結(jié)果不斷覆蓋更新舊的數(shù)據(jù)。以下首先介紹第一種情況的操作流程。

3.1啟動客戶端

命令行啟動streaming-sql客戶端

  1. streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar 

也可以創(chuàng)建SQL語句文件,通過streaming-sql -f的方式運行。

3.1定義數(shù)據(jù)表

數(shù)據(jù)源表定義如下

 

  1. CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)  
  2. USING loghub  
  3. OPTIONS( 
  4. sls.project=${sls.project}, 
  5. sls.store=${sls.store}, 
  6. access.key.id=${access.key.id}, 
  7. access.key.secret=${access.key.secret}, 
  8. endpoint=${endpoint}); 

其中,數(shù)據(jù)源表包含user_ip和__time__兩個字段,分別代表用戶的IP地址和loghub上的時間列。OPTIONS中配置項的值根據(jù)實際配置。

結(jié)果表定義如下

 

  1. CREATE TABLE redis_sink  
  2. USING redis  
  3. OPTIONS( 
  4. table='statistic_info'
  5. host=${redis_host}, 
  6. key.column='interval'); 

其中,statistic_info為Redis存儲結(jié)果的表名,interval對應(yīng)統(tǒng)計結(jié)果中的interval字段;配置項${redis_host}的值根據(jù)實際配置。

3.2創(chuàng)建流作業(yè)

 

  1. CREATE SCAN loghub_scan 
  2. ON loghub_source 
  3. USING STREAM 
  4. OPTIONS( 
  5. watermark.column='__time__'
  6. watermark.delayThreshold='10 second'); 
  7.  
  8. CREATE STREAM job 
  9. OPTIONS( 
  10. checkpointLocation=${checkpoint_location}) 
  11. INSERT INTO redis_sink 
  12. SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval 
  13. FROM loghub_scan 
  14. GROUP BY TUMBLING(__time__, interval 1 minute), window; 

4.3查看統(tǒng)計結(jié)果

最終的統(tǒng)計結(jié)果如下圖所示

使用Spark Streaming SQL進行PV/UV統(tǒng)計

可以看到,每隔一分鐘都會生成一條數(shù)據(jù),key的形式為表名:interval,value為pv和uv的值。

3.4實現(xiàn)覆蓋更新

將結(jié)果表的配置項key.column修改為一個固定的值,例如定義如下

 

  1. CREATE TABLE redis_sink 
  2. USING redis  
  3. OPTIONS( 
  4. table='statistic_info'
  5. host=${redis_host}, 
  6. key.column='statistic_type'); 

創(chuàng)建流作業(yè)的SQL改為

 

  1. CREATE STREAM job 
  2. OPTIONS( 
  3. checkpointLocation='/tmp/spark-test/checkpoint'
  4. INSERT INTO redis_sink 
  5. SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval 
  6. FROM loghub_scan 
  7. GROUP BY TUMBLING(__time__, interval 1 minute), window; 

最終的統(tǒng)計結(jié)果如下圖所示

使用Spark Streaming SQL進行PV/UV統(tǒng)計

可以看到,Redis中值保留了一個值,這個值每分鐘都被更新,value包含pv、uv和interval的值。

4.總結(jié)

本文簡要介紹了使用Streaming SQL結(jié)合Redis實現(xiàn)流式處理中統(tǒng)計PV/UV的需求。后續(xù)文章,我將介紹Spark Streaming SQL的更多內(nèi)容。

責(zé)任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關(guān)推薦

2025-03-05 08:40:00

RedisJava開發(fā)

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監(jiān)控

2021-11-01 13:11:45

FlinkPvUv

2016-10-16 13:48:54

多維分析 UVPV

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2021-06-03 08:10:30

SparkStream項目Uv

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2021-08-20 16:37:42

SparkSpark Strea

2021-08-08 22:08:41

Redis開發(fā)網(wǎng)頁

2021-06-06 13:10:12

FlinkPvUv

2018-04-09 12:25:11

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺

2016-05-11 10:29:54

Spark Strea數(shù)據(jù)清理Spark

2023-10-24 20:32:40

大數(shù)據(jù)

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)

2017-09-26 09:35:22

2021-05-09 22:48:40

SQL數(shù)據(jù)庫變量

2015-08-26 11:12:11

數(shù)據(jù)溢出SQL注入SQL報錯注入

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理
點贊
收藏

51CTO技術(shù)棧公眾號