2016-12-23 6 views
1

私はカフカのトピックからメッセージを読んでいくつか更新するSpark 2.0.2の新しく構造化されたストリーミングを使用していますそれからカサンドラテーブル:kafkaからスパークストラクチャ構造のスチーム - チェックポイントからの再開後に最後のメッセージが再度処理される

val readStream = sparkSession.readStream 
    .format("kafka") 
    .option("subscribe", "maxwell") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .load 
    .as[KafkaMessage] 
    .map(<transform KafkaMessage to Company>) 

val writeStream = readStream 
    .writeStream 
    .queryName("CompanyUpdatesInCassandra") 
    .foreach(new ForeachWriter[Company] { 
    def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 

    def process(company: Company): Unit = { 
     ... 
    } 

    def close(errorOrNull: Throwable): Unit = {} 
    } 
    .start 
    .awaitTermination 

私もsparkSession上のチェックポイントの位置( "spark.sql.streaming.checkpointLocation")を設定しました。これにより、ストリーミングアプリが再開するとすぐに停止している間に到着したメッセージを受け取ることができます。

しかし、このチェックポイントの場所を設定してから、エラーが発生しても正しく処理されていても、以前のバッチの最後のメッセージを一貫して処理することに気付きました。

ここで間違っているのは何ですか?これは非常に一般的な使用例のようです。

さらに詳しい情報:

ここでは関係ログ(トピック5876が正常に前のバッチで処理された最後のトピックである)を参照してください:

また
[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31 
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} 
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)] 
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map() 
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0 
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)] 
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31) 
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876 
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876 
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876 
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0] 1 

を私はストリームを殺したときに、私が作ります確かに、データの損失を避けるために、正常に停止されています。新しいオフセットが発生した場合

sys.ShutdownHookThread 
{ 
    writeStream.stop 
    sparkSession.stop 
} 

答えて

3

は現在、構造化されたストリーミングは、状態をチェックポイント。したがって、説明したケースが予想されます。最後にコミットされたバッチは、リカバリ後に再処理される可能性があります。しかし、これは内部実装です。バッチをコミットするときにチェックポイントを実行すると、チェックポイントが失敗する可能性があり、シンクのForeachWriterもこのケースを処理する必要があるとします。

通常、シンクは常に冪等でなければなりません。

更新:Spark 2.2.0では、ストラクチャードストリーミングは、正常に完了した場合、回復後にバッチを再実行しません。

+0

です。私は、最後のバッチは、バッチをコミットしてからチェックポイントを実行するまでに何かが実際に間違った後にのみ再処理されるという印象を受けました。とにかく、ForeachWriterは等しくなければならないので、それは大きな問題ではありません。ありがとう! –

+1

今は、実際に内部を単純化しています(次のバッチを開始することによってバッチを完全なものとしてマークします)。私は将来、これを最適化する可能性が高いと思います。あなたが言ったように、あなたはまだ正確に一度のセマンティクスを気にしている場合、あなたはライターを無力にする必要があります。 –

関連する問題