2017-11-30 4 views
0

JavaコードでKafkaプロデューサとしてDBデータを使用しようとしています。ソースデータは連続的に増加します(たとえば、1秒あたり20行)。新しいレコードがDBテーブルに挿入されるたびに、データ全体がDBから読み込まれ、カフカのトピックに追加されます。私は、新しく追加された行のみをトピックに送信したい(つまり、テーブルがすでに10行、さらに4行が追加されている場合は、その4行だけをトピックに送る必要がある)。Javaコードを使用してDBからKafka Producerにテーブルデータの差分を読み取るにはどうすればよいですか?

これをJavaで実現する方法はありますか?Kafka APIも使用できますか?

+0

さらに詳しい情報を投稿できますか?データベースからレコードを取得するために実行しているクエリのようなものですか? –

+0

表EMP_DETAILSを (EMPID番号、 ENAME VARCHAR2(100)、 DEPARTMENT_ID番号、 SALARY番号 JOB_ID VARCHAR2(3)、 HIREDATE日、 COMM番号)を作成します。 SELECT EMPID、ENAME、SALARY FROM \t EMP_DETAILS; – mannedear

答えて

2

データベースからKafkaトピックに変更を反映させるために、change-data-captureを使用するほうがずっと簡単です。これを自分で構築しようとすると、すでに完成しているホイールを改革しています;-)

ソースデータベースは何ですか?独自のRDBMS(Oracle、DB2、MS SQLなど)には、GoldenGate、Attunity、DBVisitなどの商用ツールがあります。オープンソースのRDBMS(MySQL、PostgreSQLなど)では、オープンソースのDebeziumツールを見てください。 これらのCDCツールはすべてKafkaと直接統合されています。

その他のオプションは、ユースケース、スケールなどによっては、JDBC Kafka Connect connectorを使用してデータベースから変更された行をプルするだけです。これはCDCほど柔軟ではなくスケーラブルではありませんが、データベースを自分でポーリングしようとするよりも便利で簡単です。

+0

ありがとうRobin !!! MS SQLServerとしてソースDBを使用しています。私はJDBCソースコネクタ上のリンクを通過しましたが、ターゲット結果を達成するためのロジックを取得できませんでした。 Eclipse IDEで使用できるJDBC Source Connectorロジックを実装するためのサンプルコードを提供できますか?私はまた、 "増分問合せモード"を "タイムスタンプと増分列"として使用することをお勧めします。 TIA – mannedear

+0

これはKafka Connectの要点です.Eclipse IDEを実行する必要はありません。設定ファイルを渡すだけです:)参考にしてください:https://www.confluent.io/blog/simplest-useful -kafka-connect-data-pipeline-world-thereabouts-part-1/ https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world- or-thereabouts-part-2/ https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/ –

+0

このように質問して申し訳ありませんが、カフカや他のカフカ関連技術にはまったく新しいものです。可能であれば、希望の結果を達成するための詳細なステップバイステップアプローチを私に提供してください。 私が望むのは、タイムスタンプの列に基づいてJDBC Kafka Connectコネクタを使用してデータを引き出すことです。ソーステーブルの列を増やして新しい行と変更された行を特定します。 – mannedear

関連する問題