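Learning Spark: The Pitfalls That Wear You Out
What have I been doing this past month...
At work it's business as usual: writing bugs and then happily fixing them. As for study, I've been reading some non-technical books lately and writing short reflections on them. I just took a quick look, and my last five posts turn out to have nothing to do with technology; apparently I've been drifting away from my own profession.
So I'm using this post to get back on track and remind myself that I am, first and foremost, a programmer. To be fair, I have picked up a few technical lessons recently, such as how to write a good design document, how to use delay queues, and how to apply anti-abuse (request throttling) techniques. Today, of course, we continue down the "Learning Spark" road.
This post covers the various pitfalls I ran into. I don't know whether you have hit them too and solved them easily; I, for one, was worn out by these small problems, so I'm summarizing them here for future reference.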
1.1 Error integrating Scala with IntelliJ
After installing Scala successfully, I wanted to write Scala code in IntelliJ. Scala was fully configured (there is plenty of material online on how to do that), but running a Scala program produced an error.
Error:
- Error:scalac: Multiple 'scala-library*.jar' files (scala-library.jar, scala-library.jar, scala-library.jar) in Scala compiler classpath in Scala SDK scala-sdk-2.12.2
Fix: I found the idea on Stack Overflow. Open Project Structure in IntelliJ, delete the existing Scala path (my Scala is installed under /usr/local/Cellar/scala/2.12.2), and add the /usr/local/Cellar/scala/2.12.2/idea/lib directory instead.
Before the change (screenshot not preserved)
After the change (screenshot not preserved)
1.2 IntelliJ does not recognize Scala syntax
I wrote a Scala HelloWorld in IntelliJ; the code is as follows.
- /**
- * Created by jackie on 17/5/7.
- */
- package com.jackie.scala.s510
- object HelloWorld {
- def main(args: Array[String]): Unit = {
- println("hello world")
- println(increaseAnother(5));
- println(Array(1,2,3,4).map{(x:Int)=>x+1}.mkString(","));
- println(Array(1,2,3,4) map{(x:Int)=>x+1} mkString(","));
- // test object
- var person = new Person()
- person.name_=("john") // name_=() corresponds to a setter method in Java
- println("Person name:" + person.name)
- person.name = "Jackie"
- println("Person name:" + person.name)
- var mp = new MyPerson()
- mp.name_("alihaha")
- println("MyPerson name:" + mp.name)
- var pwp = new PersonWithParam("Jackie", 18)
- println("PersonWithParam:" + pwp.toString())
- }
- def increaseAnother(x: Int): Int = x + 1
- }
Running it failed because mkString could not be resolved.
Error: mkString can't be resolved
Fix: first, my environment: IntelliJ 14.0, JDK 8, Scala 2.12.2. The newest Scala version that this IntelliJ offered was 2.11, so I upgraded IntelliJ to 2017.1. It then failed with Error:scalac: Error: org.jetbrains.jps.incremental.scala.remote.ServerException. Finally I opened Project Structure in IntelliJ and switched Scala from 2.12.2 to 2.11.7, which solved the problem.
1.3 Problems integrating Spark with IntelliJ
With the Spark environment installed, I wanted to run Spark programs from IntelliJ, but after adding the Spark dependencies the project would not build.
Error:
- Exception NoSuchMethodError: com.google.common.collect.MapMaker.keyEquivalence
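Fix: to state it up front, my Maven build had been referencing spark-core_2.10 all along. When this error appeared I traced it to Guava, found every jar that pulled in Guava transitively, and excluded Guava from all of them, but the problem remained. I also added many other Spark dependencies along the way, and none of them helped. In the end I tried spark-core_2.11 and the problem went away (version compatibility really can be a trap).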
1.4 Uploading a local file to HDFS with hadoop
To upload a local file to HDFS, use hadoop fs -put localDir hdfsDir. Hadoop must already be running.
Error:
- jackie@jackies-MacBook-Pro:~|⇒ hadoop fs -put ~/Documents/doc/README.md /
- 17/05/13 10:56:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 17/05/13 10:56:40 WARN ipc.Client: Failed to connect to server: localhost/127.0.0.1:8020: try once and fail.
- java.net.ConnectException: Connection refused
- at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
- at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
- at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
- at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
- at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
- at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:681)
- at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:777)
- at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:409)
- at org.apache.hadoop.ipc.Client.getConnection(Client.java:1542)
- at org.apache.hadoop.ipc.Client.call(Client.java:1373)
- at org.apache.hadoop.ipc.Client.call(Client.java:1337)
- at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
- at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
- at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
- at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:787)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:398)
- at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
- at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
- at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
- at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:335)
- at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
- at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1700)
- at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1436)
- at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1433)
- at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
- at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1433)
- at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:64)
- at org.apache.hadoop.fs.Globber.doGlob(Globber.java:282)
- at org.apache.hadoop.fs.Globber.glob(Globber.java:148)
- at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1685)
- at org.apache.hadoop.fs.shell.PathData.expandAsGlob(PathData.java:326)
- at org.apache.hadoop.fs.shell.CommandWithDestination.getRemoteDestination(CommandWithDestination.java:195)
- at org.apache.hadoop.fs.shell.CopyCommands$Put.processOptions(CopyCommands.java:256)
- at org.apache.hadoop.fs.shell.Command.run(Command.java:164)
- at org.apache.hadoop.fs.FsShell.run(FsShell.java:315)
- at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
- at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
- at org.apache.hadoop.fs.FsShell.main(FsShell.java:378)
- put: Call From jackies-macbook-pro.local/192.168.73.56 to localhost:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
Fix: go to the Hadoop installation directory (mine is /usr/local/Cellar/hadoop), enter sbin, and run ./start-all.sh to start the Hadoop services.
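Before restarting everything, it can help to confirm whether anything is listening on the NameNode port at all. The following is a minimal sketch in plain Java (standard library only) that simply tries to open a TCP connection. The class name PortCheck, the method isListening, and the 2-second timeout are illustrative choices, not part of Hadoop; the default host and port mirror the localhost:8020 seen in the log above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Hadoop: reports whether something accepts
// TCP connections on host:port (for example the NameNode RPC port).
class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions taken from the log: localhost and port 8020.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8020;
        System.out.println(host + ":" + port + " listening: "
                + isListening(host, port, 2000));
    }
}
```

If this reports false for the NameNode address, a "Connection refused" from hadoop fs -put is expected, and starting the services is the first thing to try.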
1.5 Starting Spark
In the previous post I set up Spark without configuring spark-defaults.conf, so running ./start-all.sh from the Spark installation directory (mine is /usr/local/Spark) failed.
Error:
- spark-shell
- Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
- Setting default log level to "WARN".
- To adjust logging level use sc.setLogLevel(newLevel).
- 17/05/13 13:42:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 17/05/13 13:42:51 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.73.56:7077
- org.apache.spark.SparkException: Exception thrown in awaitResult
- at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
- at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
- at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
- at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
- at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
- at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
- at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
- at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
- at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
- at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
- at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
- at java.lang.Thread.run(Thread.java:745)
- Caused by: java.io.IOException: Failed to connect to /192.168.73.56:7077
Fix: copy spark-defaults.conf.template from the conf directory of the Spark installation, rename the copy to spark-defaults.conf, and configure it as described at https://sanwen8.cn/p/3bac5Bj.html. I then started Spark again, and it still failed:
- Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
- Setting default log level to "WARN".
- To adjust logging level use sc.setLogLevel(newLevel).
- 17/05/13 14:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 17/05/13 14:19:15 ERROR SparkContext: Error initializing SparkContext.
- java.net.ConnectException: Call From jackies-MacBook-Pro.local/192.168.73.56 to 192.168.73.56:8021 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
- at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
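Following a Stack Overflow answer, I then changed spark.eventLog.enabled in spark-defaults.conf from true to false, after which Spark started successfully.
Note: I went back and forth between localhost and my own IP address here. In the end it turned out that once /etc/hosts maps the IP to a name, you can use the name directly instead of the IP. The Hadoop and Spark configuration files must also agree with each other; otherwise you really will end up exhausted.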
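One way to catch this kind of mismatch early is to compare the addresses used in the two sets of configuration files. The second log above shows a connection attempt to port 8021, while the HDFS commands earlier used 8020, so one possible cause (an assumption, not something the logs prove) is that the event log directory pointed at a different address than the NameNode. The sketch below compares host and port of two URIs; EndpointCheck and sameEndpoint are hypothetical names, and the example values in main are placeholders.

```java
import java.net.URI;

// Hypothetical helper: checks that two service URIs name the same host and port.
// It compares the text only, so "jackie" and "192.168.73.56" count as different
// even if /etc/hosts maps one to the other; using a single name everywhere
// avoids that ambiguity.
class EndpointCheck {

    static boolean sameEndpoint(String first, String second) {
        URI a = URI.create(first);
        URI b = URI.create(second);
        String hostA = a.getHost();
        // getHost() returns null when the URI has no parseable host.
        return hostA != null
                && hostA.equalsIgnoreCase(b.getHost())
                && a.getPort() == b.getPort();
    }

    public static void main(String[] args) {
        // Example values only; substitute the ones from your own
        // core-site.xml and spark-defaults.conf.
        String defaultFs = "hdfs://jackie:8020";
        String eventLogDir = "hdfs://192.168.73.56:8021/spark-logs";
        System.out.println("same endpoint: " + sameEndpoint(defaultFs, eventLogDir));
    }
}
```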
1.6 Errors when submitting a job to Spark
I ran the following demo program.
- package com.jackie.scala.s513;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
- import java.util.Arrays;
- import java.util.Iterator;
- import java.util.List;
- import java.util.regex.Pattern;
- /**
- * Created by jackie on 17/5/13.
- */
- public class Simple
- {
- private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) throws Exception {
- // Build the Spark configuration (application name, local master)
- SparkConf conf=new SparkConf().setAppName("Simple").setMaster("local");
- // Create the Spark context, the entry point for loading data
- JavaSparkContext spark=new JavaSparkContext(conf);
- // Load the data source
- JavaRDD<String> lines = spark.textFile("hdfs://jackie:8020/");
- /**
- * Split every line of the source RDD into words,
- * then count the occurrences of each word with mapToPair and reduceByKey,
- * and finally print the results.
- */
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) {
- return Arrays.asList(SPACE.split(s)).iterator();
- }
- });
- // Use the RDD's mapToPair and reduceByKey methods to count the words
- JavaPairRDD<String, Integer> ones = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
- JavaPairRDD<String, Integer> counts = ones.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
- List<Tuple2<String, Integer>> output = counts.collect();
- for (Tuple2<?,?> tuple : output) {
- // Print the computed result
- System.out.println(tuple._1() + ": " + tuple._2());
- }
- spark.stop();
- }
- }
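This program needs to read the README.md file in the HDFS root directory, but beforehand I had run "hadoop namenode -format" (note: this operation caused the whole chain of problems that follows). I therefore set out to upload README.md again with hadoop fs -put localDir hdfsDir, which now failed.
Error: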
- hadoop fs -put /Users/jackie/Documents/doc/README.md /
- 17/05/13 15:47:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 17/05/13 15:47:16 WARN hdfs.DataStreamer: DataStreamer Exception
- org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /README.md._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation.
- at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1733)
- at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
- at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2496)
- at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:828)
It turned out that the datanode had not started, so I went looking for the reason and found it here: http://www.aboutyun.com/thread-7931-1-1.html
The article explains: when we format the file system, a current/VERSION file is saved in the namenode data directory (the local path configured as dfs.name.dir). It records the namespaceID, which identifies the version of the formatted namenode. If we format the namenode repeatedly, the current/VERSION file kept by the datanode (under the local path configured as dfs.data.dir) still holds the namenode ID saved at the first format, so the IDs of the datanode and the namenode no longer match.
Fix: following the article, I ran hadoop namenode -format again and made sure it reported success.
Running jps again now shows the datanode.
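The ID mismatch described above can be checked directly by reading the two current/VERSION files, which are plain key=value text. The sketch below loads both with java.util.Properties and compares one key. VersionCheck, readId, and idsMatch are hypothetical names. The key name is an assumption: the quoted article talks about namespaceID, while Hadoop 2.x installations typically record a clusterID in both files, so the key is left as a parameter.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: compares an ID stored in the namenode's and the
// datanode's current/VERSION files.
class VersionCheck {

    // Reads one key from a VERSION file; returns null if the key is absent.
    static String readId(Path versionFile, String key) throws IOException {
        Properties props = new Properties();
        try (Reader reader =
                Files.newBufferedReader(versionFile, StandardCharsets.ISO_8859_1)) {
            props.load(reader);
        }
        return props.getProperty(key);
    }

    // True only when both files contain the key with the same value.
    static boolean idsMatch(Path nameNodeVersion, Path dataNodeVersion, String key)
            throws IOException {
        String nameNodeId = readId(nameNodeVersion, key);
        String dataNodeId = readId(dataNodeVersion, key);
        return nameNodeId != null && nameNodeId.equals(dataNodeId);
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println(
                    "usage: VersionCheck <namenode VERSION> <datanode VERSION> [key]");
            return;
        }
        // Assumed default key; pass namespaceID instead for older layouts.
        String key = args.length > 2 ? args[2] : "clusterID";
        boolean match = idsMatch(Paths.get(args[0]), Paths.get(args[1]), key);
        System.out.println(key + " matches: " + match);
    }
}
```

The paths to pass in are the current/VERSION files under the name and data directories configured for your installation.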
Similarly, running hadoop fs -put /Users/jackie/Documents/doc/README.md / again produced the following error.
- hadoop fs -put /Users/jackie/Documents/doc/README.md /
- 17/05/15 09:51:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 17/05/15 09:51:05 WARN ipc.Client: Failed to connect to server: jackie/192.168.73.56:8020: try once and fail.
- java.net.ConnectException: Connection refused
- at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
- at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
- at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
- at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
- at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
- at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:681)
- at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:777)
- at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:409)
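At first I assumed it was an IP configuration problem, but repeated changes made no difference. Then I noticed from jps that the namenode had not started, so I searched online and found http://blog.csdn.net/bychjzh/article/details/7830508
Under /usr/local/Cellar/hadoop/hdfs I deleted the tmp directory previously configured in core-site.xml, created a new hadoop_tmp directory, and changed core-site.xml to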
- <property>
- <name>hadoop.tmp.dir</name>
- <value>/usr/local/Cellar/hadoop/hdfs/hadoop_tmp</value>
- <description>A base for other temporary directories.</description>
- </property>
Then I ran hadoop namenode -format and finally started all services with start-all.sh. The file upload succeeded.