2017-04-05 10 views
-1

私はFlinkを初めて利用しています。実際に私はflink java apiを介してファイルとcsv変換を読み込もうとしています。flink java apiを使ってディレクトリ下のファイル名(ローカルファイルシステム/ hdfs)を読む方法

私たちの要件に従ってください。

: a)は、ローカルファイルシステムからファイル/ HDFS C)

私のコードをCSVために同じデータを読み書きする必要があります)csvファイル名 bと入力引数、出力引数としてフォルダを渡す必要があります

public class WriteToCSV { 

    public static void main(String[] args) throws Exception { 
     final ParameterTool params = ParameterTool.fromArgs(args); 
     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
     env.getConfig().setGlobalJobParameters(params); 
     List<String> paths = new ArrayList<String>(); 
     File dir = new File("C://"); 
     for (File f : dir.listFiles()) { 
       paths.add(f.getName()); 
     } 
     DataSet<String> data = env.fromCollection(paths).rebalance(); 

     DataSet<Tuple2<String, Integer>> counts = 
        // split up the lines in pairs (2-tuples) containing: (word,1) 
        data.flatMap(new MySplitter()).groupBy(0).sum(1); 

     System.out.println(" data -:"+data); 
     data.print(); 
     counts.writeAsCsv("C://new.csv"); 
    } 
} 


class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { 
     // normalize and split the line into words 
     String[] tokens = value.split("\\W+"); 

     // emit the pairs 
     for (String token : tokens) { 
      if (token.length() > 0) { 
       out.collect(new Tuple2<String, Integer>(token, 1)); 
      } 
     } 
    } 
} 

ファイル名(data.print())を取得できました。 csvは作成されず、サーバーコンソールにも例外はありません。

答えて

1

何もあなたのコード内のCSVファイルに書き込まれていない理由は、あなたがさらにあなたがディレクトリへのパスを受け入れ、内のすべてのファイルを読み込みenv.readTextFile(path)を使用することができ、あなたのコードを向上させるためにcounts.writeAsCsv("C://new.csv");

env.execute()を呼び出さないということですそのディレクトリは各行のレコードを生成します。

+0

DataSetプログラムとDataStreamプログラムは、 'print()'動作が異なります。 DataSetプログラムは 'print()'が呼び出されると実行をトリガし、その結果をプログラムを送信したクライアントのstdoutに書き込みます。 DataStreamプログラムはプログラムを起動しません(これには 'execute()'が必要です)。そして、作業中のstdoutに出力します。 –

+0

はい、私は知っていますが、 'print()'が 'writeAsCsv'の前に呼び出されているので、それはうまく印刷されていると思われますが、出力はcsvに書き込まれません。 –

+0

ああ、そうです。申し訳ありませんが、私はあなたが正しい答えを読んでいませんでした。ありがとう! –

関連する問題