2017-05-25 7 views
1

私はFLINK に新しいです、私は変換がこのFLINKストリーミング時間ウィンドウにファイル(CSVまたはテキスト)を作成

val supportTask= customSource 

    .map(line => line.split(",")) 
    .map(line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toString,line(5)toInt,line(6)toInt)) 
    .filter(_ => true) //todo put sent date condition 
    .map(line => Count(1)) 
    .keyBy(0) 
    .timeWindow(Time.seconds(20)) //todo for time being 10 seconds, actuals 30 min 
.sum(0) 

のように仮定してい今、私は各20秒の時間ウィンドウ

supportTask.writeAsText(("D://myfile_"+Calendar.getInstance().get(Calendar.SECOND)),WriteMode.NO_OVERWRITE).setParallelism(1) 
用のファイルを作成したいです

ファイル名に秒を付けたファイルを作成するたびに、ファイル名+秒を指定しました。

ここでは1つのファイルしか作成されませんが、20秒ごとに新しいファイルを作成したいと思います。

+0

** DataStream.writeUsingOutputFormat()API **を使用してください。 ** writeAsText **は、すべての出力レコードをパラメータで指定されたファイルに書き込みます。そのためには、特別な出力形式を実装する必要があります。 – David

答えて

2

恐らく、Bucketing File SinkとカスタムBucketerでこれを行うことができます。

関連する問題