2017-07-15 10 views
-2

SparkSQLストーピングからSparkSQLジョブを呼び出しています。我々は同時に例外を取得しており、カフカの消費者は閉鎖的なエラーです。KafkaConsumerの並行例外は、マルチスレッドアクセスでは安全ではありません

カフカのコンシューマコード

// Start reading messages from Kafka and get DStream 
     final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
       getJavaStreamingContext(), LocationStrategies.PreferConsistent(), 
       ConsumerStrategies.<String, byte[]>Subscribe(SparkServiceConfParams.AIR.CONSUME_TOPICS, 
         sparkServiceConf.getKafkaConsumeParams())); 

     ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId()); 
    // Decode each binary message and generate JSON array 
    JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {} 

..

// publish generated json gzip to kafka 
    decodedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public void call(JavaRDD<String> jsonRdd4DF) throws Exception { 
      //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF); 
      if(!jsonRdd4DF.isEmpty()) { 
       //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect()); 
       Dataset<Row> json = sparkSession.read().json(jsonRdd4DF); 

       SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor(); 

        AIRDataSetBean processAIRData = airMainJsonProcessor.processAIRData(json, sparkSession); 

エラー詳細

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
:ここでは、コードと例外の詳細です

最後にカフカの消費者は、閉じた状態:

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalStateException: 

This consumer has already been closed. 
+0

ここで、カフカの消費者を呼び出すコードはありますか? –

+0

@YuvalItzchakovコードを追加しました – Imran

+0

あなたの質問は何ですか?スパークのバグみたいですね... –

答えて

1

この問題は、キャッシュやスパークストリーミングのオプションを永続化を使用して解決されます。このシナリオでは、キャッシュRDDはKafkaから再度読み取られず、問題は解決されます。ストリームの同時使用を可能にします。しかし、賢明にキャッシュオプションを使用してください。コード:

JavaDStream<ConsumerRecord<String, byte[]>> cache = consumerStream.cache(); 
関連する問題