Spark入門:Spark Streaming 概覽
概覽
Spark Streaming是Spark API的一個(gè)可橫向擴(kuò)容,高吞吐量,容錯(cuò)的實(shí)時(shí)數(shù)據(jù)流處理引擎,Spark能夠從Kafka、Flume、Kinesis或者TCP等等輸入獲取數(shù)據(jù),然后能夠使用復(fù)雜的計(jì)算表達(dá)式如map,reduce,join和window對(duì)數(shù)據(jù)進(jìn)行計(jì)算。計(jì)算完后的數(shù)據(jù)能夠被推送到文件系統(tǒng),數(shù)據(jù)庫(kù),和實(shí)時(shí)的儀表盤。另外,你也可以使用Spark ML和圖計(jì)算處理實(shí)時(shí)數(shù)據(jù)流。
Spark Streaming接受到了實(shí)時(shí)數(shù)據(jù)后,把它們分批進(jìn)行切割,然后再交給Spark進(jìn)行數(shù)據(jù)的批量處理。
Spark Streaming對(duì)離散化的數(shù)據(jù)流提供了高級(jí)別的抽象DStream,所有進(jìn)入的數(shù)據(jù)流都會(huì)被處理為DStreams,在內(nèi)部,DStream是一個(gè)順序排列的RDD。
快速起步
***個(gè)實(shí)例是如何從TCP輸入中計(jì)算單詞出現(xiàn)的次數(shù)
首先,我們創(chuàng)建一個(gè)JavaStreamingContext對(duì)象,它是所有Streaming函數(shù)的主入口,再創(chuàng)建一個(gè)帶有2個(gè)線程的StreamingContext對(duì)象,每1秒進(jìn)行一次批處理。
- import org.apache.spark.*;
- import org.apache.spark.api.java.function.*;
- import org.apache.spark.streaming.*;
- import org.apache.spark.streaming.api.java.*;
- import scala.Tuple2;
- SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
創(chuàng)建一個(gè)偵聽本地9999的TCP數(shù)據(jù)源
- JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
我們把接受到的數(shù)據(jù)按照空格進(jìn)行切割
- JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
對(duì)單詞進(jìn)行統(tǒng)計(jì)
- JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
- JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
- wordCounts.print();
把字符串拍扁->映射->進(jìn)行去重統(tǒng)計(jì),***調(diào)用print函數(shù)把數(shù)據(jù)打印到控制臺(tái)中
- jssc.start(); // Start the computation
- jssc.awaitTermination(); // Wait for the computation to terminate
***,啟動(dòng)整個(gè)計(jì)算過(guò)程
為了完成這次實(shí)驗(yàn),還需要使用nc作為Server進(jìn)行配合
- nc -lk 9999
Spark提供了示例,可以使用 ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 來(lái)體驗(yàn)WordCount