2016-09-09 3 views
0

プロデューサーがKafkaトピックに公開したメッセージの検証を行うためにインターセプターを追加しようとしています。私はカフカのトピックで実行されるスキーマの検証に加え、いくつかのバリデーションを行う必要があります。私が従った手順は次のとおりです。Kafkaプロデューサーインターセプター

  1. ProducerInterceptor Interfaceを拡張したJavaクラスを作成しました。
  2. クラスをコンパイルし、クラスパスに含まれるフォルダに配置されたjarファイルを作成しました。
  3. Kafkaインストール内のproducer.propertiesにintercetors.classes = classnameを追加しました。

しかし、トピックにメッセージを公開すると、私が書いたカスタムインターセプタクラスは呼び出されません。 (私は何のエラーも出ていません。メッセージはトピックに完全に公開されています)。

私はhaverこの上https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

してくださいアドバイスを言及しました。

答えて

0

プロパティ名はinterceptor.classesが、この質問はかなり古いです

+0

返信いただきありがとうございます。はい、producer.propertiesのinterceptor.classesとして正しく記述されています。入力ミスを申し訳ありません。 –

0

をintercetors.classesではありませんので、私はあなたがその間に解決策を見つけたと仮定します。しかし、他の人を助ける場合には、メッセージの内容に基づいて異なるトピックにメッセージをディスパッチする私のProducerInterceptorクラスは、ストリームに指定された出力が既に存在しない限り呼び出されませんでした。

私が最初に試したのは、出力トピックを指定する必要がないと思ったからです。これは動作しません:

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

しかし、この処理が行われます。

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic").through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

それは何も第二の例ではそのdummy-output-topicに公開しないされることを注目に値します、とto代わりのthroughを使用しても動作するように表示されていること同じ方法。

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 
    .map(new CustomKeyValueMapper) 
    .through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

私はこれらの例には、誰もが作業を支援願っています:私のコードは、実際より次のようになりますので、私の場合は

は、私は、さまざまなトピックにそれらをディスパッチするインターセプタを使用する前に、レコードを変更するmapを呼び出しましたProducerInterceptorさんは私と同じ間違いをしました。

関連する問題