2016-11-18 5 views
2

私はScalaでSpark Streamingアプリを書いています。このアプリの目標は、カフカの最新のレコードを消費し、標準出力に出力することです。YARNでKafkaから消費するとSpark Streamingアプリケーションが停止するのはなぜですか?

私がローカルで--master local[n]を使って実行すると、アプリケーションは完全に動作します。私は(と私はから消費していますトピックに生成)糸中のアプリを実行すると、アプリがで立ち往生しかし、:数回上記の行を繰り返した後

16/11/18 20:53:05 INFO JobScheduler: Added jobs for time 1479502385000 ms 

、スパークは、次のエラーを与える:

ストリーミングUIから
16/11/18 20:54:47 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9, r3d3.hadoop.REDACTED.REDACTED): java.net.ConnectException: Connection timed out 
at sun.nio.ch.Net.connect0(Native Method) 
at sun.nio.ch.Net.connect(Net.java:454) 
at sun.nio.ch.Net.connect(Net.java:446) 
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) 
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) 
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) 
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) 
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) 
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150) 
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162) 
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at org.apache.spark.util.NextIterator.to(NextIterator.scala:21) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

エラー:YARNのアプリケーションログから

org.apache.spark.streaming.dstream.DStream.print(DStream.scala:757) 
com.REDACTED.bdp.Main$.main(Main.scala:88) 
com.REDACTED.bdp.Main.main(Main.scala) 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498) 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

エラー(標準出力):

YARNのアプリケーションログから
java.lang.NullPointerException 
     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) 
     at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66) 
     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) 
     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) 
     at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:60) 
     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79) 
     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77) 
     at org.apache.spark.scheduler.Task.run(Task.scala:91) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
[2016-11-21 15:57:49,925] ERROR Exception in task 0.1 in stage 33.0 (TID 34) (org.apache.spark.executor.Executor) 
org.apache.spark.util.TaskCompletionListenerException 
     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:91) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

別のエラー:

[2016-11-21 15:52:32,264] WARN Exception encountered while connecting to the server : (org.apache.hadoop.ipc.Client) 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby 
     at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375) 
     at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:558) 
     at org.apache.hadoop.ipc.Client$Connection.access$1800(Client.java:373) 
     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:727) 
     at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:723) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 
     at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:722) 
     at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:373) 
     at org.apache.hadoop.ipc.Client.getConnection(Client.java:1493) 
     at org.apache.hadoop.ipc.Client.call(Client.java:1397) 
     at org.apache.hadoop.ipc.Client.call(Client.java:1358) 
     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) 
     at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) 
     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104) 
     at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) 
     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1315) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1311) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1311) 
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424) 
     at org.apache.spark.deploy.yarn.Client$.org$apache$spark$deploy$yarn$Client$$sparkJar(Client.scala:1195) 
     at org.apache.spark.deploy.yarn.Client$.populateClasspath(Client.scala:1333) 
     at org.apache.spark.deploy.yarn.ExecutorRunnable.prepareEnvironment(ExecutorRunnable.scala:290) 
     at org.apache.spark.deploy.yarn.ExecutorRunnable.env$lzycompute(ExecutorRunnable.scala:61) 
     at org.apache.spark.deploy.yarn.ExecutorRunnable.env(ExecutorRunnable.scala:61) 
     at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:80) 
     at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:68) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

奇妙な部分は時間の約5%は、アプリがどのような理由のために、成功したカフカから読み込むことです。

クラスタとYARNが正常に動作しているようです。 クラスタはKerberosを使用して保護されています。

このエラーの原因は何ですか?

+0

「5%の時間で、アプリはKafkaから正常に読み込まれます」と「クラスターはKerberosを使用して保護されています」とこれらのイベントの間の時間を考えさせます。 5分後に認証トークンが失効し、ストリーミングジョブが失敗し始めることがありますか? (kerberized/secured Sparkクラスタでは決して働かなかった)。 'r3d3.hadoop.REDACTED.REDACTED'はSparkエグゼキュータを持つホストですね。Web UIからストリーミングタブを最初から最後まで貼り付けることはできますか? –

+0

YARNログを見て、エグゼキュータに正確に何が起きているのかを確認してください。>> Sparkドライバのログ(sthgは 'application_xxxx_xxxxxxxx')でYARNジョブIDを探し、これを使用してYARN UIを検索するか、糸状態;糸ログ-applicationId ' –

+0

読み込みしようとしているカフカのトピックを作成すると、私の仕事は開始直後に失敗します。 – dqian96

答えて

0

tl; dr答えは答えを提供せず、は単にが可能な次のステップを示唆しています。

ストリーミングジョブでイベントが報告される可能性があると私は理解しています。ジョブが実行されて終了できなかった場合は、SparkエグゼキュータとKafkaブローカの間の接続の問題です。

16/11/18 20:54:47 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 9, r3d3.hadoop.REDACTED.REDACTED): java.net.ConnectException: Connection timed out 
at sun.nio.ch.Net.connect0(Native Method) 
at sun.nio.ch.Net.connect(Net.java:454) 
at sun.nio.ch.Net.connect(Net.java:446) 
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) 
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) 
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) 
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) 
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) 
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150) 

pattern of the error message次のとおりです。

Lost task [id] in stage [taskSetId] (TID [tid], [host], executor [executorId]): [reason] 

ホストr3d3.hadoop.REDACTED.REDACTED上で実行されているスパークexecutorを持つとあなたのケースに変換しています。

失敗の理由を述べていた以下のものです:

java.net.ConnectException: Connection timed out 
at sun.nio.ch.Net.connect0(Native Method) 
at sun.nio.ch.Net.connect(Net.java:454) 
at sun.nio.ch.Net.connect(Net.java:446) 
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) 
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) 
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) 
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) 
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) 

そしてカフカブローカーがクライアントのために利用できない可能性がとき私はあなたのケースでいるかもしれないまたはスパークストリーミング・アプリケーションである(自分自身を求めるだろう問題の根本原因を理解するのに貢献していない可能性があります)。

私は、それがApache Sparkとは無関係かもしれないと思って、カフカサークルでもっと答えを探します。

関連する問題