ScalaでSparkを使用しているKafkaコンシューマアプリケーションでメッセージを処理しています。カフカのメッセージキューからメッセージを処理するのに、通常よりも少し時間がかかることがあります。その時、私は最新のメッセージを消費する必要があります。これは、プロデューサーによって発行され、まだ消費されていないものは無視しています。ここでApache Kafka:カフカから最新のメッセージを受け取るには?
は私のコンシューマコードです:
object KafkaSparkConsumer extends MessageProcessor {
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(1))
val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"),
"zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"auto.offset.reset" -> properties.getProperty("offset_reset"),
"zookeeper.session.timeout" -> properties.getProperty("zookeeper_timeout"))
val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaParams,
Map("moved_object" -> 1),
StorageLevel.MEMORY_ONLY_SER
).map(_._2)
msgStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
}
は、消費者が常にコンシューマアプリケーションでの最新のメッセージを取得することを確認する方法はありますか?あるいは、同じようにKafkaの設定でプロパティを設定する必要がありますか?
この上の任意の助けいただければ幸いです。ありがとう
私はコンシューマアプリケーションの実行を開始するたびにランダムなグループIDを生成しています。この方法で最新のメッセージが得られますが、処理に時間がかかる場合は、必要のない古いメッセージを処理し続けることになります。 – Arjun