0

JDBCシンクコネクタを使用して2つのkafkaトピックを読み込み、手動で作成した2つのOracleテーブルにアップサンプリングしようとしています。各テーブルには1つのプライマリキーがあり、アップモードで使用します。 1つのトピックだけを使用し、pk.fieldsでは1つのフィールドしか使用しないとコネクタが正常に機能しますが、各テーブルから複数の列を入力するとスキーマを認識できません。私は何かをお見逃ししますか?JDBCシンクコネクタ - kafka-connectを使用して複数のトピックから複数のテーブルにアップする

name=oracle_sink_prod 
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector 
tasks.max=1 
topics=KAFKA1011,JAFKA1011 
connection.url=URL 
connection.user=UID 
connection.password=PASSWD 
auto.create=false 
table.name.format=KAFKA1011,JAFKA1011 
pk.mode=record_value 
pk.fields= ID,COMPANY 
auto.evolve=true 
insert.mode=upsert 

//ID is pk of kafka1011 table and COMPANY is of other 

答えて

0

PKが異なる場合は、2つの異なるシンクコネクタを作成してください。同じKafka Connectワーカーで実行できます。

また、カフカメッセージ自体のキーを使用することもできます。詳細は、docを参照してください。これはスケーラビリティの高いオプションであり、メッセージが正しくシンクされてJDBC Sinkに流れるようにする必要があります。

+0

ありがとうたくさんのロビンが、将来的には話題の数が増え続けて100まで増え、100個のコネクタを管理するのは良い選択でしょうか?また、kafkaを使用してデータパイプラインをリアルタイムで作成する計画は、ソースが複数のOracleデータベースになり、シンクがOracleデータウェアハウスになるため、多くのコネクタを管理するのは難しくありませんか? –

+0

私は自分の答えを更新しました。また、 'record_key'オプションを使用して、カフカのメッセージが適切に設定されていることを確認することもできます。あなたはオラクルからデータを引き出すためにどのようなオプションを使用していますか? GoldenGateのようなツールでキーを指定できます。[SMT](https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/)を使用することもできます。 Connect自体でキーを定義する –

関連する問題