2017-07-21 5 views
0

タプル内のさまざまな型を動的に宣言する方法はありますか?Flink:動的タプルサイズと型を宣言する

私は動的にタプルの列数を宣言するための方法を見つけた:

env.readCsvFile(filePath).tupleType(Tuple.getTupleClass(3))

しかし、どのようなタイプのパラメータを指定せずに、それはエラーとしてスロー:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.

私が欲しかったですタプル内のすべての要素を単純な0​​として使用する。次の作品は:

env.readCsvFile(filePath).types(String.class, String.class);

これはTuple2(String,String)タイプになります。しかし私の場合、私はcsvにいくつのデータの列があるのか​​分かりません。しかし、私はすべての列を文字列として読み込んでいます。 (私は最大25列の制限があることを理解する)

私もCsvInputFormatのサブタイプを指定することで、読んでみました:

env.readFile(new TupleCsvInputFormat(filePath,TypeInformation.of(String.class), filePath);

しかし、それはコンパイルすることができませんでした。私の場合にこれを使用する方法がわかりませんでした。 Tupleクラスを拡張して同じ(可能であれば)クラスを拡張する方法についても不明でした。 TypeHintは手前の列の数を知る必要があるようです。

私は他のenv.read...()メソッドについては分かりません。私はいくつか試しましたが、ignoreFirstLine()のようないくつかの方法は利用できませんでした。彼らはCsvReaderとだけ付属しています。

だから、誰かが親切に列の数は、(入力から渡された)任意であり、かつ簡単なStringとしてTupleの各要素を読み取ることなら、私はcsvファイルを読むための最良の方法を見つけ出すことができますか?

答えて

1

CSVファイルを読み取る独自の方法を書くことができます。たぶん、このような何か:

public static void main(String[] args) throws Exception { 
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    int n = 3; // number of columns here 
    Class[] types = IntStream.range(0, n).mapToObj(i -> String.class).toArray(Class[]::new); 
    DataSet<Tuple> csv = readCsv(env, "filename.csv", types); 
    csv.print(); 
} 

private static DataSource<Tuple> readCsv(ExecutionEnvironment env, String filename, Class[] fieldTypes) { 
    TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes); 
    TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(new Path(filename), typeInfo); 
    return new DataSource<>(env, inputFormat, typeInfo, Utils.getCallLocationName()); 
} 

注:このメソッドは、CsvReaderクラスのconfigureInputFormatメソッドを呼び出してスキップします。あなたがそれを必要とするなら、それをすることができます。

+1

私は 'mapToObj'の使い方が大好きです。それはきちんとしたトリックでした。ありがとう! :) – Vineet