私はScala 2.11とAkka Streams Kafka 0.17を使用しています。Akka Streams KafkaでProducerMessageを構築する際にオフセットパラメータを設定する方法は?
私はストリームあります
- が
Source
がSource.actorRef
を使用して作成されます。ここでは、アクターは一定の間隔で実行され、連続してメッセージを生成し、ストリームに出力されます。 Producer
をFlow
として添付しました。プロデューサーはProducerMessage.Message
をカフカのトピックにプッシュします。- 一部のDB操作。
のように見えた、ProducerMessage.Message
を構築しながら、私は問題を抱えている:私は簡単に実際のメッセージが含まれているrecord
パラメータを渡すことができ
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
。しかし、私はpassThrough
パラメータで何を渡すべきかわかりません。 docsによれば:
passThrough
フィールドはConsumer#flow
通し、Result
に含まれる任意の要素を保持することができます。これは、ダウンストリーム操作でいくつかのコンテキストを渡す必要がある場合に便利な です。 これはunzip/zipで行うことができますが、これはより便利です。 は、例えば、またはConsumerMessage.CommittableOffsetBatch
であり、後でコミットすることができます。 フロー。私の場合
にはカフカトピックにサブスクライブし、私のストリームのSource
(comittableSource
又はplainSource
)を生成する任意のカフカの消費者が存在しません。その場合、私はドキュメントに記載されているように消費者オフセットを渡してしまいます。しかし、私の場合、俳優はそのような消費者をシミュレートしています。つまり、ConsumerMessage.CommittableOffset
へのアクセス権がありません。では、ここでpassThrough
パラメータのために何を渡しますか?この場合のベストプラクティスは何でしょうか?