2016-08-25 14 views
0

私はkafkaとsparkのために新しく、いくつかカウントしようとしていますが、成功していません!問題の詳細は次のとおりです。ありがとう! belowsのようKafka + Java + SparkStreaming + reduceByKeyAndWindow throw例外:org.apache.spark.SparkException:タスクがシリアライズできません。

コード:スレッド "スレッド3" org.apache.spark.SparkExceptionで

例外:belowsのよう

JavaPairDStream<String,Integer> counts = wordCounts.reduceByKeyAndWindow(new AddIntegers(), new SubtractIntegers(), Durations.seconds(8000), Durations.seconds(4000)); 

例外 org.apacheにシリアライズしないタスク .spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:166) ( )org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean SparkContext.scala:1623) org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala時:333) org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scalaにおいて:299 ) でorg.apache.spark.streaming.api.java.JavaPairDStream.reduceByKeyAndWindow(JavaPairDStream.scala:352) KafkaAndDstreamWithIncrement.KDDConsumer.run(KDDConsumer.java:110) に起因する:java.io.NotSerializableException: KafkaAndDstreamWithIncrement.KDDConsumer

+0

私たちに 'addIntegers'と' subtractIntegers'を表示してください。 –

+0

アドバイスありがとうございます!以前は、私はいつも "reduceBykeyAndWindowをオーバーライドする方法"について書いていました。しかし、今私はおそらくaddIntgersとsubractIntegersで間違っていることがわかります。私は試して、そして成功しました、もう一度ありがとう! –

答えて

0

コードとしてbelowsの(静的定義):

static Function2<Integer,Integer,Integer> AddIntegers = new Function2<Integer,Integer,Integer>(){ 
    @Override 
    public Integer call (Integer i1,Integer i2){ 
     return i1 + i2; 
    } 
}; 
static Function2<Integer,Integer,Integer> SubtractIntegers = new Function2<Integer,Integer,Integer>(){ 
    @Override 
    public Integer call (Integer i1,Integer i2){ 
     return i1 - i2; 
    } 
}; 
関連する問題