Twitter Storm Advanced: Initial Setup
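This blog post is a simple introductory Storm example, meant to show readers how Storm works at runtime. Follow-up posts will cover some of Storm's advanced features and, finally, running Storm on YARN in Hadoop 2.x. The intended audience is Hadoop and Spark users who already work in big data, and readers who know Storm and want to understand it more deeply.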
Project POM (the Storm jars are not published to Maven Central, so the following repositories must be added to the project):
```xml
<repositories>
    <repository>
        <id>central</id>
        <name>Maven Repository Switchboard</name>
        <layout>default</layout>
        <url>http://maven.oschina.net/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>clojars</id>
        <url>https://clojars.org/repo/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>1.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.clojure</groupId>
        <artifactId>clojure</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>libthrift7</artifactId>
        <version>0.7.0</version>
    </dependency>
</dependencies>
```
Below is a Storm "Hello World" example. The code has been trimmed; readers familiar with Storm can easily assemble it into a complete program.
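```java
import java.util.Arrays;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public static void main(String[] args) {
    Config conf = new Config();
    conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
    conf.put(Config.STORM_CLUSTER_MODE, "local");
    //conf.put("storm.local.mode.zmq", "false");
    conf.put("storm.zookeeper.root", "/storm");
    conf.put("storm.zookeeper.session.timeout", 50000);
    // storm.zookeeper.servers expects a list of host names, not a single string
    conf.put("storm.zookeeper.servers", Arrays.asList("nowledgedata-n15"));
    conf.put("storm.zookeeper.port", 2181);
    //conf.setDebug(true);
    //conf.setNumWorkers(2);

    TopologyBuilder builder = new TopologyBuilder();
    // spout "words" with a parallelism of 2
    builder.setSpout("words", new TestWordSpout(), 2);
    // bolt "exclaim2" with a parallelism of 5, fed from "words" by shuffle grouping
    builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
           .shuffleGrouping("words");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    // let the topology run for a while, then stop it and shut down the local cluster
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();
}
```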
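Config.STORM_LOCAL_DIR sets a local directory in which Storm writes some configuration information and temporary data.
Config.STORM_CLUSTER_MODE is the run mode, either local or distributed. In local mode the cluster is simulated with multiple threads inside one process, which is meant for development and testing; in distributed mode it runs as multiple processes across a real cluster, i.e., truly distributed.
Storm's high availability for spouts and bolts is coordinated through ZooKeeper: Nimbus and the Supervisors keep assignments and heartbeats there, which is how failed workers are detected and reassigned. storm.zookeeper.servers lists the ZooKeeper hosts and storm.zookeeper.port gives their port; storm.zookeeper.root is not an address but the root path (znode) under which Storm stores its data, and storm.zookeeper.session.timeout is the session timeout in milliseconds. When the topology runs in LocalCluster, Storm starts an in-process ZooKeeper of its own, so these settings mainly matter on a real cluster.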
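To make the relationship between these three ZooKeeper settings concrete, the sketch below assembles them into a ZooKeeper connect string in the standard "host:port,host:port/chroot" form, where the root path acts as a chroot suffix. It is plain Java written for illustration only: the class ZkConnectString is hypothetical, and Storm's own client code may build the string differently.

```java
import java.util.List;

// Hypothetical illustration, not Storm code: combines storm.zookeeper.servers,
// storm.zookeeper.port and storm.zookeeper.root into a ZooKeeper connect string
// of the form "host:port,...,host:port/chroot".
class ZkConnectString {
    static String build(List<String> servers, int port, String root) {
        StringBuilder sb = new StringBuilder();
        for (String host : servers) {
            if (sb.length() > 0) {
                sb.append(',');               // hosts are comma-separated
            }
            sb.append(host).append(':').append(port);
        }
        // the root path is appended once, after the last host, as the chroot
        return sb.append(root).toString();
    }

    public static void main(String[] args) {
        // prints "nowledgedata-n15:2181/storm"
        System.out.println(build(List.of("nowledgedata-n15"), 2181, "/storm"));
    }
}
```

With two hosts, zk1 and zk2, the same settings would give "zk1:2181,zk2:2181/storm": the port repeats per host, while the root appears only once.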
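conf.setDebug(true) turns on debug mode, in which Storm logs every emitted tuple, giving much more detailed logs.
TestWordSpout is an example spout that ships with Storm. It serves as the data source by randomly emitting strings from the list new String[] {"nathan", "mike", "jackson", "golda", "bertels"}.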
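The word-picking behavior described above can be modeled in plain Java. This is a stand-in written for this post, not TestWordSpout's source: the class RandomWordSource and its seed parameter are hypothetical, and the seed exists only so that runs are repeatable. As far as I recall, the real spout also pauses about 100 ms between emits, which would fit the roughly 100 ms spacing of the timestamps in the run log below (two messages per tick, plausibly one from each of the two spout instances).

```java
import java.util.Random;

// Hypothetical plain-Java stand-in for TestWordSpout's word selection (no Storm classes).
// Each call to next() returns one word chosen uniformly at random from the fixed list.
class RandomWordSource {
    private static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};
    private final Random rand;

    // the seed is an addition for repeatable runs; it is not part of Storm's spout
    RandomWordSource(long seed) {
        this.rand = new Random(seed);
    }

    // roughly what one nextTuple() call produces: a single random word
    String next() {
        return WORDS[rand.nextInt(WORDS.length)];
    }

    static boolean isKnownWord(String w) {
        for (String candidate : WORDS) {
            if (candidate.equals(w)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RandomWordSource source = new RandomWordSource(42L);
        for (int i = 0; i < 5; i++) {
            System.out.println(source.next());
        }
    }
}
```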
The source of DefaultStringBolt:
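```java
// Members of DefaultStringBolt (extends BaseRichBolt); log is the class's logger
private OutputCollector collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;   // keep the collector for use in execute()
}

public void execute(Tuple tuple) {
    log.info("rev a message: " + tuple.getString(0));
    // emit a new tuple anchored to the input, then ack the input
    collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}

// Required because the bolt emits tuples; the field name "word" is arbitrary
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
```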
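execute() follows a common bolt pattern: emit a new tuple anchored to the input, then ack the input. In Storm, anchoring adds the new tuple to the input's tuple tree, so if the spout emitted the original with a message ID, a failure further downstream can cause the spout tuple to be replayed; ack tells Storm that this bolt is done with its input. The plain-Java model below only records those two calls so the contract is visible. RecordingCollector is a hypothetical stand-in for OutputCollector, and tuples are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of DefaultStringBolt.execute(); no Storm classes involved.
class BoltContractDemo {
    // Hypothetical stand-in for OutputCollector: records emits (with anchors) and acks.
    static class RecordingCollector {
        final List<String[]> emitted = new ArrayList<>(); // each entry: {anchor, value}
        final List<String> acked = new ArrayList<>();

        void emit(String anchor, String value) {
            emitted.add(new String[] {anchor, value});
        }

        void ack(String input) {
            acked.add(input);
        }
    }

    // Same steps as the bolt: append "!!!", emit anchored to the input, then ack the input
    static void execute(String input, RecordingCollector collector) {
        collector.emit(input, input + "!!!");
        collector.ack(input);
    }

    public static void main(String[] args) {
        RecordingCollector c = new RecordingCollector();
        execute("jackson", c);
        // prints "jackson!!! anchored to jackson" and then "acked: [jackson]"
        System.out.println(c.emitted.get(0)[1] + " anchored to " + c.emitted.get(0)[0]);
        System.out.println("acked: " + c.acked);
    }
}
```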
Run log:
```text
10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
```
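Data is produced by what Storm calls a spout (think of a faucet: the source end that generates data) and is then passed along a chain of downstream bolts, where it is transformed and finally consumed. Spouts and bolts both run in parallel, and the parallelism of each is configurable (in local mode it is simulated with threads). For example: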
```java
builder.setSpout("words", new TestWordSpout(), 2);
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
       .shuffleGrouping("words");
```
Here the TestWordSpout spout has a parallelism of 2 and DefaultStringBolt has a parallelism of 5.
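To see what "spout parallelism 2, bolt parallelism 5, shuffle grouping" means mechanically, here is a toy single-JVM model. Threads stand in for executors, as in local mode, and a simplified shuffle grouping walks a shuffled list of bolt tasks and reshuffles when it runs out, so each task receives an equal share per cycle; this follows the spirit of Storm's documented promise that shuffle grouping spreads tuples evenly, but it is not Storm's implementation. ParallelismDemo and ShuffleGrouper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical toy model of "spout executors -> shuffleGrouping -> bolt executors" in one JVM.
// Not Storm code: plain threads stand in for executors, as in Storm's local mode.
class ParallelismDemo {
    // Simplified shuffle grouping: walk a shuffled list of target tasks and reshuffle
    // once it is used up, so every task gets the same number of tuples per full cycle.
    static class ShuffleGrouper {
        private final List<Integer> order = new ArrayList<>();
        private final Random rand;
        private int pos = 0;

        ShuffleGrouper(int numTasks, long seed) {
            for (int i = 0; i < numTasks; i++) {
                order.add(i);
            }
            rand = new Random(seed);
            Collections.shuffle(order, rand);
        }

        int chooseTask() {
            if (pos == order.size()) {
                Collections.shuffle(order, rand);
                pos = 0;
            }
            return order.get(pos++);
        }
    }

    // Runs the toy topology and returns how many tuples each bolt task processed.
    static int[] run(int spouts, int bolts, int tuplesPerSpout) {
        // one single-threaded executor per bolt task
        ExecutorService[] boltTasks = new ExecutorService[bolts];
        for (int b = 0; b < bolts; b++) {
            boltTasks[b] = Executors.newSingleThreadExecutor();
        }
        AtomicIntegerArray processed = new AtomicIntegerArray(bolts);

        // each spout thread has its own grouper, like each spout task in Storm
        Thread[] spoutThreads = new Thread[spouts];
        for (int s = 0; s < spouts; s++) {
            final long seed = s;
            spoutThreads[s] = new Thread(() -> {
                ShuffleGrouper grouper = new ShuffleGrouper(bolts, seed);
                for (int i = 0; i < tuplesPerSpout; i++) {
                    int task = grouper.chooseTask();
                    // stands in for the bolt's execute(): here it only counts the tuple
                    boltTasks[task].execute(() -> processed.incrementAndGet(task));
                }
            });
            spoutThreads[s].start();
        }

        try {
            for (Thread t : spoutThreads) {
                t.join();
            }
            for (ExecutorService e : boltTasks) {
                e.shutdown();                              // accept no new work, finish queued work
                e.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the toy topology", ie);
        }

        int[] counts = new int[bolts];
        for (int b = 0; b < bolts; b++) {
            counts[b] = processed.get(b);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2 spouts x 10 tuples over 5 bolt tasks: prints [4, 4, 4, 4, 4]
        System.out.println(java.util.Arrays.toString(run(2, 5, 10)));
    }
}
```

The counts come out even regardless of how the threads interleave, because each spout's grouper cycles through all 5 bolt tasks exactly twice for its 10 tuples.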
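The log shows data flowing from the spout to the predefined bolt, which logs each message it receives. My test code sets the bolt's parallelism to 5, and counting the thread names in the log confirms there are exactly 5 threads: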
- Thread-29-exclaim2
- Thread-31-exclaim2
- Thread-26-exclaim2
- Thread-33-exclaim2
- Thread-35-exclaim2
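Counting threads by eye works for a dozen lines; for a longer log, a small helper can do it. This sketch (plain Java; ThreadCounter is a hypothetical name) takes the text inside the first pair of square brackets on each line as the thread name, which matches the log layout shown above, and counts lines per thread.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts log lines per thread name, taken from the first [...] on each line.
class ThreadCounter {
    private static final Pattern THREAD = Pattern.compile("\\[([^\\]]+)\\]");

    static Map<String, Integer> countByThread(String[] logLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by thread name for stable output
        for (String line : logLines) {
            Matcher m = THREAD.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] log = {
            "10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson",
            "10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson",
            "10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike",
            "10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan",
            "10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan",
            "10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels",
            "10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson",
            "10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson",
            "11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan",
            "11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan",
            "11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels",
            "11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson"
        };
        Map<String, Integer> counts = countByThread(log);
        System.out.println(counts.size() + " threads: " + counts);
    }
}
```

Run against the twelve log lines above, it reports 5 threads: Thread-26-exclaim2 with 4 messages and each of the other four with 2.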
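What is Storm? There is a detailed introduction here.
To borrow an analogy from an OSC user: Hadoop is like an elevator in a shopping mall, where people wait in line, select a floor, and then arrive; Storm is like an escalator, which is set running in advance and immediately carries off whoever steps on, toward a fixed destination.
As I understand it, Storm and Hadoop are completely different, and their designs have nothing in common. Storm is closer to Spring Integration, which I introduced earlier: it is a data-flow system. It moves data through a predefined flow, transforming, passing along, splitting, and merging it, and finally delivers it to back-end storage. The difference is that Storm can run distributed, and how widely it is distributed is also configurable.
These characteristics make Storm well suited to developing ETL systems for big data.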
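The data-flow idea described above (split, transform, merge, then store) can be sketched in a single process with Java streams. Each stage below is an ordinary method; in a Storm topology each would plausibly be its own bolt with its own parallelism, and the "store" would be a real database rather than an in-memory map. DataFlowSketch is a hypothetical name, and the word-count task is only an example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical single-process illustration of split -> transform -> merge -> store.
class DataFlowSketch {
    // split: one line of text becomes many words (blank input yields no words)
    static List<String> split(String line) {
        return Arrays.stream(line.trim().split("\\s+"))
                .filter(w -> !w.isEmpty())
                .collect(Collectors.toList());
    }

    // transform: normalize each word
    static String transform(String word) {
        return word.toLowerCase();
    }

    // merge + store: aggregate counts into a stand-in "back-end store" (a sorted map)
    static Map<String, Long> run(List<String> lines) {
        return lines.stream()
                .flatMap(line -> split(line).stream())
                .map(DataFlowSketch::transform)
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {jackson=1, mike=2, nathan=1}
        System.out.println(run(List.of("Nathan mike", "mike Jackson")));
    }
}
```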