2017-11-11 7 views
0

私はScala 2.11とAkka Streams Kafka 0.17を使用しています。Akka Streams KafkaでProducerMessageを構築する際にオフセットパラメータを設定する方法は?

私はストリームあります

  • SourceSource.actorRefを使用して作成されます。ここでは、アクターは一定の間隔で実行され、連続してメッセージを生成し、ストリームに出力されます。
  • ProducerFlowとして添付しました。プロデューサーはProducerMessage.Messageをカフカのトピックにプッシュします。
  • 一部のDB操作。

のように見えた、ProducerMessage.Messageを構築しながら、私は問題を抱えている:私は簡単に実際のメッセージが含まれているrecordパラメータを渡すことができ

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V], 
    passThrough: PassThrough 
) 

。しかし、私はpassThroughパラメータで何を渡すべきかわかりません。 docsによれば:

passThroughフィールドは Consumer#flow通し、Resultに含まれる任意の要素を保持することができます。これは、ダウンストリーム操作でいくつかのコンテキストを渡す必要がある場合に便利な です。 これはunzip/zipで行うことができますが、これはより便利です。 は、例えば、または ConsumerMessage.CommittableOffsetBatchであり、後でコミットすることができます。 フロー。私の場合

にはカフカトピックにサブスクライブし、私のストリームのSourcecomittableSource又はplainSource)を生成する任意のカフカの消費者が存在しません。その場合、私はドキュメントに記載されているように消費者オフセットを渡してしまいます。しかし、私の場合、俳優はそのような消費者をシミュレートしています。つまり、ConsumerMessage.CommittableOffsetへのアクセス権がありません。では、ここでpassThroughパラメータのために何を渡しますか?この場合のベストプラクティスは何でしょうか?

答えて

0

reactive-kafkaチームで問題を転送した後、私は答えを得ました。基本的には、pass throughにユースケースがない場合は、なしに設定してみてください。または使用しない、または単に空の文字列""に設定してみてください。

おそらくProducer.plainSinkを使用する場合は、ProducerMessage.Messageを構築する必要はありません。その後、Kafka ProducerRecordを直接構築することができます。 ProducerMessage.Messageケースクラスは、pass throughが必要な場合や必要な場合の単なるコンテナです。通過する要素以外に、それはただKafka ProducerRecordを含んでいます。

関連する問題