2017-02-07 1 views
4

私は奇妙な問題に直面しています。私が理解する限り、Sparkでの操作のDAGは、アクションが実行されたときにのみ実行されます。しかし、reduceByKey()操作(変換)がDAGの実行を開始することがわかります。Apache Spark:reduceByKey変換がDAGを実行するのはなぜですか?

手順を再現します。コードの一部に従ってください。

SparkConf conf =new SparkConf().setMaster("local").setAppName("Test"); 
JavaSparkContext context=new JavaSparkContext(conf); 

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist 

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); 
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1)); 

注:ファイルのパスは、既存のパスであってはいけません。つまり、ファイルは存在してはいけません。

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: 

それが開始された意味:あなたはこのコードを実行する場合は、プログラムに次の行を追加して実行した場合、何もexpected.Howeverとして起こりません

mapToPair.reduceByKey((x, y) -> x + y); 

それは次の例外を提供しますDAGを実行します。 reduceByKey()は変換であるため、collect()やtake()などのアクションが実行されるまではそうでないはずです。

スパークバージョン:2.0.0。あなたの提案を提供してください。

+0

uは、この行で何を意味していますJavaRDD テキストファイル= context.textFile(「任意の非既存のパス」? ); //このパスは存在しないはずです –

+0

context.textFile()は、hdfsまたはローカルの内容を理想的にRDDにロードします。パスが存在しない場合、RDDはどのように形成されるのですか? –

+0

@AviralKumar質問は、変換が遅れて評価されるため、コードが実行されている理由と関係しています。ファイルが存在せず、reduceByKeyを呼び出した後に例外がスローされると、* something *が変換後に実行されていることが証明されます。 – ImDarrenG

答えて

2

これは実際には実行されるDAGではないからです(つまり、その全体的な実現化のように)。

どうすればreduceByKey には、作業が必要です。提供しない場合、Sparkは規約とデフォルトに基づいて作成します。コード内の以下のコメントとして「デフォルトpartiionner」:

/** 
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs. 
* 
* If any of the RDDs already has a partitioner, choose that one. 
* 
* Otherwise, we use a default HashPartitioner. For the number of partitions, if 
* spark.default.parallelism is set, then we'll use the value from SparkContext 
* defaultParallelism, otherwise we'll use the max number of upstream partitions. 
* 
* Unless spark.default.parallelism is set, the number of partitions will be the 
* same as the number of partitions in the largest upstream RDD, as this should 
* be least likely to cause out-of-memory errors. 
* 
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. 
*/ 

この定義には、いくつかのケースでは、すべてのアップストリームRDDSからパーティションの数が計算されていることを、意味しています。あなたのケースでは、それは必要なものを実行するために "ファイルシステム"(Hadoop、ローカルかもしれない...)を要求することを意味します(例えば、Hadoop FileSystemへの単一の呼び出しは複数のファイルを返すことができます。 InputFormatで定義されたさまざまな最適化に従って、実際にそれらを探し出すことによってのみ知ることができます)。

実際のDAGではなく、実際のDAG(ここではmap/flatMap/aggregateなど)がここで実行されています。

おそらくキーバリアントによって減らすこの中で、あなた自身のpartitionnerを提供することで、それを避けることができます:

reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] 
+0

お返事ありがとうございます。私はカスタムPartitonerを提供しようとしているし、今もエラーをスローしていません。しかし、私はまだ何もアクションが実行されない場合partitonsを計算し始めた混乱しています。続い は、参考のためパーティコードである: 'パーティショナパーティショナ=新しいパーティション分割(){ \t \t \t最終INT maxPartitions = 2。 \t \t \t @Override \t \t \t公共INT numPartitions(){ \t \t \t \t \t \t \t \t戻りmaxPartitions。 \t \t \t} \t \t \t \t \t \t @Override \t \t \t公共INT getPartition(最終オブジェクトobj){ \t \t \t \tストリングOBJ1 =(文字列)OBJ。 \t \t \t \t return obj1.hashCode()%maxPartitions; \t \t \t} \t \t}; ' –

+0

パーティショナーは「アクション実行」で「オンザフライ」で作成されないため、その時点で必要な変換が作成されます。 (これはスパークの欠点よりもトレードオフのようなものです)。デフォルトのものはそれについて賢明にしようとします(例えば、それが少数のパーティションを作成した場合、それはメモリ不足につながり、それが多すぎると無駄になる可能性があります)。しかし、スマートなことは上流のRDD記述、これを「計算パーティション」と呼びます。デフォルトの並列性パラメータを設定すると、それが本当に必要な場合は「オフ」に切り替わります。 – GPI

+0

あなたの手作りのPartitionnerはHashPartitionnerのように見えますが、Sparkはすでにこれを持っています:https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/HashPartitioner.html – GPI

関連する問題