用Hadoop和MapReduce進行大數(shù)據(jù)分析
如此大規(guī)模的數(shù)據(jù)一度僅限于大企業(yè)、學(xué)校和政府機構(gòu) — 這些機構(gòu)有能力購買昂貴的超級計算機、能夠雇用員工保障其運行。今天,由于存儲成本的降低和處理能力的商品化,一些小公司,甚至個人都可以存儲和挖掘同樣的數(shù)據(jù),推動新一輪的應(yīng)用程序創(chuàng)新。
大數(shù)據(jù)革命技術(shù)之一是MapReduce,一個編程模式,是Google針對大規(guī)模、分布式數(shù)據(jù)而開發(fā)的。在本文中,我將介紹Apache的開源MapReduce實現(xiàn)、Hadoop,也有人將其稱之為云計算的殺手應(yīng)用程序。
關(guān)于Hadoop
Apache的Hadoop框架本質(zhì)上是一個用于分析大數(shù)據(jù)集的機制,不一定位于數(shù)據(jù)存儲中。Hadoop提取出了MapReduce的大規(guī)模數(shù)據(jù)分析引擎,更易于開發(fā)人員理解。Hadoop可以擴展到無數(shù)個節(jié)點,可以處理所有活動和相關(guān)數(shù)據(jù)存儲的協(xié)調(diào)。
Hadoop的眾多特性和配置使其成為一個十分有用且功能強大的框架,其用途和功能令人驚訝。Yahoo!以及其他許多組織已經(jīng)找到了一個高效機制來分析成堆的字節(jié)數(shù)。在單個節(jié)點上運行Hadoop也很容易;您所需要的只是一些需要分析的數(shù)據(jù),以及熟悉一般的Java代碼。Hadoop也可和 uby、Python以及C++一起使用。
作為處理大數(shù)據(jù)集的概念框架,MapReduce對于使用許多計算機來解決分布式問題而言是高度優(yōu)化的。顧名思義,這個框架由兩個函數(shù)構(gòu)成。map 函數(shù)專用于獲取大數(shù)據(jù)輸入,并將其分成小片段,然后交由其他進程進行操作。reduce函數(shù)整理map收集的各個回應(yīng),然后顯示最后的輸出。
在Hadoop中,您可以通過擴展Hadoop自身的基類來定義map和reduce實現(xiàn)。實現(xiàn)和輸入輸出格式被一個指定它們的配置聯(lián)系在一起。 Hadoop非常適合處理包含結(jié)構(gòu)數(shù)據(jù)的大型文件。Hadoop可以對輸入文件進行原始解析,這一點特別有用,這樣您就可以每次處理一行。定義一個map 函數(shù)實際上只是一個關(guān)于確定您從即將輸入的文本行中捕獲什么內(nèi)容的問題。
數(shù)據(jù),無處不在的數(shù)據(jù)!
美國政府產(chǎn)生大量數(shù)據(jù),只有一部分是普通民眾所感興趣的。各種政府機構(gòu)免費發(fā)布關(guān)于US經(jīng)濟健康狀況和更改社會人口統(tǒng)計資料的數(shù)據(jù)。U.S. Geological Survey (USGS)發(fā)布國內(nèi)外地震數(shù)據(jù)。
世界各地每天都有很多個小型地震發(fā)生。其中大多數(shù)發(fā)生在地殼深處,沒有人能感覺到,盡管如此,但是監(jiān)聽站仍然會進行記錄。USGS以CSV(或逗號分隔值)文件的格式發(fā)布每周地震數(shù)據(jù)。
每周文件平均不是很大 — 只有大約100KB左右。但是,它可以作為學(xué)習(xí)Hadoop的基礎(chǔ)。記住,Hadoop有能力處理更 大的數(shù)據(jù)集。
跟蹤震動
我近期從USGS網(wǎng)站下載的CSV文件有大約920多行。如 清單 1 所示:
清單 1.一個USGS地震數(shù)據(jù)文件的行數(shù)統(tǒng)計
- $> wc -l eqs7day-M1.txt
- 920 eqs7day-M1.txt
CVS文件內(nèi)容如清單2所示(這是前兩行):
清單 2. CVS文件的前兩行
- $> head -n 2 eqs7day-M1.txt
- Src,Eqid,Version,Datetime,Lat,Lon,Magnitude,Depth,NST,Region
- ci,14896484,2,"Sunday, December 12, 2010 23:23:20 UTC",33.3040,-116.4130,1.0,11.70,22,
- "Southern California"
這就是我稱之為信息豐富 的文件,尤其是當(dāng)您想到它總共有920行記錄時。然而我只想知道在該文件報告的這一周內(nèi)每一天有多少次地震發(fā)生。我想知道在這7天內(nèi)哪個區(qū)域是地震頻發(fā)區(qū)。
我第一個想到的就是使用簡單的grep命令來搜索每天的地震數(shù)。看看這個文件,我發(fā)現(xiàn)數(shù)據(jù)記錄是從12月12開始的。因此我對該字符串執(zhí)行了一次grep-c,其結(jié)果如清單3所示:
清單 3.12月12有多少次地震發(fā)生?
- $> grep -c 'December 12' eqs7day-M1.txt
- 98
安裝Hadoop如果您之前沒有安裝Hadoop,那么現(xiàn)在就裝。第一步,下載最新版二進制文件,解壓,然后在您的路徑上設(shè)置Hadoop的bin目錄。完成這些您就可以直接執(zhí)行hadoop命令了。使用Hadoop要求您執(zhí)行它的hadoop命令,而不是像您所見到的那樣調(diào)用java命令。您可以向 hadoop命令傳選項,諸如在哪里可以找到您的Java二進制文件(例如,表示您的map和reduce實現(xiàn))。在我的示例中,我創(chuàng)建了一個jar文件,告訴Hadoop我想在我的jar文件內(nèi)運行哪個任務(wù)。我也向Hadoop類路徑添加了一些運行我的應(yīng)用程序所需的附加二進制文件。
現(xiàn)在,我知道在12月12日有98條記錄,也就是說有98次地震。我只能沿著這條記錄向下,對12月10日的記錄執(zhí)行一次grep,接著是 11 號,等等。這聽起來有點乏味。更糟糕的是,我還需要知道在該文件中的是哪幾天。我確實不關(guān)心這些,甚至有時候我可能無法獲取該信息。事實上,我只想知道在七天這樣一個時間段內(nèi)任何一天的地震次數(shù),使用Hadoop我就可以很容易的獲取這一信息。
Hadoop只需要幾條信息就可以回答我的第一個和第二個問題:即,要處理哪條輸入以及如何處理map和reduce。我也必須提供了一個可以將每件事都聯(lián)系起來的作業(yè)。在我開始處理這些代碼之前,我需要花點時間確定我的CSV數(shù)據(jù)整齊有序。
使用opencsv進行數(shù)據(jù)解析
除了地震CSV文件的第一行之外,第一行是文件頭,每一行都是一系列逗號分隔數(shù)據(jù)值。我只對數(shù)據(jù)的3個部分感興趣:日期、地點和震級。為了獲取這些資料,我將使用一個很棒的開源庫opencsv,它將會幫助我分析CSV文件。
作為一個測試優(yōu)先的工具,我首先編寫一個快捷JUnit測試,確認我可以從CSV文件的一個樣例行獲取的我所需要的信息,如清單 4 所示:
清單 4. 解析一個CSV行
- public class CSVProcessingTest {
- private final String LINE = "ci,14897012,2,\"Monday, December 13, 2010 " +
- "14:10:32 UTC\",33.0290,-115." +
- "5388,1.9,15.70,41,\"Southern California\"";
- @Test
- public void testReadingOneLine() throws Exception {
- String[] lines = new CSVParser().parseLine(LINE);
- assertEquals("should be Monday, December 13, 2010 14:10:32 UTC",
- "Monday, December 13, 2010 14:10:32 UTC", lines[3]);
- assertEquals("should be Southern California",
- "Southern California", lines[9]);
- assertEquals("should be 1.9", "1.9", lines[6]);
- }
- }
正如您在清單4中所看到的,opencsv處理逗號分隔值非常容易。該解析器僅返回一組String,所以有可能獲取位置信息(別忘了,在 Java語言中數(shù)組和集合的訪問是從零開始的)。
轉(zhuǎn)換日期格式
當(dāng)使用MapReduce進行處理時,map函數(shù)的任務(wù)是選擇一些要處理的值,以及一些鍵。這就是說,map主要處理和返回兩個元素:一個鍵和一個值?;氐轿抑暗男枨螅沂紫认胫烂刻鞎l(fā)生多少次地震。因此,當(dāng)我在分析地震文件時,我將發(fā)布兩個值:鍵是日期,值是一個計數(shù)器。reduce函數(shù)將對計數(shù)器(只是一些值為1的整數(shù))進行總計。因此,提供給我的是在目標(biāo)地震文件中某一個日期出現(xiàn)的次數(shù)。
由于我只對24小時時段內(nèi)的信息感興趣,我得剔除每個文件中的日期的時間部分。在 清單5中,我編寫了一個快速測試,驗證如何將一個傳入文件中的特定日期信息轉(zhuǎn)換成一個更一般的24小時日期:
清單 5.日期格式轉(zhuǎn)換
- @Test
- public void testParsingDate() throws Exception {
- String datest = "Monday, December 13, 2010 14:10:32 UTC";
- SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
- Date dt = formatter.parse(datest);
- formatter.applyPattern("dd-MM-yyyy");
- String dtstr = formatter.format(dt);
- assertEquals("should be 13-12-2010", "13-12-2010", dtstr);
- }
在清單5中,我使用了SimpleDateFormat Java對象,將CSV文件中格式為Monday, December 13, 2010 14:10:32 UTC的日期String轉(zhuǎn)換成了更一般的13-12-2010。
Hadoop的map和reduce
現(xiàn)在我已經(jīng)找到了處理CSV文件以及其日期格式的解決方法。我要開始在Hadoop中實施我的map和reduce函數(shù)了。這個過程需要理解 Java 泛型,因為 Hadoop 選擇使用顯式類型,為了安全起見。
當(dāng)我使用 Hadoop 定義一個映射實現(xiàn)時,我只擴展Hadoop的Mapper類。然后我可以使用泛型來為傳出鍵和值指定顯式類。類型子句也指定了傳入鍵和值,這對于讀取文件分別是字節(jié)數(shù)和文本行數(shù)。
EarthQuakesPerDateMapper 類擴展了Hadoop的Mapper對象。它顯式地將其輸出鍵指定為一個Text對象,將其值指定為一個IntWritable,這是一個Hadoop特定類,實質(zhì)上是一個整數(shù)。還要注意,class子句的前兩個類型是LongWritable和Text,分別是字節(jié)數(shù)和文本行數(shù)。
由于類定義中的類型子句,我將傳入map方法的參數(shù)類型設(shè)置為在context.write子句內(nèi)帶有該方法的輸出。如果我想指定其他內(nèi)容,將會出現(xiàn)一個編譯器問題,或Hadoop將輸出一個錯誤消息,描述類型不匹配的消息。
清單 6.一個映射實現(xiàn)清單 6 中的map實現(xiàn)比較簡單:本質(zhì)上是,Hadoop為在輸入文件中找到的每一行文本調(diào)用這個類。為了避免除了CSV頭部,首先檢查是否字節(jié)數(shù)(key 對象)為零。然后執(zhí)行清單4和5中的步驟:捕獲傳入日期,進行轉(zhuǎn)換,然后設(shè)置為傳出鍵。我也提供了一個數(shù):1。就是說,我為每個日期編寫一個計數(shù)器,當(dāng) reduce實現(xiàn)被調(diào)用時,獲取一個鍵和一系列值。在本例中,鍵是日期及其值,如 清單7所示:
清單 7.一個 map 輸出和 reduce 輸入的邏輯視圖
以下是引用片段: |
注意,context.write(new Text(dtstr), new IntWritable(1))(在清單6中)構(gòu)建了如 清單7所示的邏輯集合。正如您所了解的,context是一個保存各種信息的Hadoop數(shù)據(jù)結(jié)構(gòu)。context被傳遞到reduce實現(xiàn),reduce獲取這些值為1的值然后總和起來。因此,一個 reduce 實現(xiàn)邏輯上創(chuàng)建如 清單8所示的數(shù)據(jù)結(jié)構(gòu):
清單 8.一個reduce輸出視圖
以下是引用片段: |
我的reduce實現(xiàn)如 清單9所示。與Hadoop的Mapper一樣,Reducer被參數(shù)化了:前兩個參數(shù)是傳入的鍵類型(Text)和值類型(IntWritable),后兩個參數(shù)是輸出類型:鍵和值,這在本例中是相同的。
清單 9.reduce實現(xiàn)
- public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text,
- IntWritable> {
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException {
- int count = 0;
- for (IntWritable value : values) {
- count++;
- }
- context.write(key, new IntWritable(count));
- }
- }
我的reduce實現(xiàn)非常簡單。正如我在清單7中所指出的,傳入的是實際上是一個值的集合,在本例中是1的集合,我所做的就是將它們加起來,然后寫出一個新鍵值對表示日期和次數(shù)。我的 reduce 代碼可以挑出您在清單8中所見到的這幾行。邏輯流程看起來像這樣:
以下是引用片段: |
當(dāng)然,這個清單的抽象形式是map -> reduce。
定義一個Hadoop Job
現(xiàn)在我已經(jīng)對我的map和reduce實現(xiàn)進行了編碼,接下來所要做的是將所有這一切鏈接到一個Hadoop Job。定義一個Job比較簡單:您需要提供輸入和輸出、map和reduce實現(xiàn)(如清單6和清單9所示)以及輸出類型。在本例中我的輸出類型和 reduce 實現(xiàn)所用的是同一個類型。
清單 10. 一個將map和redece綁在一起的Job
- public class EarthQuakesPerDayJob {
- public static void main(String[] args) throws Throwable {
- Job job = new Job();
- job.setJarByClass(EarthQuakesPerDayJob.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setMapperClass(EarthQuakesPerDateMapper.class);
- job.setReducerClass(EarthQuakesPerDateReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
在清單10中,我使用一個main方法將所有這一切綁在一起,該方法有兩個參數(shù):地震CSV文件的目錄,以及生成報告的輸出目錄(Hadoop 更喜歡創(chuàng)建該目錄)。
為了執(zhí)行這個小框架,我需要將這些類打包。我還需要告知 Hadoop 在哪里可以找到opencsv二進制文件。然后可以通過命令行執(zhí)行Hadoop,如 清單11所示:
清單 11.執(zhí)行 Hadoop
- $> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar
- $> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob
- ~/temp/mreduce/in/ ~/temp/mreduce/out
運行這些代碼,Hadoop開始運行時您將可以看到一堆文本在屏幕上一閃而過。我所用的CSV文件相比專門用于處理這種情況的Hadoop,那真是小巫見大巫!hadoop應(yīng)該可以在幾秒鐘內(nèi)完成,具體取決于您的處理功能。
完成這些后,您可以使用任何編輯器查看輸出文件內(nèi)容。還可以選擇直接使用hadoop命令。正如 清單12所示:
清單 12.讀取Hadoop輸出
以下是引用片段: |
如果您像我一樣,在清單12中首先會注意到的就是每天地震數(shù)—12月9日就有178次地震。希望您也會注意到Hadoop實現(xiàn)了我所想要的:整齊地列出我的研究范圍內(nèi)每天的地震次數(shù)。
編寫另一個Mapper
接下來,我想找到地震發(fā)生在哪里,以及如何快速計算出在我的研究范圍內(nèi)記錄地震次數(shù)最多的是哪個區(qū)域。當(dāng)然,您已經(jīng)猜到了,Hadoop可以輕松地做到。在這個案例中,鍵不再是日期而是區(qū)域。因此,我編寫了一個新的Mapper類。
清單 13.一個新的map實現(xiàn)
- public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text,
- IntWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException,
- InterruptedException {
- if (key.get() > 0) {
- String[] lines = new CSVParser().parseLine(value.toString());
- context.write(new Text(lines[9]), new IntWritable(1));
- }
- }
- }
和之前獲取日期然后進行轉(zhuǎn)換相比,在清單13中我所作的是獲取位置,這是CSV陣列中的最后一個條目。
相比一個龐大的位置和數(shù)字列表,我將結(jié)果限制在那些7天內(nèi)出現(xiàn)10次的區(qū)域。
清單 14.哪里的地震較多?
- public class EarthQuakeLocationReducer extends
- Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void
- reduce(Text key, Iterable<IntWritable> values, Context context) throws
- IOException, InterruptedException { int count = 0; for (IntWritable value :
- values) { count++; } if (count >= 10) { context.write(key, new
- IntWritable(count)); } } }
清單14中的代碼和清單9中的代碼非常類似;然而,在本例中,我限制了輸出大于或等于10。接下來,我將map和reduce,以及其他 Job 實現(xiàn)綁在一起,進行打包,然后和平常一樣執(zhí)行Hadoop獲取我的新答案。
使用hadoop dfs目錄顯示我所請求的新值:
清單 15.地震區(qū)域分布
以下是引用片段: |
從清單15還可以得到什么?首先,北美洲西海岸,從墨西哥到阿拉斯加是地震高發(fā)區(qū)。其次,阿肯色州明顯位于斷帶層上,這是我沒有意識到的。最后,如果您居住在北部或者是南加州(很多軟件開發(fā)人員都居住于此),您周圍的地方每隔 13 分鐘會震動一次。
結(jié)束語
使用Hadoop分析數(shù)據(jù)輕松且高效,對于它對數(shù)據(jù)分析所提供的支持,我只是了解皮毛而已。Hadoop的設(shè)計旨在以一種分布式方式運行,處理運行 map和reduce的各個節(jié)點之間的協(xié)調(diào)性。作為示例,本文中我只在一個JVM上運行Hadoop,該JVM僅有一個無足輕重的文件。
Hadoop本身是一個功能強大的工具,圍繞它還有一個完整的、不斷擴展的生態(tài)系統(tǒng),可以提供子項目至基于云計算的Hadoop服務(wù)。Hadoop生態(tài)系統(tǒng)演示了項目背后豐富的社區(qū)活動。來自社區(qū)的許多工具證實了大數(shù)據(jù)分析作為一個全球業(yè)務(wù)活動的可行性。有了Hadoop,分布式數(shù)據(jù)挖掘和分析對所有軟件創(chuàng)新者和企業(yè)家都是可用的,包括但不限于Google和Yahoo!這類大企業(yè)。
【編輯推薦】