Spark入門:實現(xiàn)WordCount的3種方式
WordCount作為Spark的入門任務(wù),可以很簡單,也可以做到比較復(fù)雜。 本文從實現(xiàn)功能的角度提出了3種實現(xiàn)方式,至于性能影響,會在后文繼續(xù)討論。
注意: 本文使用的Spark版本還是1.6.1.如果讀者您已經(jīng)切換到2.0+版本,請參考GitHub spark的官方例子進行學(xué)習(xí)。 因為2.0版本的API與1.X 并不能完全兼容,特別是2.0開始使用了SparkSession的概念,而不是SparkContext!
***種方式:mapToPair + reduceByKey
這是官方提供的實現(xiàn)方式,應(yīng)該也是網(wǎng)上能找到的最多的例子。
官網(wǎng)地址: http://spark.apache.org/examples.html
核心代碼:
- JavaRDD<String> textFile = sc.textFile("hdfs://...");
- JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
- public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
- });
- JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
- public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
- });
- JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
- public Integer call(Integer a, Integer b) { return a + b; }
- });
- counts.saveAsTextFile("hdfs://...");
總結(jié)上面的步驟:
- flatmap : 將一整段文字映射成一個字符串數(shù)組
- mapToPair: 將word 映射成 (word, 1)
- reduceByKey: 按照key進行g(shù)roup and plus的操作, 得到最終結(jié)果
- collect: 這是Action,上面3個都是Transformation
第二種方式:使用countByValue代替mapToPair + reduceByKey
核心代碼:
- JavaRDD<String> textFile = sc.textFile("hdfs://...");
- JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
- public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
- });
- Map<String, Long> counts = words.countByValue();
讀文件、flatmap這兩步都是完全一樣的,但是后面直接一個countByValue就搞定了,并且還直接collect到本地了,是不是感覺這一種實現(xiàn)方式更簡潔了呢?
至于性能,一般來說這種方式還不錯,但是這種方式有一些缺點,參考StackOverFlow的描述:
網(wǎng)址: http://stackoverflow.com/questions/25318153/spark-rdd-aggregate-vs-rdd-reducebykey
countByValue would be the fastest way to do this, however its implementation uses hash maps and merges them so if you have a large amount of data this approach may not scale well (especially when you consider how many issues spark already has with memory). You may want to use the standard way of counting in map reduce which would be to map the line and 1 as pairs then reduceBykey like this:
簡單的說,這種方式是使用hash的方式進行merge。 如果處理的數(shù)據(jù)量比較大的時候,效果可能不怎么好。
注意: 這種方式的性能筆者確實還沒有親自實踐過!
第三種方式:AggregateByKey
AggregateByKey 這個方法,可以看做是reduceByKey的增強版,因為reduceByKey的輸出類型與輸入類型要求是完全一致的。比如wordcount 之中的輸入是Tuple2<String, Integer> 輸出也同樣要求是Tuple2<String,Integer>. 但是AggregateByKey的輸出類型可以是不一樣的數(shù)據(jù)類型。 參考下面的代碼:
- val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
- val data = sc.parallelize(keysWithValuesList)
- //Create key value pairs
- val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
- val initialCount = 0;
- val addToCounts = (n: Int, v: String) => n + 1
- val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
- val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
輸出:
- Aggregate By Key sum Results
- bar -> 3
- foo -> 5
可以看到,輸入是<String, String> 而輸出變成了<String, Integer>
注意: 這種方法,并不是處理WordCount的***的選擇,只是說明我們可以使用AggregateByKey這種方式來實現(xiàn)相同的功能
其實還有另外一種實現(xiàn)方式: 使用DataFrame。 但是這種方式需要前期的準備比較多,即如何將數(shù)據(jù)處理并喂給DataFrame。
一般來說,DataFrame的效率相比其他的RDD的實現(xiàn)方式要高不少,如果在前期準備工作上面難度不是太大的話,非常推薦使用DataFrame的方式。