Nifi/HDFを使用してMS SQLからデルタレコードを読み取るためにどのように、これらのテーブルは、毎秒更新を選択し、内側を想定してみましょう多かれ少なかれ見て、この私はMS SQLでのいくつかのテーブルを持っている
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
のようなクエリを取得します次のようにクエリ結果5レコードを結合します。
クエリが初めて実行される場合、${lastUpdateTime}
と${lastG_ID}
は0に設定され、5つのレコード以下に戻ります。レコードを処理した後、クエリはmax(G_ID)
つまり5とmax(UpdateTime)
、つまり1512010479をetl_stat
テーブルに格納します。以下に示すようなテーブルは、別の5件の新しいレコードを追加する場合
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
クエリが最初
etl_stat table
から
max(G_ID)
と
max(UpdateTime)
を読み取り、
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
を次のように、クエリをフレームう
ようにクエリが返します下記のような5つのデルタレコードのみです。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
だから、クエリはそれを実行するたびに最初etl_stat
テーブルからmax(G_ID)
とmax(UpdateTime)
を読み、上記のように選択し、内側には、クエリに参加するフレームとデルタの変更を取得する必要があります。
次のように私は上記のユースケースを実装しているSPARKのSQLに
を使用したアーキテクチャであり、AS:
1)JDBCスパークがetl_stat
テーブルからmax(G_ID)
とmax(UpdateTime)
を取得するためにフェニックスのテーブルを読み込みます。
2)JDBCスパークは、2インナークエリに参加するステップを実行セレクトインナ)はこのSELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3のようなクエリに参加JDBCスパークフレームMS SQLサーバプロセスからのデルタメッセージのHBaseに記録およびインサートを読み出します。
4)HBaseのに成功した挿入した後、スパークは、最新のG_ID
すなわち10とUpdateTime
すなわち1512010500.
5でetl_stat
テーブルを更新)このジョブは1分ごとに実行するようにスケジュールするcronとなっています。
私はNifiにこのユースケースを移動したいNIFI
を使用したアーキテクチャであるために、私は、MS SQL DBからレコードを読み、カフカにこのレコードを送信するためにNiFiを使用したいです。
Kafkaに正常に公開すると、NiFiはG_IDとUpdateTimeをデータベースに保存します。
メッセージがKafkaに届くと、スパークストリーミングはKafkaからのメッセージを読み、既存のビジネスロジックを使用してHBaseに保存します。
毎回、Nifiプロセッサはデルタレコードを取得してKafkaに発行するために、max(G_ID)
とmax(UpdateTime)
を使用してフレーム選択内部結合クエリを実行する必要があります。
Nifi/HDFを初めて使用しています。 Nifi/HDFを使用してこれを実装するには、あなたの助けと指導が必要です。このユースケースのソリューション/アーキテクチャがより優れている場合は、お勧めします。
このような長い投稿には申し訳ありません。
すぐにお返事ありがとうございます。この使用例では、Nifi/HDFを使用したソリューションが必要です。 – nilesh1212
誰かがNiFiを使ってこの問題を解決してくれたら、助けてください。 – nilesh1212
@daggett – nilesh1212