私はApache Flinkチュートリアルに従ってTaxiRideイベントのストリームを整理しています。結果のストリームはコンソールに出力されます。そして今、私はcsvファイルに書きたいと思います。私はそれはコンパイルエラーにつながるDataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);
を作ってるんだときjava.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.
Apache Flink writeAsCsv()メソッドを使ってオブジェクトのタプルを書く
DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1);
:
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
new TaxiRideSource(path, maxEventDelay, servingSpeedFactor));
DataStream<TaxiRide> filteredRides = rides
// filter out rides that do not start or stop in NYC
.filter(new RideCleansing.NYCFilter());
filteredRides.print();
は、私は、次が、エラーを取得しようとしました。
TaxiRideオブジェクトの清掃されたストリームをcsvファイルに書き込むにはどうすればよいですか?