1
で作業していないシリアルIは、(Javaで)このクラスを持って、私はスパーク(1.6)で使用したい:スパーク:集計
public class Aggregation {
private Map<String, Integer> counts;
public Aggregation() {
counts = new HashMap<String, Integer>();
}
public Aggregation add(Aggregation ia) {
String key = buildCountString(ia);
addKey(key);
return this;
}
private void addKey(String key, int cnt) {
if(counts.containsKey(key)) {
counts.put(key, counts.get(key) + cnt);
}
else {
counts.put(key, cnt);
}
}
private void addKey(String key) {
addKey(key, 1);
}
public Aggregation merge(Aggregation agg) {
for(Entry<String, Integer> e: agg.counts.entrySet()) {
this.addKey(e.getKey(), e.getValue());
}
return this;
}
private String buildCountString(Aggregation rec) {
...
}
}
スパークを開始私はKYROを有効にしてこのクラスを追加(スカラで):
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Aggregation]))
そして、私はこのようなスパーク集計(スカラ)でそれを使用したい:
rdd.aggregate(new InteractionAggregation)((agg, rec) => agg.add(rec), (a, b) => a.merge(b))
はどういうわけか、これは、「タスクを上げますシリアライズ不可能 "例外が発生します。
しかし、私はマップとクラスを使用して削減する場合、すべてが正常に動作します:
val rdd2= interactionObjects.map(_ => new InteractionAggregation())
rdd2.reduce((a,b) => a.merge(b))
println(rdd2.count())
あなたはエラーが集計で発生する理由のアイデアを持っていますが、マップと/減らすないのですか?
ありがとうございました!
これは実際にこの問題を解決します。しかし、私はそれがKyroを使用してSerializableを実装する必要はないと思った!または、Sparkは集約を使用するときにJava Serializationにフォールバックしますか? – Daniel
メモリが提供されている場合、Kryoはいくつかのコア・スカラ非直列化可能クラスのみを処理しますが、一般的なクラスではありません。 https://github.com/EsotericSoftware/kryo#using-standard-java-serializationを参照してください。 –