2017-03-20 11 views
0

以下は単純なコードセグメントです。sc.wholeTextFiles + toDebugStringは、アクションが実行される前でも長い時間がかかります

inputForCustの合計ファイル数は14762で、平均ファイルサイズは0.5Kで57Mです。ファイルはNFS経由でマウントされたローカルファイルシステムからロードされます。

val inputCustFiles = sc.wholeTextFiles(inputForCust, jobArgs.minPartitions) 
println("This prints immediately") 
inputCustFiles.toDebugString 
println("This prints after 20 mins") 
inputCustFiles.count 
println("This prints after 10 mins") 

注:私たちはwholeTextFile後、いくつかの複雑な変換を持ったし、時間がreduceByKeyで撮影されました!問題を再現するためにコードを単純化しました。

私の質問は、なぜinputCustFiles.toDebugStringが時間がかかるのですか?

inputCustFiles.countに時間がかかる場合は、クラスタ処理能力を利用することが保証されます。しかし、inputCustFiles.toDebugStringはドライバをブロックしています!!!

20分の持続時間で、私はスパークUIで何の活動も見ません。

私は、トレースレベルのログを有効にした場合、私は何も悪いことをやっているか、これはスパークの制限/バグ/設計であれば、私は、ライン

[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting local block broadcast_1 
[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Level for block broadcast_1 is StorageLevel(true, true, false, true, 1) 
[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting block broadcast_1 from memory 
[error] [17/03/17 23:23:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver. 
[error] [17/03/17 23:24:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver. 
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: 

任意のアイデアの下に参照してください?

注:

  • 私たちは1.6.2を使用しています。
  • 入力ファイルの数が変更されると、toDebugStringの処理に時間がかかります。以下は

時間ドライバのスタックトレースが

java.io.FileInputStream.readBytes(Native Method) 
java.io.FileInputStream.read(FileInputStream.java:255) 
java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
java.io.InputStreamReader.read(InputStreamReader.java:184) 
java.io.BufferedReader.fill(BufferedReader.java:161) 
java.io.BufferedReader.read1(BufferedReader.java:212) 
java.io.BufferedReader.read(BufferedReader.java:286) 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:602) 
org.apache.hadoop.util.Shell.runCommand(Shell.java:446) 
org.apache.hadoop.util.Shell.run(Shell.java:379) 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589) 
org.apache.hadoop.util.Shell.execCommand(Shell.java:678) 
org.apache.hadoop.util.Shell.execCommand(Shell.java:661) 
org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097) 
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:567) 
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:542) 
org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42) 
org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1815) 
org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797) 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:267) 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217) 
org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:49) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
scala.Option.getOrElse(Option.scala:121) 
org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
scala.Option.getOrElse(Option.scala:121) 
org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1747) 
org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1781) 
oculus.storeonce.spark.Test$.main(Test.scala:11) 
oculus.storeonce.spark.Test.main(Test.scala) 
+0

何か変わったこと – BDR

答えて

0

をブロックされている、それは結局のところ、スパークドライバは、ファイルシステムを通過し、それが作成できるように、メタデータを大量に取得します適切なサイズのパーティションの正しい番号。ファイルシステム(HDFSを含む)は小さなファイルの処理には効率的ではないため、このメタデータのファイルシステムをスキャンすると遅延が発生します。

なぜrdd.reduceByKey ...またはrdd.toDebugStringが操作を呼び出すのであり、sc.wholeTextFiles/textFilesやrdd.mapがそうでないかは依然として疑問です!!!

解決策/回避策は、kafka/cassandraのようなデータをホストするか、多数のファイルの内容を単一のファイルにマージすることです。

関連する問題