以下は単純なコードセグメントです。sc.wholeTextFiles + toDebugStringは、アクションが実行される前でも長い時間がかかります
inputForCustの合計ファイル数は14762で、平均ファイルサイズは0.5Kで57Mです。ファイルはNFS経由でマウントされたローカルファイルシステムからロードされます。
val inputCustFiles = sc.wholeTextFiles(inputForCust, jobArgs.minPartitions)
println("This prints immediately")
inputCustFiles.toDebugString
println("This prints after 20 mins")
inputCustFiles.count
println("This prints after 10 mins")
注:私たちはwholeTextFile後、いくつかの複雑な変換を持ったし、時間がreduceByKeyで撮影されました!問題を再現するためにコードを単純化しました。
私の質問は、なぜinputCustFiles.toDebugStringが時間がかかるのですか?
inputCustFiles.countに時間がかかる場合は、クラスタ処理能力を利用することが保証されます。しかし、inputCustFiles.toDebugStringはドライバをブロックしています!!!
20分の持続時間で、私はスパークUIで何の活動も見ません。
私は、トレースレベルのログを有効にした場合、私は何も悪いことをやっているか、これはスパークの制限/バグ/設計であれば、私は、ライン
[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting local block broadcast_1
[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Level for block broadcast_1 is StorageLevel(true, true, false, true, 1)
[error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting block broadcast_1 from memory
[error] [17/03/17 23:23:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
[error] [17/03/17 23:24:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
任意のアイデアの下に参照してください?
注:
- 私たちは1.6.2を使用しています。
- 入力ファイルの数が変更されると、toDebugStringの処理に時間がかかります。以下は
時間ドライバのスタックトレースが
java.io.FileInputStream.readBytes(Native Method)
java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
java.io.InputStreamReader.read(InputStreamReader.java:184)
java.io.BufferedReader.fill(BufferedReader.java:161)
java.io.BufferedReader.read1(BufferedReader.java:212)
java.io.BufferedReader.read(BufferedReader.java:286)
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:602)
org.apache.hadoop.util.Shell.runCommand(Shell.java:446)
org.apache.hadoop.util.Shell.run(Shell.java:379)
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:567)
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:542)
org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1815)
org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:267)
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:49)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1747)
org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1781)
oculus.storeonce.spark.Test$.main(Test.scala:11)
oculus.storeonce.spark.Test.main(Test.scala)
何か変わったこと – BDR