2017-06-20 18 views
2

Confluent Kafka Connectを使用して単純なオブジェクトをdbに挿入することができます。このサポートを複雑なjson/schema構造にする方法が不明です。この機能が使用可能かどうかはわかりません。同様の質問hereが約1年前に尋ねられましたが、今までは答えられていません。助けてください。複雑なまたは入れ子になったjson /スキーマはコンフルエントなKafka Connectでサポートされています

答えて

2

Kafka Connectは、Struct,Map、およびArrayを含む複雑な構造をサポートしています。シンクコネクタには値が渡され、単にシンクコネクタを使用する必要があるため、一般にソースコネクタのみがこれを行う必要があります。 This documentationは、Structを記述するSchemaオブジェクトを構築し、そのスキーマに準拠するStructインスタンスを作成するための基本を記述しています。この場合、構造体の例はフラットな構造に過ぎません。

ただし、別のSchemaインスタンスで定義されたタイプStructのフィールドを簡単に追加できます。実際に、それはちょうどあなたの構造体には複数のレベルにこの単純なパターンを重ねています:SchemaBuilderは流暢なAPIがある

Schema addressSchema = SchemaBuilder.struct().name(ADDRESS) 
    .field("number", Schema.INT16_SCHEMA) 
    .field("street", Schema.STRING_SCHEMA) 
    .field("city", Schema.STRING_SCHEMA) 
    .build(); 
Schema personSchema = SchemaBuilder.struct().name(NAME) 
    .field("name", Schema.STRING_SCHEMA) 
    .field("age", Schema.INT8_SCHEMA) 
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build()) 
    .field("address", addressSchema) 
    .build(); 

Struct addressStruct = new Struct(addressSchema) 
    .put("number", 100) 
    .put("street", "Main Street") 
    .put("city", "Springfield") 
    .build(); 
Struct personStruct = new Struct(personSchema) 
    .put("name", "Barbara Liskov") 
    .put("age", 75) 
    .put("address", addressStruct) 
    .build(); 

ので、あなたは実際にはカスタムadminブールスキーマビルダーのようにそれを埋め込むことができます。しかし、それはaddressStructを作成するためにSchemaを参照する必要があるため、少し難しくなります。

一般に、ソースコネクタを書き込むときにこれを行う方法を心配する必要があります。 に既存のソースコネクタを使用しようとすると、キーと値の構造をほとんど制御できません。たとえば、Confluent's JDBC source connectorは、別々のSchemaとそのテーブルの各行を別々のStruct(そのスキーマを使用する)として各テーブルをモデリングしています。しかし、行はフラットなので、SchemaStructにはプリミティブ型のフィールドしか含まれません。 MySQLPostgreSQLため

Debezium's CDCコネクタもSchemaとリレーショナルテーブルをモデル化し、行ごとStructオブジェクトに対応するが、CDCは、前および/または変更後の行の状態として行に関する詳細情報を取り込みます。したがって、これらのコネクタは、ネストされたStructオブジェクトを含むテーブルごとにa more complex Schemaを使用します。

各ソースコネクタは独自のメッセージ構造を持っていますが、Kafka ConnectのSingle Message Transforms (SMTs)は、フィルタリング、名前変更、およびソースコネクタによって生成されたメッセージをKafkaに書き込む前に、 、またはシンクコネクタに送信される前にKafkaから読み取られたメッセージに適用されます。

関連する問題