ファイルをそのディレクトリに表示するとすぐに、spark.readStreamを使用してディレクトリを継続的に監視し、CSVファイルを読み取るようにします。Spark Structured Streamingを使用してディレクトリを継続的に監視する方法
Spark Streamingのソリューションは含まないようにしてください。私はスパークストリーミングを使用してそれを行う方法を探しています。あなたは、「ファイル」のソースを使用する必要があり、公式documentationで書かれたよう
ファイルをそのディレクトリに表示するとすぐに、spark.readStreamを使用してディレクトリを継続的に監視し、CSVファイルを読み取るようにします。Spark Structured Streamingを使用してディレクトリを継続的に監視する方法
Spark Streamingのソリューションは含まないようにしてください。私はスパークストリーミングを使用してそれを行う方法を探しています。あなたは、「ファイル」のソースを使用する必要があり、公式documentationで書かれたよう
として新しいファイルを読み込みますここでは、このユースケースのための完全なソリューションです。
スタンドアローンモードで実行している場合。
bin/spark-shell --driver-memory 4G
ドライバ内でスタンドアロンモードのエグゼキュータを実行する場合と同様に、エグゼキュータのメモリを設定する必要はありません。 @ T.Gawedaのソリューションを完了として
、以下の解決策を見つける:
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
csvDf.writeStream.format("console").option("truncate","false").start()
が今火花が連続とすぐに、ディレクトリ内の任意のcsvファイル、あなたのデータフレームの操作を追加すると、指定したディレクトリを監視しますそのファイルに対して "csvDF"が実行されます。
注:スパークは、あなたの火花セッションで
spark.sqlContext.setConf("spark.sql.streaming.schemaInference","true")
:あなたが最初に以下の構成を設定する必要がinferschemaにスパークしたい場合。
:
ファイルソース - データのストリームとしてディレクトリに書き込まれたファイルを読み込みます。サポートされているファイル形式は、text、csv、json、parquetです。最新のリスト、および各ファイル形式のサポートされているオプションについては、DataStreamReaderインターフェイスのドキュメントを参照してください。ファイルは、指定されたディレクトリにアトミックに配置する必要があります。このディレクトリは、ほとんどのファイルシステムでファイル移動操作で実現できます。ドキュメントから取ら
コード例:
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
あなたはトリガーを指定しない場合は、スパークはできるだけ早く
質問があります。 avroファイルも読み込むことができますか?これはGoogleのクラウドストレージをサポートしているのですか?つまり、私のgcsバケットに入ってくる新しいファイルを同様に処理したいのですか?この方法はフォールトトレラントですか?つまり、パイプラインに障害が発生した場合、どのように回復するか、どのファイルが処理されたか、どのファイルが新しいものかを知る方法ですか? – user179156
text/jsonの場合、私のストリーミングパイプラインが失敗した場合、新しいストリーミングパイプラインはどこからファイルを使用するのかを知っていますか? – user179156