2016-11-21 13 views
0

JSONのKafkaをプロデュース/コンシューマします。プロパティの下に使用してJSONでHDFSに保存:Kafka JsonConverterを使用してJSON形式のHDFSシンクを接続します。

key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 

はプロデューサー:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ 
     --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json" 

消費者:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties 

問題-1:

例外を取得
key.converter.schemas.enable=true 

value.converter.schemas.enable=true 

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332) 

問題-2:二つの特性上有効では

すべての問題を投げされていませんが、データは、HDFS上で書かれていません。

どのような提案も高く評価されます。

おかげ

答えて

2

変換器は、データコネクタによって解釈され、HDFSに書き込まれるカフカトピックから翻訳される方法を指します。 HDFSコネクタは、avroまたは角形でHDFSへの書き込みのみをサポートしています。フォーマットをJSON hereに拡張する方法に関する情報があります。このような拡張を行う場合は、それをコネクタのオープンソースプロジェクトに提供することをお勧めします。 HDFSに書き込まれる入力JSON形式のメッセージについて

+0

感謝を設定してください! –

+0

@dawsawネイティブkafka connect apiを使用してそのような拡張機能が達成できるかどうか知っていますか? –

+0

すでにKafkaに同梱されているJsonConverterがあります。私はここでの質問は、HDFSコネクタの出力形式に固有のものだと思います。なぜなら、あなたの質問を正しく理解していれば、Connect自体で何もしないで、必ずコネクタを拡張するということです。 – dawsaw

0

、プロパティの下にあなたの提案のための

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 
+0

Akshatをチェックします。ご意見ありがとうございます –

関連する問題