免責を生成RDDに呼び出された後には戻りません:コールは()combineByKey機能によって
[(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph, Jimmy, James, Jackeline, Juan]), (J,[Jimbo, Jina])]
と私:私は
スパークに新しいです、私は次のようになりますRDDを持っていますcombineByKeyを呼び出して結果としてJavaPairRDDを返します。<文字、整数>
この呼び出しは正しく動作しているようです(この時点から制御フローが渡され、デバッガfooに何らかの値があるようです)
JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine);
System.out.println(foo.collect());
私の問題は、プログラムがfoo.collect()の呼び出し後に戻ってこないということです。 アイデアはありますか?それはcombineByKeyによって呼び出される関数のコードは以下の通りです(:私はスパークバージョン2.0.0およびJava 8
EDITを使用しています私は、Eclipseのデバッガでデバッグしようとしましたが、私はすべての
で機会がなかったです
Function<Iterable<String>, Integer> createAcc =
new Function<Iterable<String>, Integer>() {
public Integer call(Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter;
}
};
Function2<Integer, Iterable<String>, Integer> addAndCount =
new Function2<Integer,Iterable<String>, Integer>() {
public Integer call(Integer acc , Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter + acc;
}
};
Function2<Integer,Integer,Integer> combine =
new Function2<Integer,Integer, Integer>() {
public Integer call(Integer x, Integer y) {
return x+y;
}
};
アップデート2:要求されたログは、明らかに私はスパークに新しいですダミーコード原因は、 combineByKeyを呼び出して私の目標は、各キーにbeloning文字列のリスト)の合計の長さを見つけることです続く
16/11/11 17:21:32 INFO SparkContext: Starting job: count at Foo.java:265 16/11/11 17:21:32 INFO DAGScheduler: Got job 9 (count at Foo.java:265) with 3 output partitions 16/11/11 17:21:32 INFO DAGScheduler: Final stage: ResultStage 20 (count at Foo.java:265) 16/11/11 17:21:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 19, ShuffleMapStage 18) 16/11/11 17:21:32 INFO DAGScheduler: Missing parents: List() 16/11/11 17:21:32 INFO DAGScheduler: Submitting ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264), which has no missing parents 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 6.7 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on xxx.xxx.xx.xx:55712 (size: 3.4 KB, free: 1946.1 MB) 16/11/11 17:21:32 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1012 16/11/11 17:21:32 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264) 16/11/11 17:21:32 INFO TaskSchedulerImpl: Adding task set 20.0 with 3 tasks 16/11/11 17:21:32 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID 30, localhost, partition 0, ANY, 5288 bytes) 16/11/11 17:21:32 INFO Executor: Running task 0.0 in stage 20.0 (TID 30) 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 3 blocks 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
に変更し、それら? "foo.count()"の結果は何ですか? – Yaron
データのサイズはかなり小さいです(期限切れに使用しているrddは質問に投稿されたものです)。 foo.count()への呼び出しはどちらも返されないようです(私はそれが同じ理由であると仮定します)。 – XII
combineByKey変換は、アクションの呼び出し時にのみ実行されます(count ou collectなど)。この問題はおそらくcombineByKey関数にあります。詳細を教えてください。 – Marie