2017-09-07 3 views

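java.lang.NullPointerException for a custom accumulator

I have created a custom accumulator as shown below, because Spark has no API for creating an accumulator over an arbitrary collection such as a Set or a Map (I know CollectionAccumulator exists for lists).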

public class ABCAccumulator extends AccumulatorV2<String, Set> implements Serializable {
    Set<String> set = new HashSet();

    @Override
    public void add(String v) {
        set.add(v);
    }
}

First, is this serializable?

Second, I am using this accumulator to collect all the values of an RDD, as follows:

ABCAccumulator acc = new ABCAccumulator();
sparkContext.register(acc);

rdd.foreach(record -> {
    acc.add(record.getName());
});

But when I run my code, I get this exception:

org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2287) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:916) 
     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351) 
     at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45) 
     at com.def.ghi.jkl.mno.ActualClass.lambda$main$ed7564e9$1(ActualClass.java:154) 
     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
     at scala.util.Try$.apply(Try.scala:192) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NullPointerException 
     at org.apache.spark.util.AccumulatorV2.copyAndReset(AccumulatorV2.scala:129) 
     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:167) 
     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136) 
     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 

Kindly help.
Can you please add the full stack trace?

Full stack trace added.

Answers

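I think it is failing because ABCAccumulator does not implement all of the required methods correctly. The NullPointerException is thrown inside AccumulatorV2.copyAndReset, which is consistent with copy() returning null.

Try something like this: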

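import java.util.HashSet;
import java.util.Set;
import org.apache.spark.util.AccumulatorV2;

class ABCAccumulator extends AccumulatorV2<String, Set<String>> {
    private final Set<String> values = new HashSet<>();

    @Override
    public boolean isZero() {
        return values.isEmpty();
    }

    // Return a new accumulator holding a copy of the data. Returning `this`
    // would let a reset of the "copy" clear the original accumulator as well.
    @Override
    public AccumulatorV2<String, Set<String>> copy() {
        ABCAccumulator copy = new ABCAccumulator();
        copy.values.addAll(values);
        return copy;
    }

    @Override
    public void reset() {
        values.clear();
    }

    @Override
    public void add(String v) {
        values.add(v);
    }

    // Fold the values collected by another accumulator into this one.
    @Override
    public void merge(AccumulatorV2<String, Set<String>> other) {
        values.addAll(other.value());
    }

    @Override
    public Set<String> value() {
        return values;
    }
}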
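
To illustrate why copy() matters here, below is a minimal sketch that uses only the JDK. SketchAccumulator is a hypothetical stand-in, not Spark's AccumulatorV2. Its copyAndReset() assumes the behaviour suggested by the stack trace (call copy(), then reset() on the result), and all class names are made up for this illustration.

```java
import java.util.HashSet;
import java.util.Set;

class CopyContractDemo {

    // Hypothetical, simplified stand-in for the accumulator contract (not Spark's class).
    static abstract class SketchAccumulator<IN, OUT> {
        abstract boolean isZero();
        abstract SketchAccumulator<IN, OUT> copy();
        abstract void reset();
        abstract void add(IN v);
        abstract OUT value();

        // Assumed behaviour: copy first, then reset the copy.
        SketchAccumulator<IN, OUT> copyAndReset() {
            SketchAccumulator<IN, OUT> copied = copy();
            copied.reset(); // throws NullPointerException if copy() returned null
            return copied;
        }
    }

    // Set-backed accumulator whose copy() returns an independent instance.
    static class SetAccumulator extends SketchAccumulator<String, Set<String>> {
        private final Set<String> values = new HashSet<>();

        @Override boolean isZero() { return values.isEmpty(); }

        @Override SketchAccumulator<String, Set<String>> copy() {
            SetAccumulator c = new SetAccumulator();
            c.values.addAll(values);
            return c;
        }

        @Override void reset() { values.clear(); }
        @Override void add(String v) { values.add(v); }
        @Override Set<String> value() { return values; }
    }

    // Mimics an auto-generated stub that leaves copy() returning null.
    static class NullCopyAccumulator extends SetAccumulator {
        @Override SketchAccumulator<String, Set<String>> copy() { return null; }
    }

    public static void main(String[] args) {
        SetAccumulator good = new SetAccumulator();
        good.add("alice");
        SketchAccumulator<String, Set<String>> shipped = good.copyAndReset();
        System.out.println("copy is zero: " + shipped.isZero());
        System.out.println("original keeps its values: " + good.value().contains("alice"));

        try {
            new NullCopyAccumulator().copyAndReset();
        } catch (NullPointerException e) {
            System.out.println("null copy() leads to a NullPointerException");
        }
    }
}
```

Under these assumptions, a copy() that returns null fails at the reset step, and a copy() that returns `this` would empty the original accumulator, so returning a fresh instance seems the safer choice.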