2017-08-28 13 views
2

私はKafkaトピックから読み込むSpark(v2.2)バッチジョブを作成しています。スパークジョブはcronでスケジューリングしています。 非ベース時間ウィンドウはサポートされていないため、Spark Structured Streamingは使用できません。Apache Kafkaのオフセット管理Apache Spark Ba​​tchで

val df = spark 
     .read 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "...") 
     .option("subscribe", s"kafka_topic") 

kafkaトピックが次のバッチジョブの開始位置から知るために、オフセットを設定する必要があります。どうやってやるの?

答えて

1

私はストリームを作成するためにKafkaUtilsを使用していると思いますが、これをパラメータとして渡すことができます。

val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent, 
          Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets)) 

これが役に立ちます。

+0

down vote accept バッチクエリの場合、最新のオフセットは許可されません。 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html Spark Streamingに最後のクエリが途切れた場所から新しいクエリを再開する必要があります。 – ngi