私は奇妙な問題に直面しています。私が理解する限り、Sparkでの操作のDAGは、アクションが実行されたときにのみ実行されます。しかし、reduceByKey()操作(変換)がDAGの実行を開始することがわかります。Apache Spark:reduceByKey変換がDAGを実行するのはなぜですか?
手順を再現します。コードの一部に従ってください。
SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);
JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist
JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));
注:ファイルのパスは、既存のパスであってはいけません。つまり、ファイルは存在してはいけません。
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
それが開始された意味:あなたはこのコードを実行する場合は、プログラムに次の行を追加して実行した場合、何もexpected.Howeverとして起こりません
は
mapToPair.reduceByKey((x, y) -> x + y);
それは次の例外を提供しますDAGを実行します。 reduceByKey()は変換であるため、collect()やtake()などのアクションが実行されるまではそうでないはずです。
スパークバージョン:2.0.0。あなたの提案を提供してください。
uは、この行で何を意味していますJavaRDDテキストファイル= context.textFile(「任意の非既存のパス」? ); //このパスは存在しないはずです –
context.textFile()は、hdfsまたはローカルの内容を理想的にRDDにロードします。パスが存在しない場合、RDDはどのように形成されるのですか? –
@AviralKumar質問は、変換が遅れて評価されるため、コードが実行されている理由と関係しています。ファイルが存在せず、reduceByKeyを呼び出した後に例外がスローされると、* something *が変換後に実行されていることが証明されます。 – ImDarrenG