2017-12-28 17 views
0

私はストリーミングを作成するには、以下の使用すべての行使用火花ストリーミング時自身の自己によって相殺

火花ストリーミングカフカデータを読み取るために、プロセスを使用します。

lines = KafkaUtils.createDirectStream(
      jssc, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.<String, String>Subscribe(topics,kafkaParams) 
    ); 

その後、私はカフカ

lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> { 
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
      OffsetRange[] range = new OffsetRange[1]; 
      range[0] = o; 

      rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> { 
      // get kafka offset 
      OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; 
      // to cache line data 
      List<String> jsonData = new ArrayList<>(); 
      // to read all line data 
      while (partitionOfRecords.hasNext()) { 
       ConsumerRecord<String, String> line = partitionOfRecords.next(); 
       jsonData.add(line.value()); 
      } 
      // TODO do my own bussiness from jsonData 
      ....... 
      // HOW can I commit kafka Offset Here?? 
      // this is a method to commit offset 
      ((CanCommitOffsets) lines.inputDStream()).commitAsync(range) 
     }); 
    }); 

からのデータを処理するために、このコードを使用して、私は何度も試してみる、私はそれはいくつかの問題を抱えているが見つかりました:

  1. 他のパーティションが失敗したときにデータ処理が成功したらどうしますか?それは私のすべてのデータプロセスが戻ってくるはずであるということですか?カフカオフセットはコミットしているため、

  2. 私はこのコードを実行していますが、実際に実行することがわかりました。次回このrdd executorが実行されたときに、進行中のoomが殺されるか、次回にKafkaから読み込まれるデータが2倍になる?

答えて

0

他のパーティション が失敗したときにどのようにそれは私のデータ処理成功場合は動作しますか?それは私のすべてのデータプロセスが戻ってくるはずであるということですか?特定のタスクが失敗した場合 オフセットカフカが

コミット持っているので、スパークはspark.task.maxFailures設定に従って場所でそれを再実行しようとします。番号が合格すると、ジョブ全体が失敗します。 commitAsyncより前の部分が失敗した場合は、オフセットをコミットしないようにする必要があります。私はこのコードを実行している

、その後、私はそれが本当に動作コミット実行した場合、進行状況OOM または殺され、次回次回このRDDキュータの実行が、それは私からの読み取りいくつかのデータを意味する場合 ですカフカは倍増するだろうか?

はい。ジョブが次のバッチ反復の前に強制終了された場合、Sparkはすでに処理されたデータの再読み込みを試みます。

+0

あなたのおかげでありがとうございます。私は何ができるのですか?一度だけデータを読むことができますか?私は毎回トランザクションを使用したくないからです。 – DreamHeaven

+0

@DreamHeavenあなた自身でオフセットを保存し、リカバリ時にそれらをリロードする必要があります。私は、より良いストリーミングセマンティクスを提供するStructured Streamingを見てみることをお勧めします。 –

関連する問題