-1
私はFlinkを初めて利用しています。実際に私はflink java apiを介してファイルとcsv変換を読み込もうとしています。flink java apiを使ってディレクトリ下のファイル名(ローカルファイルシステム/ hdfs)を読む方法
私たちの要件に従ってください。
: a)は、ローカルファイルシステムからファイル/ HDFS C)私のコードをCSVために同じデータを読み書きする必要があります)csvファイル名 bと入力引数、出力引数としてフォルダを渡す必要があります
public class WriteToCSV {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
List<String> paths = new ArrayList<String>();
File dir = new File("C://");
for (File f : dir.listFiles()) {
paths.add(f.getName());
}
DataSet<String> data = env.fromCollection(paths).rebalance();
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
data.flatMap(new MySplitter()).groupBy(0).sum(1);
System.out.println(" data -:"+data);
data.print();
counts.writeAsCsv("C://new.csv");
}
}
class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line into words
String[] tokens = value.split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
ファイル名(data.print())を取得できました。 csvは作成されず、サーバーコンソールにも例外はありません。
DataSetプログラムとDataStreamプログラムは、 'print()'動作が異なります。 DataSetプログラムは 'print()'が呼び出されると実行をトリガし、その結果をプログラムを送信したクライアントのstdoutに書き込みます。 DataStreamプログラムはプログラムを起動しません(これには 'execute()'が必要です)。そして、作業中のstdoutに出力します。 –
はい、私は知っていますが、 'print()'が 'writeAsCsv'の前に呼び出されているので、それはうまく印刷されていると思われますが、出力はcsvに書き込まれません。 –
ああ、そうです。申し訳ありませんが、私はあなたが正しい答えを読んでいませんでした。ありがとう! –