2016-07-06 17 views
1

で作業していないシリアルIは、(Javaで)このクラスを持って、私はスパーク(1.6)で使用したい:スパーク:集計

public class Aggregation { 
    private Map<String, Integer> counts; 

    public Aggregation() { 
    counts = new HashMap<String, Integer>(); 
    } 

    public Aggregation add(Aggregation ia) { 
    String key = buildCountString(ia); 
    addKey(key); 
    return this; 
    } 

    private void addKey(String key, int cnt) { 
    if(counts.containsKey(key)) { 
     counts.put(key, counts.get(key) + cnt); 
    } 
    else { 
     counts.put(key, cnt); 
    } 
    } 

    private void addKey(String key) { 
    addKey(key, 1); 
    } 

    public Aggregation merge(Aggregation agg) { 
    for(Entry<String, Integer> e: agg.counts.entrySet()) { 
     this.addKey(e.getKey(), e.getValue()); 
    } 
    return this; 
    } 

    private String buildCountString(Aggregation rec) { 
    ... 
    } 
} 

スパークを開始私はKYROを有効にしてこのクラスを追加(スカラで):

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.registerKryoClasses(Array(classOf[Aggregation])) 

そして、私はこのようなスパーク集計(スカラ)でそれを使用したい:

rdd.aggregate(new InteractionAggregation)((agg, rec) => agg.add(rec), (a, b) => a.merge(b)) 

はどういうわけか、これは、「タスクを上げますシリアライズ不可能 "例外が発生します。

しかし、私はマップとクラスを使用して削減する場合、すべてが正常に動作します:

val rdd2= interactionObjects.map(_ => new InteractionAggregation()) 
rdd2.reduce((a,b) => a.merge(b)) 
println(rdd2.count()) 

あなたはエラーが集計で発生する理由のアイデアを持っていますが、マップと/減らすないのですか?

ありがとうございました!

答えて

1

集計クラスはSerializableを実装する必要があります。集約を呼び出すと、ドライバは(新しいAggregation())オブジェクトをすべてのワーカーに送信します。その結果、シリアライゼーションエラーが発生します。

+0

これは実際にこの問題を解決します。しかし、私はそれがKyroを使用してSerializableを実装する必要はないと思った!または、Sparkは集約を使用するときにJava Serializationにフォールバックしますか? – Daniel

+0

メモリが提供されている場合、Kryoはいくつかのコア・スカラ非直列化可能クラスのみを処理しますが、一般的なクラスではありません。 https://github.com/EsotericSoftware/kryo#using-standard-java-serializationを参照してください。 –