私はSpark Streamingアプリケーションを構築していますが、私の要件は監視対象のディレクトリ内のすべての既存ファイルを読み込むことです。ファイルフィルタがSpark StreamingContext.fileStream(...)APIで動作しません
私はこのためにStreamingContext.fileStream(...)
APIを使用しています。このAPIは、フィルタ関数を渡すために必要です。私の場合、私はすべてのファイルを読む必要があるので、私はいつもtrue
を返しています。 newFilesOnly
フラグがStreamingContext.fileStream(...)
の場合はfalse
に設定されています。
は[ここAPI doc]
ません。しかし、どんなフィルタ機能が戻るかnewFilesOnly
フラグがDSTREAM対応で作成したRDDSは空で、に設定されています。
は、ここでは、コードスニペットです:私はフィルタ機能とnewFilesOnlyフラグからの戻り値のさまざまな組み合わせを試してみました
val ssc = new StreamingContext(sparkConf, Seconds(30))
val filterF = new Function[Path, Boolean] {
def apply(x: Path): Boolean = {
println("In File " + x.toString) //Prints exisitng file's path as expected
true
}
}
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString)
strm.print() //DOESN'T PRINT ANYTHING
、何も働きました。
代わりにStreamingContext.textFileStream(...)
を使用すると正常に動作しますが、このAPIの動作に期待される新しいファイルのみが読み込まれます。
ここに何か不足していますか?どんな助けもありがとう。前もって感謝します!
この操作をどのように起動しますか。つまり、そのバケットに新しいファイルをコピーしますか? –