5
Spark Direct Streamを使用して、カフカの特定のメッセージのオフセットを取得して保存しようとしています。 Sparkのドキュメントは、各パーティションの範囲オフセットを取得するのは簡単ですが、必要なのはキューのフルスキャン後にトピックの各メッセージの開始オフセットを格納することです。Kafka + SparkStreamingで特定のメッセージオフセットを取得することはできますか?
Spark Direct Streamを使用して、カフカの特定のメッセージのオフセットを取得して保存しようとしています。 Sparkのドキュメントは、各パーティションの範囲オフセットを取得するのは簡単ですが、必要なのはキューのフルスキャン後にトピックの各メッセージの開始オフセットを格納することです。Kafka + SparkStreamingで特定のメッセージオフセットを取得することはできますか?
はい、MessageAndMetadataバージョンcreateDirectStream
を使用すると、message metadata
にアクセスできます。
tuple3
のDstreamを返す例があります。上記の例で
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
tuple3._1
はtopic
、tuple3._2
はoffset
とtuple3._3
はmessage
を有している必要がありますがあります。
希望すると便利です。
私がこのように正しいとすれば、特定のオフセットから読み取ることができます。パーティション内の各メッセージの開始オフセットを計算する簡単な方法があるかどうかは疑問です。私が必要とするのは、各メッセージのオフセットを格納し、このコードを使用して特定のメッセージを読み取ることです。ありがとうございました! –
はい、正しくありましたが、上記のコードでは、 'messagesDStream'の各メッセージに関連するオフセットを取得します。 'createDirectStream'は' Tuple3'の 'Dstream'を与え、' tuple3'の各タプルに 'topic-name'と' message'とそれに関連する 'offset'を得ます。 – avr
こんにちは、遅い応答には申し訳ありません..働いています。しかし、私は、 "fromOffset"がパーティションをスキャンする開始オフセットであると仮定しています。ありがとうございます。 –