2016-06-16 5 views
0

私はSpark Streamingアプリケーションを構築していますが、私の要件は監視対象のディレクトリ内のすべての既存ファイルを読み込むことです。ファイルフィルタがSpark StreamingContext.fileStream(...)APIで動作しません

私はこのためにStreamingContext.fileStream(...) APIを使用しています。このAPIは、フィルタ関数を渡すために必要です。私の場合、私はすべてのファイルを読む必要があるので、私はいつもtrueを返しています。 newFilesOnlyフラグがStreamingContext.fileStream(...)の場合はfalseに設定されています。

は[ここAPI doc]

ません。しかし、どんなフィルタ機能が戻るかnewFilesOnlyフラグがDSTREAM対応で作成したRDDSは空で、に設定されています。

は、ここでは、コードスニペットです:私はフィルタ機能とnewFilesOnlyフラグからの戻り値のさまざまな組み合わせを試してみました

val ssc = new StreamingContext(sparkConf, Seconds(30)) 
val filterF = new Function[Path, Boolean] { 
    def apply(x: Path): Boolean = { 
     println("In File " + x.toString) //Prints exisitng file's path as expected 
     true 
    } 
} 
val strm = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://<bucket>/", filterF, false).map(_._2.toString) 
strm.print()  //DOESN'T PRINT ANYTHING 

、何も働きました。

代わりにStreamingContext.textFileStream(...)を使用すると正常に動作しますが、このAPIの動作に期待される新しいファイルのみが読み込まれます。

ここに何か不足していますか?どんな助けもありがとう。前もって感謝します!

+0

この操作をどのように起動しますか。つまり、そのバケットに新しいファイルをコピーしますか? –

答えて

2

FileInputDStreamの無視ウィンドウを大きくして解決しました。これは、spark.streaming.fileStream.minRememberDurationプロパティを変更することで実行できます。 デフォルト値は1分です。テストしたすべてのファイルの変更時間が1分よりも古いので無視されます。 詳細については、コードのマニュアルhereを参照してください。

関連する問題