2017-12-01 11 views
0

Nifi/HDFを使用してMS SQLからデルタレコードを読み取るためにどのように、これらのテーブルは、毎秒更新を選択し、内側を想定してみましょう多かれ少なかれ見て、この私はMS SQLでのいくつかのテーブルを持っている

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID} 

のようなクエリを取得します次のようにクエリ結果5レコードを結合します。

クエリが初めて実行される場合、${lastUpdateTime}${lastG_ID}は0に設定され、5つのレコード以下に戻ります。レコードを処理した後、クエリはmax(G_ID)つまり5とmax(UpdateTime)、つまり1512010479をetl_statテーブルに格納します。以下に示すようなテーブルは、別の5件の新しいレコードを追加する場合

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
1   1512010470 12591225  DUMMY_DATA  DUMMY_ID  
2   1512096873 12591538  DUMMY_DATA  DUMMY_ID  
3   1512096875 12591539  DUMMY_DATA  DUMMY_ID  
4   1512010477 12591226  DUMMY_DATA  DUMMY_ID  
5   1512010479 12591227  DUMMY_DATA  DUMMY_ID  

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
1   1512010470 12591225  DUMMY_DATA  DUMMY_ID  
2   1512096873 12591538  DUMMY_DATA  DUMMY_ID  
3   1512096875 12591539  DUMMY_DATA  DUMMY_ID  
4   1512010477 12591226  DUMMY_DATA  DUMMY_ID  
5   1512010479 12591227  DUMMY_DATA  DUMMY_ID 
6   1512010480 12591230  DUMMY_DATA  DUMMY_ID 
7   1512010485 12591231  DUMMY_DATA  DUMMY_ID 
8   1512010490 12591232  DUMMY_DATA  DUMMY_ID 
9   1512010493 12591233  DUMMY_DATA  DUMMY_ID 
10   1512010500 12591234  DUMMY_DATA  DUMMY_ID 
クエリが最初 etl_stat tableから max(G_ID)max(UpdateTime)を読み取り、 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5を次のように、クエリをフレームう

ようにクエリが返します下記のような5つのデルタレコードのみです。

G_ID  UpdateTime ID   Name    T_NAME 
------------------------------------------------------------------- 
6   1512010480 12591230  DUMMY_DATA  DUMMY_ID 
7   1512010485 12591231  DUMMY_DATA  DUMMY_ID 
8   1512010490 12591232  DUMMY_DATA  DUMMY_ID 
9   1512010493 12591233  DUMMY_DATA  DUMMY_ID 
10   1512010500 12591234  DUMMY_DATA  DUMMY_ID 

だから、クエリはそれを実行するたびに最初etl_statテーブルからmax(G_ID)max(UpdateTime)を読み、上記のように選択し、内側には、クエリに参加するフレームとデルタの変更を取得する必要があります。

次のように私は上記のユースケースを実装しているSPARKのSQLに

を使用したアーキテクチャであり、AS:

1)JDBCスパークがetl_statテーブルからmax(G_ID)max(UpdateTime)を取得するためにフェニックスのテーブルを読み込みます。

2)JDBCスパークは、2インナークエリに参加するステップを実行セレクトインナ)はこのSELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3のようなクエリに参加JDBCスパークフレームMS SQLサーバプロセスからのデルタメッセージのHBaseに記録およびインサートを読み出します。

4)HBaseのに成功した挿入した後、スパークは、最新のG_IDすなわち10とUpdateTimeすなわち1512010500.

5でetl_statテーブルを更新)このジョブは1分ごとに実行するようにスケジュールするcronとなっています。

私はNifiにこのユースケースを移動したいNIFI

を使用したアーキテクチャであるために、私は、MS SQL DBからレコードを読み、カフカにこのレコードを送信するためにNiFiを使用したいです。

Kafkaに正常に公開すると、NiFiはG_IDとUpdateTimeをデータベースに保存します。

メッセージがKafkaに届くと、スパークストリーミングはKafkaからのメッセージを読み、既存のビジネスロジックを使用してHBaseに保存します。

毎回、Nifiプロセッサはデルタレコードを取得してKafkaに発行するために、max(G_ID)max(UpdateTime)を使用してフレーム選択内部結合クエリを実行する必要があります。

Nifi/HDFを初めて使用しています。 Nifi/HDFを使用してこれを実装するには、あなたの助けと指導が必要です。このユースケースのソリューション/アーキテクチャがより優れている場合は、お勧めします。

このような長い投稿には申し訳ありません。

答えて

0

あなたが説明しているのは、JDBC Kafka Connect connectorの機能です。設定ファイルを設定し、ロードしてください。完了しました。 Kafka ConnectはApache Kafkaの一部です。余分なツールや技術が不要です。

また、適切なチェンジデータキャプチャ(CDC)を検討することもできます。独自のRDBMS(Oracle、DB2、MS SQLなど)には、GoldenGate、Attunity、DBVisitなどの商用ツールがあります。オープンソースのRDBMS(MySQL、PostgreSQLなど)では、オープンソースのDebeziumツールを見てください。 これらのCDCツールはすべてKafkaと直接統合されています。

+0

すぐにお返事ありがとうございます。この使用例では、Nifi/HDFを使用したソリューションが必要です。 – nilesh1212

+0

誰かがNiFiを使ってこの問題を解決してくれたら、助けてください。 – nilesh1212

+0

@daggett – nilesh1212

関連する問題