1

ScalaでSparkを使用しているKafkaコンシューマアプリケーションでメッセージを処理しています。カフカのメッセージキューからメッセージを処理するのに、通常よりも少し時間がかかることがあります。その時、私は最新のメッセージを消費する必要があります。これは、プロデューサーによって発行され、まだ消費されていないものは無視しています。ここでApache Kafka:カフカから最新のメッセージを受け取るには?

は私のコンシューマコードです:

object KafkaSparkConsumer extends MessageProcessor { 

def main(args: scala.Array[String]): Unit = { 
    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 

    val group_id = Random.alphanumeric.take(4).mkString("dfhSfv") 
    val kafkaParams = Map("metadata.broker.list"   -> properties.getProperty("broker_connection_str"), 
         "zookeeper.connect"    -> properties.getProperty("zookeeper_connection_str"), 
         "group.id"      -> group_id, 
         "auto.offset.reset"    -> properties.getProperty("offset_reset"), 
         "zookeeper.session.timeout"  -> properties.getProperty("zookeeper_timeout")) 

    val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
         ssc, 
         kafkaParams, 
         Map("moved_object" -> 1), 
         StorageLevel.MEMORY_ONLY_SER 
        ).map(_._2) 

    msgStream.foreachRDD { x => 
    x.foreach { 
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    }       
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

は、消費者が常にコンシューマアプリケーションでの最新のメッセージを取得することを確認する方法はありますか?あるいは、同じようにKafkaの設定でプロパティを設定する必要がありますか?

この上の任意の助けいただければ幸いです。ありがとう

答えて

0

カフカに接続するときに常に新しい(ランダムな)グループIDを生成することができます。これにより、接続時に新しいメッセージを消費するようになります。

+0

私はコンシューマアプリケーションの実行を開始するたびにランダムなグループIDを生成しています。この方法で最新のメッセージが得られますが、処理に時間がかかる場合は、必要のない古いメッセージを処理し続けることになります。 – Arjun

2

カフカのコンシューマAPIだから方法

void seekToEnd(Collection<TopicPartition> partitions) 

が含まれ、あなたが消費者から割り当てられたパーティションを取得し、最後にそれらのすべてを求めることができます。 seekToBeginningと同様のメソッドがあります。

+0

質問の説明に自分のコードを掲載しました。あなたはそれを見て、この方法をどこに追加するかを教えてください。回答ありがとうございました:) – Arjun

+0

auto.offset.resetにはどのような価値がありますか? – Natalia

+0

最大のオフセットに自動的に値をリセットする '最大値'に設定されています。 – Arjun

0

あなたは、パーティションからの非常に最後のメッセージを取得するために2つのKafkaConsumer APIを利用することができます(ログの圧縮を仮定することは問題ではありません):

  1. public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions):これはあなたに与えられたパーティションの終了オフセットを与えます。終了オフセットは、次に配信されるメッセージのオフセットです。
  2. public void seek(TopicPartition partition, long offset):パーティションごとにこれを実行し、上の呼び出しから1を引いた終了オフセット(0より大きいと仮定します)を指定します。
+0

質問の説明に自分のコードを掲載しました。あなたはそれを見て、この方法をどこに追加するかを教えてください。回答ありがとうございました:) – Arjun

0

はい、staringOffsetをlatestに設定すると、最新のメッセージを消費することができます。

val spark = SparkSession 
    .builder 
    .appName("kafka-reading") 
    .getOrCreate() 

import spark.implicits._ 
val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "localhost:9092") 
     .option("startingOffsets", "latest") 
     .option("subscribe", topicName) 
     .load() 
+0

質問の説明に自分のコードを掲載しました。それを見て、あなたのコードをどこに追加するかを教えてください。回答ありがとう: – Arjun

+0

kafkaParamsを定義するときにこのプロパティを追加する必要があります。 consumer.forcefromstart = false詳細については、コンシューマーのプロパティを参照してください。 https://github.com/dibbhatt/kafka-spark-consumer –

+0

よろしくお願いします。私はそれをチェックするには数日が必要です。私はあなたに戻ってきます。ありがとうございました。 – Arjun

関連する問題