
Serialization problem when running a Spark Streaming job

I cannot solve the following serialization problem, which is triggered by filtered.foreachPartition(iter => {. I thought foreachPartition would take care of the serialization problem, but it does not. So how should the redisPool be used?

EDIT (I have updated the code to make it clearer):

val redis_host = "localhost"
val redis_port = 6379
messages.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000))
    iter.foreach({ msg =>
      println(msg.mkString(","))
    })
  })
})

I assume that the variables redis_host and redis_port are not serializable, but how do I serialize them correctly so that the code above runs on a cluster as well as locally?

The code shown above throws the following error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:135)
    at org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:134)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer
Serialization stack:
    - object not serializable (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer, value: [email protected]fba5c74)
    - field (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer)
    - object (class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, )
    - field (class: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1)
    - object (class org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.test.manager.service.consumer.kafka.KafkaDecisionsConsumer


Can you share the serialization exception? – maasg


Using mapPartitions or foreachPartition helps with the redisPool (it is created locally on the worker; it is not serializable if you create it on the driver), but are the objects inside the RDD serializable? It would help if you provided a complete example, including the contents of myDstream. –
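
For illustration, a minimal Scala sketch of the pattern that comment describes, assuming messages is the DStream from the question and that Pool is the sedis wrapper (org.sedis.Pool) around Jedis:

import org.sedis.Pool
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Copy the connection settings into local vals so the closure captures
// only these values, not the enclosing (non-serializable) class.
val host = "localhost"
val port = 6379

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // Created on the executor for each partition, so it is never serialized.
    val pool = new Pool(new JedisPool(new JedisPoolConfig(), host, port, 2000))
    iter.foreach(msg => println(msg.mkString(",")))
  }
}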


What is 'filtered'? Why aren't you using 'rdd'? –

Answers


The solution is to initialize the pool lazily inside your anonymous function. In Java you can do something like this:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
// plus the import for whatever Pool wrapper you use around JedisPool

messages.foreachRDD(new RedisFunction(redis_host, redis_port));
messages.count();

class RedisFunction implements Function<JavaRDD<String>, Void> { // adjust String to your message type
    // transient: the pool itself must never be serialized with the task;
    // each executor re-creates it lazily inside call()
    private transient Pool pool = null;
    private final String redis_host;
    private final int redis_port;

    RedisFunction(String redis_host, int redis_port) {
        this.redis_host = redis_host;
        this.redis_port = redis_port;
        initPool();
    }

    private void initPool() {
        this.pool = new Pool(new JedisPool(new JedisPoolConfig(), redis_host, redis_port, 2000));
    }

    @Override
    public Void call(JavaRDD<String> rdd) {
        if (this.pool == null) {   // pool is null on the executors after deserialization
            initPool();
        }
        rdd = rdd.map(....);       // your RDD transformations go here
        rdd.count();               // Spark action
        return null;
    }
}

The example above should help you fix the serialization problem.
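
For reference, if you stay in Scala, a common way to get the same lazy, once-per-executor initialization is a singleton object holding the pool; nothing in it is captured by the task closure. This is only a sketch, again assuming the sedis Pool wrapper and the messages DStream from the question:

import org.sedis.Pool
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// Initialized lazily, once per executor JVM, the first time it is touched.
object RedisConnection {
  lazy val pool: Pool =
    new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val pool = RedisConnection.pool // resolved on the executor, never serialized
    iter.foreach(msg => println(msg.mkString(",")))
  }
}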
