私はストリーミングを作成するには、以下の使用すべての行使用火花ストリーミング時自身の自己によって相殺
火花ストリーミングカフカデータを読み取るために、プロセスを使用します。
lines = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics,kafkaParams)
);
その後、私はカフカ
lines.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
OffsetRange[] range = new OffsetRange[1];
range[0] = o;
rdd.foreachPartition((Iterator<ConsumerRecord<String, String>> partitionOfRecords) -> {
// get kafka offset
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
// to cache line data
List<String> jsonData = new ArrayList<>();
// to read all line data
while (partitionOfRecords.hasNext()) {
ConsumerRecord<String, String> line = partitionOfRecords.next();
jsonData.add(line.value());
}
// TODO do my own bussiness from jsonData
.......
// HOW can I commit kafka Offset Here??
// this is a method to commit offset
((CanCommitOffsets) lines.inputDStream()).commitAsync(range)
});
});
からのデータを処理するために、このコードを使用して、私は何度も試してみる、私はそれはいくつかの問題を抱えているが見つかりました:
他のパーティションが失敗したときにデータ処理が成功したらどうしますか?それは私のすべてのデータプロセスが戻ってくるはずであるということですか?カフカオフセットはコミットしているため、
私はこのコードを実行していますが、実際に実行することがわかりました。次回このrdd executorが実行されたときに、進行中のoomが殺されるか、次回にKafkaから読み込まれるデータが2倍になる?
あなたのおかげでありがとうございます。私は何ができるのですか?一度だけデータを読むことができますか?私は毎回トランザクションを使用したくないからです。 – DreamHeaven
@DreamHeavenあなた自身でオフセットを保存し、リカバリ時にそれらをリロードする必要があります。私は、より良いストリーミングセマンティクスを提供するStructured Streamingを見てみることをお勧めします。 –