私はストリームに単一のタイプのメッセージを書き込むKinesisプロデューサーを持っています。私は、このストリームを複数の全く異なるコンシューマアプリケーションで処理したいと考えています。したがって、特定のトピック/ストリームに対して1つのパブリッシャを持つパブ/サブ。また、各消費者がストリームに書き込まれたすべてのメッセージを処理するように、チェックポイントを利用したいと考えています。同じKinesisストリームの複数の異なるコンシューマー
最初は、すべてのコンシューマとプロデューサで同じApp Nameを使用していました。
com.amazonaws.services.kinesis.model.InvalidArgumentException:StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442は*アカウントでPackageCreatedストリームにシャードshardId-000000000000にGetShardIteratorに使用しかし、私は、複数の消費者を開始したら、次のエラーを取得開始しました***********はこのストリームから来ていないため無効です。 (サービス:AmazonKinesis;ステータスコード:400;エラーコード:InvalidArgumentException、IDを要求します..)
これは、彼らが同じアプリケーション名を使用しているとして、消費者がチェックポイントと衝突しているためと考えられます。
チェックポインティングでpub/subを実行する唯一の方法は、各プロデューサにすべての可能なコンシューマについて知ってもらう必要があるコンシューマアプリケーションごとにストリームを持たせることです。これは私が望むより密接に結びついています。それは本当にただのキューです。
カフカは、消費者が自分のチェックポイントを完全にコントロールしているので、私が望むもの、つまりトピック/パーティションの任意の消費をサポートしているようです。チェックポイントでパブ/サブが必要な場合、私の唯一の選択肢はカフカ、または他の代替手段に移行することですか?
各消費者が同一であるマイRecordProcessorコード:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
log.trace("Received record(s) from kinesis")
for {
record <- processRecordsInput.getRecords
json <- jawn.parseByteBuffer(record.getData).toOption
msg <- decode[T](json.toString).toOption
} yield subscriber ! msg
processRecordsInput.getCheckpointer.checkpoint()
}
コードは、メッセージを解析し、加入者にそれを送信します。今のところ、すべてのメッセージを正常に受信したとマークしています。 AWS Kinesisダッシュボードでメッセージが送信されているのを見ることができますが、各アプリケーションに独自のAppNameがあり他のメッセージは表示されないため、読み取りは行われません。
:
はこれに最初の応答を確認してください。 Shardイテレータを取得してレコードを読み取る各コンシューマのコードを表示できますか? 基本的に、「キネシス」はまったく同じ目的のために作られています。私は4つの 'Lambda'を持っています。それぞれの処理レコードは異なっていて、まったく同じレコードを消費しています。 – johni
@ johni、私はレコードを解析するために使用しているコードを追加しました。 – CalumMcCall