2

話はこれのようです。私はカフカブローカーと特定のオブジェクト(私は自分のトピックを送信するためにjsonify)を持っています。私はキーとして使用したいIDを持っています。Spring Cloud Streamsはメッセージにカフカキーを設定していませんか?

現在、私はIDを抽出してキーとして返すクラスを設定するために 'partitionKeyExtractorClass'設定を使用しています。

それは次のようになります。

def extractKey(Message<?> message) { 
    log.info('Extracting key from message') 
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id 
    log.info("Got = ${id}") 

    return id 
} 

私の実際の問題は

はこれです...私は、トピックにメッセージを閲覧したときに私のメッセージを保持しているConsumerRecordキーがnullであると言うことですバグ?私は何か間違っているのですか?これに関する文書化はそれ以上のことではありません。

答えて

1

あなたはpartitionkeyを混合しています。

現在、KafkaMessageChannelBinderkeyMessageに対して決定するオプションを提供していません。

あなたが強力に使用できる唯一の既存の機能はKafkaHeaders.MESSAGE_KEY次のとおりです。だから、

Object messageKey = this.messageKeyExpression != null 
      ? this.messageKeyExpression.getValue(this.evaluationContext, message) 
      : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY); 

outputメッセージの前に、あなたが鍵を計算する必要がありますし、そのヘッダにそれを置きます。

+1

これは容認できる解決策ではありません。私は同じ問題があります。私はrawモードの出力を使用します。私は自分のメッセージに見出しは入れたくありませんが、キーを設定する必要があります。私はapache camelに切り替える必要はありません。 –

+0

あなたの懸念は明らかではありません。これは、現在のKafkaバインダーの実装では、mesageKeyが省略されているだけの制限です。しかし、問題を記入してください。 Kafkaバインダーを設定するだけで、あまり気にしていれば、今のところそのヘッダーをマップしないでください。そして、私は、Apache CamelがSpring Cloud Streamをどのように置き換えることができるのか絶対に理解していません。 –

+1

問題は、Spring Cloud Streams以外のストリームを他のプロデューサとコンシューマと統合することです。カフカのトピックの注文を保証するには、メッセージキーが重要です。 Spring Cloud Streamsのコントロールヘッダーとは、他のすべてのプロデューサとコンシューマがSpring Cloud Streamsの特定のヘッダーを処理する必要があることを意味します。私は、カフカのメッセージが、私が欲しいものと正確に一致する必要があり、Spring Cloud Streamsが望んでいるものではありません。シンプルなキーと文字列メッセージが必要です。生のモードではそれができますが、注文を維持する鍵はありません。 –

関連する問題