2017-02-07 5 views
1

私はNiFiを使い始めました。私は、Hiveにデータをロードするためのユースケースに取り組んでいます。私はCSVファイルを取得し、SplitTextを使用して着信フローファイルを複数のフローファイル(レコードごとに分割レコード)に分割します。次に、ConvertToAvroを使用して、分割されたCSVファイルをAVROファイルに変換します。その後、AVROファイルをHDFS内のディレクトリに配置し、ReplaceText + PutHiveQLプロセッサを使用して「LOAD DATA」コマンドをトリガします。NiFiを使ってHiveにデータをロードする最良の方法は何ですか?

(パーティションデータを取得するためにLOAD DATAは動的パーティショニングをサポートしていないため、レコードごとにファイルレコードを分割しています。流れは次のようになります。

GETFILE(CSV)--- SPLITTEXT(分割ライン数:1とヘッダー行数:1)--- ExtractText(使用正規表現のパーティションフィールドの値を取得しますそして

--- PutHiveQL PutHDFS(HDFSの場所への書き込み)--- のreplaceText(パーティション情報とLOADデータCMD)---スキーマの指定 ConvertToAvro()---)属性に割り当てます

T私は一度に各レコードにCSVファイルを分割しているので、あまりにも多くのavroファイルを生成します。たとえば、CSVファイルに100個のレコードがある場合、100個のAVROファイルが作成されます。パーティションの値を取得したいので、一度に1つずつレコードを分割する必要があります。私は何らかの方法があることを知りたいのですが、私たちはレコードをレコードごとに分割することなくこのことを達成できます。私はそれをバッチすることを意味します。私はこれにはまだ新しいので、私はまだこれを解読することはできません。これで私を助けてください。

PS:このユースケースを達成するための代替アプローチがある場合は、私に提案してください。

答えて

1

Avroレコードをパーティションの値に基づいてグループ化することを検討していますか?一意の値ごとに1つのAvroファイルですか?または、いくつかのLOAD DATAコマンドにパーティションの値が必要なだけです(すべてのレコードで単一のAvroファイルを使用します)。

前者の場合、1つのステップ(つまり、1つのCSVドキュメント)を解析、グループ化、集約、変換する必要があるため、カスタムプロセッサまたはExecuteScriptが必要になる可能性があります。後者の場合は、にあなたの流れを並べ替えることができます: - >ConvertCSVToAvro - >PutHDFS - >ConvertAvroToJSON - >SplitJson - >EvaluateJsonPath - >のreplaceText

GETFILE - >PutHiveQL

このフローでは、CSVファイル全体(単一のAvroファイル)をHDFSに格納し、その後にspl (EvaluateAvroPathプロセッサを持たないのでJSONに変換した後)、パーティションの値を取得し、Hive DDLステートメント(LOAD DATA)を生成します。

+0

私は実際にカスタムプロセッサを使用せずに最初のものを実行しました。 MergeContentを使ってバッチ処理を行った。それは今私が望むように動作します。しかし、私は第2の考えに興味を持っています。私はそれを試してみる。私は1つの質問がある。私たちはHDFSにAVROファイルを1つだけ持っているので、パーティションの値を読み込んだ後、LOAD DATAコマンドをトリガしたときに、レコードがどのようにすべてのパーティションにわたってHiveに移動するのかをLOADコマンドで確認します。 AVROファイルの一部を移動できますか?私は、単一のAVROが複数のAVROに分割され、対応するパーティションに配置されるような意味ですか? –

+0

すべてのパーティションが同じデータで終了するため、2番目の方法は機能しません。 PutHDFSの前にRouteOnAttributeとMergeContentを使用した場合は、正しい方法のように聞こえます。 – mattyb

+0

はい。 2つめはすべてを単一のパーティションに入れます。ちなみに、私はRouteOnAtrributeを使用していません。 MergeContentのCorrelation Attributeプロパティを使用してジョブを実行しただけです。とにかく、ありがとう。 –

関連する問題