2017-09-24 21 views
0

Kafka Connectを使用して、文字列として保存されたJSON値を実際のJSON構造に変換することができるかどうかを判断しようとしています。JSON文字列を実際のJSONに変換するKafka Connect

このような変換を探してみましたが、見つからなかった。例として、これは元のようになります。

{ 
    "UserID":2105058535, 
    "DocumentID":2105058535, 
    "RandomJSON":"{\"Tags\":[{\"TagID\":1,\"TagName\":\"Java\"},{\"TagID\":2,\"TagName\":\"Kafka\"}]}" 
} 

そして、これが私の目標です:

{ 
    "UserID":2105058535, 
    "DocumentID":2105058535, 
    "RandomJSON":{ 
    "Tags":[ 
     { 
     "TagID":1, 
     "TagName":"Java" 
     }, 
     { 
     "TagID":2, 
     "TagName":"Kafka" 
     } 
    ] 
    } 
} 

私はそれが違いを行う場合Elasticsearchシンクコネクタのこれらの変換を行うためにしようとしています。

これを行うためにLogstashをJSONフィルタと併用することができますが、私はKafka Connectを使用する方法があるかどうかを知りたいと思います。

答えて

2

このようなサウンドは、Single Message Transform(これはESだけでなくあらゆるコネクタにも適用可能です)ですが、あなたが記述していることをそのまま受けているものはありません。 APIはdocumented hereです。

+0

がそれを手に入れた、どうもありがとう!私はちょうどこれを自分で実装する必要があります。 :) –

+0

あなたがしたらPRを提出してください:-D –

+0

これは簡単ではないようです:私が正しく理解していれば、このダイナミックが必要な場合、私は何とかそのJSONを解析し、そのためのAvroスキーマを生成し、それを新しいフィールドとして配置する必要があります。 –

0

私にも同様の問題がありましたが、逆でした。私はJsonでデータを持っていたので、Cassandra Sinkを使ってCassandraに格納するために、その一部をJson文字列表現に変換する必要がありました。私はを終了しました。はトピックから読み込み、Jsonオブジェクトを別のトピックに出力します。これは、コネクタによって読み取られます。あなたが望むようシリアライズmapValuesを呼び出して、あなたのカフカストリームで読み取るか、ジャクソンPOJOを作成し、ために値を書き込む - -

トピック文書<>トピックdocument.elasticsearch

関連する問題