2016-11-04 6 views
0

オブジェクトときにIデータセットのAPIを使用し、私はケースクラスの二種類NPE FLINK基ケースクラスは

case class Geo(country:Int, province:Int, city:Int, county:Int) 


case class AntiFraudLog(
    eventType: Int, 
    valid: Boolean  
) 

case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog]) 

を有するそれからその値がケースクラスであるキー/値のペアを生成しました。

val dataKeyValue: DataSet[(Long, AntiFraudLog)] 

し、別のケースクラスにグループ化されたデータを変換し、同じキー

val groupedSortedData = dataKeyValue groupBy 0 

でグループ項目に試す

val sessionData:DataSet[AntiFraudSession] = groupedSortedData reduceGroup(
    logs => AntiFraudSession(logs.map(_._2).toSeq) 
) 

しかし、私はプログラムを実行すると、私はそのような例外が発生しました

Caused by: java.lang.NullPointerException 
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90) 
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:32) 
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100) 
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30) 
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100) 
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30) 
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) 
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83) 
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85) 
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
    at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417) 
    at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.flink.api.scala.DataSet$$anon$5.flatMap(DataSet.scala:417) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) 
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) 
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) 
    at java.lang.Thread.run(Thread.java:745) 

誰もそれを修正する方法を知っていますか?

答えて

0

Flinkがヌル値を持つコレクションフィールドを持つケースクラスをシリアル化できないように見えます。あなたのシナリオでは、fraudLogs = nullのAntiFraudSessionになります。 sessionDataにそのような要素が現れると思われる変換ロジックが他にもありますか?