自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark入門:實現(xiàn)WordCount的3種方式

大數(shù)據(jù) Spark
當我們學(xué)習(xí)一門新的語言,HelloWorld通常是我們寫的第一個程序。而WordCount基本上是我們學(xué)習(xí)MapReduce思想與編程的第一個程序,無論是Hadoop的MR或者是Spark的RDD操作學(xué)習(xí)。

[[170182]]

WordCount作為Spark的入門任務(wù),可以很簡單,也可以做到比較復(fù)雜。 本文從實現(xiàn)功能的角度提出了3種實現(xiàn)方式,至于性能影響,會在后文繼續(xù)討論。

注意: 本文使用的Spark版本還是1.6.1.如果讀者您已經(jīng)切換到2.0+版本,請參考GitHub spark的官方例子進行學(xué)習(xí)。 因為2.0版本的API與1.X 并不能完全兼容,特別是2.0開始使用了SparkSession的概念,而不是SparkContext!

***種方式:mapToPair + reduceByKey

這是官方提供的實現(xiàn)方式,應(yīng)該也是網(wǎng)上能找到的最多的例子。

官網(wǎng)地址: http://spark.apache.org/examples.html

核心代碼:

  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
  10.  
  11. public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } 
  12.  
  13. }); 
  14.  
  15. JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<IntegerIntegerInteger>() { 
  16.  
  17. public Integer call(Integer a, Integer b) { return a + b; } 
  18.  
  19. }); 
  20.  
  21. counts.saveAsTextFile("hdfs://..."); 

總結(jié)上面的步驟:

  1. flatmap : 將一整段文字映射成一個字符串數(shù)組
  2. mapToPair: 將word 映射成 (word, 1)
  3. reduceByKey: 按照key進行g(shù)roup and plus的操作, 得到最終結(jié)果
  4. collect: 這是Action,上面3個都是Transformation

第二種方式:使用countByValue代替mapToPair + reduceByKey

核心代碼:

  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. Map<String, Long> counts = words.countByValue(); 

讀文件、flatmap這兩步都是完全一樣的,但是后面直接一個countByValue就搞定了,并且還直接collect到本地了,是不是感覺這一種實現(xiàn)方式更簡潔了呢?

至于性能,一般來說這種方式還不錯,但是這種方式有一些缺點,參考StackOverFlow的描述:

網(wǎng)址: http://stackoverflow.com/questions/25318153/spark-rdd-aggregate-vs-rdd-reducebykey

countByValue would be the fastest way to do this, however its implementation uses hash maps and merges them so if you have a large amount of data this approach may not scale well (especially when you consider how many issues spark already has with memory). You may want to use the standard way of counting in map reduce which would be to map the line and 1 as pairs then reduceBykey like this:

簡單的說,這種方式是使用hash的方式進行merge。 如果處理的數(shù)據(jù)量比較大的時候,效果可能不怎么好。

注意: 這種方式的性能筆者確實還沒有親自實踐過!

第三種方式:AggregateByKey

AggregateByKey 這個方法,可以看做是reduceByKey的增強版,因為reduceByKey的輸出類型與輸入類型要求是完全一致的。比如wordcount 之中的輸入是Tuple2<String, Integer> 輸出也同樣要求是Tuple2<String,Integer>. 但是AggregateByKey的輸出類型可以是不一樣的數(shù)據(jù)類型。 參考下面的代碼:

  1. val keysWithValuesList = Array("foo=A""foo=A""foo=A""foo=A""foo=B""bar=C""bar=D""bar=D"
  2.  
  3. val data = sc.parallelize(keysWithValuesList) 
  4.  
  5. //Create key value pairs 
  6.  
  7. val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache() 
  8.  
  9. val initialCount = 0; 
  10.  
  11. val addToCounts = (n: Int, v: String) => n + 1 
  12.  
  13. val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 
  14.  
  15. val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts) 

輸出:

  1. Aggregate By Key sum Results 
  2.  
  3. bar -> 3 
  4.  
  5. foo -> 5 

可以看到,輸入是<String, String> 而輸出變成了<String, Integer>

注意: 這種方法,并不是處理WordCount的***的選擇,只是說明我們可以使用AggregateByKey這種方式來實現(xiàn)相同的功能

其實還有另外一種實現(xiàn)方式: 使用DataFrame。 但是這種方式需要前期的準備比較多,即如何將數(shù)據(jù)處理并喂給DataFrame。

一般來說,DataFrame的效率相比其他的RDD的實現(xiàn)方式要高不少,如果在前期準備工作上面難度不是太大的話,非常推薦使用DataFrame的方式。

責(zé)任編輯:武曉燕 來源: FlyML
相關(guān)推薦

2009-07-02 14:42:55

ExtJS Grid

2020-02-18 20:00:31

PostgreSQL數(shù)據(jù)庫

2020-02-10 15:50:18

Spring循環(huán)依賴Java

2017-09-05 10:20:15

2015-05-04 10:20:25

2019-01-31 08:15:38

物聯(lián)網(wǎng)農(nóng)業(yè)IoT

2022-08-05 08:27:05

分布式系統(tǒng)線程并發(fā)

2010-03-12 17:52:35

Python輸入方式

2021-04-01 06:01:10

嵌入式開發(fā)應(yīng)用程序開發(fā)技術(shù)

2021-07-19 05:48:30

springboot 攔截器項目

2010-08-13 13:25:53

Flex頁面跳轉(zhuǎn)

2014-12-31 17:42:47

LBSAndroid地圖

2021-06-24 08:52:19

單點登錄代碼前端

2021-11-05 21:33:28

Redis數(shù)據(jù)高并發(fā)

2017-08-14 10:30:13

SparkSpark Strea擴容

2010-06-21 16:39:10

Linux備份

2010-07-14 10:30:26

Perl多線程

2015-04-02 16:54:52

災(zāi)難恢復(fù)VDI災(zāi)難恢復(fù)

2015-04-13 11:39:26

VDI災(zāi)難恢復(fù)

2023-09-07 19:14:05

點贊
收藏

51CTO技術(shù)棧公眾號