2017-12-22 18 views
0

Flink 1.4のリリースで、FlinkKafkaConsumer011にはトピックのための正規表現をread-rad関数btwに渡す機能があります。今、私は、単一のFlinkアプリケーションがBucketingSinkを使ってこれらのトピック(Avroメッセージ)のそれぞれをシンクしてs3内の場所を区切る方法があるかどうか疑問に思っています。例:複数のカフカトピックを異なるパスでs3にシンクします

s3://bucket/topic_1 
s3://bucket/topic_2 
s3://bucket/topic_3 
. 
. 
. 
s3://bucket/topic_n 

これを達成するためのあらゆる指針は非常に高く評価されます。

+0

このヘルプを確認してください。https://stackoverflow.com/questions/41473343/unable-to-write-to-s3-using-s3-sink-using-streamexecutionenvironment-apache-fl – soheil

答えて

0

ルートバケットは異なり、あなたは「ただ」のことができます。各トピックごとに異なるパスをしたい場合:

  1. は、メタデータから話題を取得し、カフカからあなたのメッセージをデシリアライズするとき、あなたの要素に保存し
  2. あなたの要素をフル・プロセスで渡します。
  3. BasePathBucketerのサブクラスを提供します。渡された要素のトピックを問い合せることで、getBucketPathメソッドを再定義してtopicを持つパスを戻します。
関連する問題