0

私のシナリオでは、CSVファイルをHDFSにアップロードしています。最終的に書き込みが完了したら、新しいファイルをHDFSディレクトリでどのように処理するのですか?

新しいファイルがアップロードされるとすぐに、新しいファイルをSpark SQLで処理したいとします(たとえば、ファイル内のフィールドの最大値を計算し、ファイルをparquetに変換する)。つまり、各入力ファイルと変換/処理された出力ファイルとの間に1対1のマッピングがあります。

私はSpark Streamingを評価してHDFSディレクトリを聞き、Sparkで「ストリームファイル」を処理していました。

しかし、ファイル全体を処理するには、「ファイルストリーム」がいつ完了するかを知る必要があります。ファイル間のエンドツーエンドの1対1マッピングを維持するために、ファイル全体に変換を適用したいと思います。

マイクロバッチではなくファイル全体をどのように変換できますか?

私が知る限り、Spark Streamingはバッチ(はRDDsにマップされています)への変換を適用することができ、一度に(有限ストリームが完了した時点で)ファイル全体には適用できません。

これは間違いありませんか?もしそうなら、私のシナリオではどのような代替案を検討すべきですか?

+0

ファイルがSpark Streamingで取得される前にHDFSに完全に書き込まれているため、この問題は理解できません。 –

+0

@ cricket_007意味を明確にすることはできますか? –

答えて

1

私は私の知る限りでは...最初の試み

をあなたの質問を誤解している可能性があり、ストリーミングスパークのみにマッピングされたバッチ(DStreamsに変換を適用することができますRDD)であり、一度に(有限ストリームが完了したときに)ファイル全体には適用されません。

これは間違いありませんか?

いいえではありません。です。

スパークストリーミングは、スパークストリーミングのバッチ間隔が経過した時点でHDFSに書き込まれたとおりにファイル全体に一度に変換を適用します。

スパークストリーミングは、ファイルの現在の内容を取り込み、処理を開始します。すぐに新しいファイルがアップロードされますように私は、スパーク/ SparkSQL

で新しいファイルを処理する必要がほとんど不可能スパークと原因からある程度の時間を要するそのアーキテクチャに


瞬間が「アップロードされて」Sparkがそれを処理します。

新品と光沢のあるStructured Streamingまたは(すぐに廃止された)Spark Streamingを使用することを検討してください。

両方のソリューションでは、新しいファイルのディレクトリを監視し、新しいファイルがアップロードされるとSparkジョブをトリガーすることができます(まさにあなたのユースケースです)。引用

構造化されたストリーミングのInput Sources:スパーク2.0で

、いくつかの組み込みの源があります。

  • ファイルソース - ディレクトリに書き込まれたファイルをデータストリームとして読み込みます。サポートされているファイル形式は、text、csv、json、parquetです。最新のリスト、および各ファイル形式のサポートされているオプションについては、DataStreamReaderインターフェイスのドキュメントを参照してください。ファイルは、指定されたディレクトリにアトミックに配置する必要があります。このディレクトリは、ほとんどのファイルシステムでファイル移動操作で実現できます。

も参照ストリーミングのBasic Sourcesスパーク:

ソケットのほか、StreamingContextのAPIは、入力ソースとしてファイルからDStreamsを作成するためのメソッドを提供します。

ファイルが Streamsの:(つまり、HDFS、S3、NFSなど)でHDFSのAPIと互換性のある任意のファイルシステム上のファイルからデータを読み取るために、DSTREAMを作成することができるように:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) 

Spark Streamingは、ディレクトリdataDirectoryを監視し、そのディレクトリで作成されたファイル(サポートされていないネストされたディレクトリに書き込まれたファイル)を処理します。あなたの条件を与えられたものの

1つの警告:

私は、「ファイルストリーム」は完了したときに知っておく必要があるだろう

Sparkでこれをしないでください。

引用スパークストリーミングのBasic Sources再び:

  • ファイルがアトミックに移動またはデータディレクトリにそれらの名前を変更することにより、DATADIRECTORYで作成する必要があります。

  • 一度移動すると、ファイルを変更してはなりません。したがって、ファイルが継続的に追加されている場合、新しいデータは読み込まれません。

ラッピングアップ...あなたはファイルのみが完了してスパークを使用して処理するための準備ができているとき、スパークの腕時計というディレクトリにファイルを移動必要があります。これはSparkの範囲外です。

+0

あなたの答えをありがとう、btw私は私の質問の要点を述べる必要があります。ファイル全体を変換するにはどうしたらいいのですか?だから私は[引用符]を書いたので、「ファイルストリーム」がいつ完了するかを知る必要があります。ファイル間のエンドツーエンドの1対1マッピングを維持するために、ファイル全体に変換を適用する必要があります。 –

+0

@Andreaファイル全体を決定する要素を明確にする必要があります。 HDFSは「ファイルストリーム」を認識しません。書き込まれたファイルの各部分は*ファイル全体*として認識されます。 –

0

DFSInotifyEventInputStreamを使用してHadoopディレクトリを監視し、ファイル作成時にSparkジョブをプログラムで実行することができます。

この記事を参照してください: HDFS file watcher

+0

Spark Streamingはフォルダを見ることができます。そのクラスは不要です –

+0

Spark Streamingでファイルをファイルごとにどのように処理しますか?一度に2つのファイルを書き込むとどうなりますか? –

+0

ファイルごとに何を意味していますか? Spark Streamingは、ドキュメンテーション(他の回答にコピーされている)に記載されているように、*アトミックにターゲットディレクトリ*に移動したすべてのファイルをピックアップするので、2つのファイルは2つの別個のレコードとして扱われます。 –

関連する問題