2016-08-26 29 views
2

私はストリームに単一のタイプのメッセージを書き込むKinesisプロデューサーを持っています。私は、このストリームを複数の全く異なるコンシューマアプリケーションで処理したいと考えています。したがって、特定のトピック/ストリームに対して1つのパブリッシャを持つパブ/サブ。また、各消費者がストリームに書き込まれたすべてのメッセージを処理するように、チェックポイントを利用したいと考えています。同じKinesisストリームの複数の異なるコンシューマー

最初は、すべてのコンシューマとプロデューサで同じApp Nameを使用していました。

com.amazonaws.services.kinesis.model.InvalidArgumentException:StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442は*アカウントでPackageCreatedストリームにシャードshardId-000000000000にGetShardIteratorに使用しかし、私は、複数の消費者を開始したら、次のエラーを取得開始しました***********はこのストリームから来ていないため無効です。 (サービス:AmazonKinesis;ステータスコード:400;エラーコード:InvalidArgumentException、IDを要求します..)

これは、彼らが同じアプリケーション名を使用しているとして、消費者がチェックポイントと衝突しているためと考えられます。

チェックポインティングでpub/subを実行する唯一の方法は、各プロデューサにすべての可能なコンシューマについて知ってもらう必要があるコンシューマアプリケーションごとにストリームを持たせることです。これは私が望むより密接に結びついています。それは本当にただのキューです。

カフカは、消費者が自分のチェックポイントを完全にコントロールしているので、私が望むもの、つまりトピック/パーティションの任意の消費をサポートしているようです。チェックポイントでパブ/サブが必要な場合、私の唯一の選択肢はカフカ、または他の代替手段に移行することですか?

各消費者が同一であるマイRecordProcessorコード:

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = { 
    log.trace("Received record(s) from kinesis") 
    for { 
    record <- processRecordsInput.getRecords 
    json <- jawn.parseByteBuffer(record.getData).toOption 
    msg <- decode[T](json.toString).toOption 
    } yield subscriber ! msg 
    processRecordsInput.getCheckpointer.checkpoint() 
} 

コードは、メッセージを解析し、加入者にそれを送信します。今のところ、すべてのメッセージを正常に受信したとマークしています。 AWS Kinesisダッシュボードでメッセージが送信されているのを見ることができますが、各アプリケーションに独自のAppNameがあり他のメッセージは表示されないため、読み取りは行われません。

+1

はこれに最初の応答を確認してください。 Shardイテレータを取得してレコードを読み取る各コンシューマのコードを表示できますか? 基本的に、「キネシス」はまったく同じ目的のために作られています。私は4つの 'Lambda'を持っています。それぞれの処理レコードは異なっていて、まったく同じレコードを消費しています。 – johni

+0

@ johni、私はレコードを解析するために使用しているコードを追加しました。 – CalumMcCall

答えて

3

キネシスのストリームから複数のコンシューマー&へのパブリッシャーのパターンがサポートされています。コンシューマごとに個別のストリームは必要ありません。

どうすればよいですか?すべての消費者に異なるアプリケーション名を付ける必要があります。こうすることで、ある消費者のチェックポイント情報が別の消費者のチェックポイント情報と衝突することはありません。私はこのエラーを取得する方法を理解するために苦労していhttps://forums.aws.amazon.com/message.jspa?messageID=554375

+0

さて、私の実装では別の場所で何か間違ったことをする必要があります。私が正しい道を歩んでいることを明確にするのに役立ってくれてありがとう。 – CalumMcCall

+0

同じアプリ内で複数のコンシューマーはどうですか?あなたは、同じストリームから同じサービス/アプリケーションのインスタンスのプールを読み込めますか? – MadHacker

関連する問題