0
Javaを使用してapache sparkを操作する。 JavaPairRDD RDD1を取得しました.RDD1の値の合計を作成して、別のJavaPairRdd RDD2を作成したいとします。しかし、次のコードを実行すると、エラーメッセージなしでtest_3変換がブロックされます。私はそれが別の変換の中でrdd変換またはアクションを実行する問題に関連していると思います。値を操作してJavaPairRDDを変換する(Sum)Spark
JavaPairRDD<Key, JavaPairRDD<Integer, Double>> test_2 = test_1.mapToPair(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, JavaPairRDD<Integer, Double>>() {
@Override
public Tuple2<Key, JavaPairRDD<Integer, Double>> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t) throws Exception {
return new Tuple2(t._1,t._2.reduceByKey((Double val1, Double val2)
-> Math.pow(Math.abs(val1 - val2), 2)));
}
});
JavaPairRDD<Key, JavaPairRDD<Integer, Double>> test_3 = test_2.mapToPair
(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, JavaPairRDD<Integer, Double>>() {
@Override
public Tuple2<Key, JavaPairRDD<Integer, Double>> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t)
throws Exception {
return new Tuple2(t._1,t._2.values().reduce((Double t1, Double t2) -> t1+t2));
}});
JavaPairRDD<Key, Double> test_4= test_3.mapToPair
(new PairFunction<Tuple2<Key, JavaPairRDD<Integer, Double>>, Key, Double>() {
@Override
public Tuple2<Key, Double> call(Tuple2<Key, JavaPairRDD<Integer, Double>> t) throws Exception {
return new Tuple2(t._1,t._2.values().first());
}
});
実際、私たちはRDDとして値を持つことができます。あなたが見ることができれば、私はtest2 RDDのアクションを収集するときに問題なく結果を得ることができます。対照的に、ここでの問題は、それらを合計するために値操作の呼び出しに関連しています。つまり、以前のJavaPairRDD(test_2 ----> test_3)の値の合計値を持つ新しいjavaPairRDDを作成する必要があります。 – imountasser
RDDの中でRDDをどうやって使うことができるのかは私にはあまり意味がありません。あなたが 'test_2'で収集したときに、結果をプリントアウトすると、何が得られますか?各キーにはtuple2のリストがありますか? – LuckyGuess
私がtest_2でcollectを実行した場合、私は結果として、同じキーを持つJavaPairRDDと下のformuleを使って計算された値を取得します。今はtest2を使ってpairRDDの値の合計を計算して同じことをする必要がありますが、RDD変換内でアクションを呼び出すのでブロックします。 – imountasser