2017-12-11 15 views
0

私はApache Flinkチュートリアルに従ってTaxiRideイベントのストリームを整理しています。結果のストリームはコンソールに出力されます。そして今、私はcsvファイルに書きたいと思います。私はそれはコンパイルエラーにつながるDataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);を作ってるんだときjava.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.Apache Flink writeAsCsv()メソッドを使ってオブジェクトのタプルを書く

DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1); 

 // configure event-time processing 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     // get the taxi ride data stream 
     DataStream<TaxiRide> rides = env.addSource(
       new TaxiRideSource(path, maxEventDelay, servingSpeedFactor)); 

     DataStream<TaxiRide> filteredRides = rides 
       // filter out rides that do not start or stop in NYC 
       .filter(new RideCleansing.NYCFilter()); 

     filteredRides.print(); 

は、私は、次が、エラーを取得しようとしました。

TaxiRideオブジェクトの清掃されたストリームをcsvファイルに書き込むにはどうすればよいですか?

答えて

1

DataStreamおよびDataSetは、混合できない別々のAPIに属します。したがって、コンパイルエラー。

エラーメッセージ "writeAsCsv()メソッドは、タプルのデータストリームでのみ使用できます。" DataStream<TaxiRide>オブジェクトをDataStreamのタプルに変換してCSVファイルとして書き込む必要があることを意味します。 これは単純なMapFunctionで行うことができます。

DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> rideTuples = filteredRides 
    .map(new TupleConverter()); 

あなたがDataStreamrideTuplesを持っていたら、あなたはCSVファイルに書き込むことができ

class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> { 

    public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short> map(TaxiRide ride) { 
    return Tuple9.of(ride.rideId, ride.isStart, ...); 
    } 
} 

のように定義されているTupleConverterで。

関連する問題