ここでお勧めします:Best Practices - Naming large TupleX types。私はデータストリームのためにタプルの代わりにPOJOを使用しています。Apache Flink:サブクラスタプルのデータストリームでwriteAsCsv()を使用できません
これは私のPOJOが定義する方法である:
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer>
私はcsvファイルへPositionEvent
のデータストリームを保存しようとすると、例外がスローされます。
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
例外スレッドに"main" java.lang.IllegalArgumentException:writeAsCsv()メソッドは、タプルのデータストリームでのみ使用できます。
私は明示的にTuple8にPositionEvent
をキャストする場合は、それが動作します:
source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED)
.map((PositionEvent e) ->
(Tuple8<Integer, String, Integer, Integer,
Integer, Integer, Integer, Integer>) e)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE))
はFLINKは、データストリーム内のオブジェクトがTuple
サブクラスであることを検出するべきではないでしょうか。
====================
編集:(おかげでtwalthrする)
[OK]を、これが今の私のPOJOです:
今、私は明示的に私のPOJOをキャストする必要はありませんpublic class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public int timestamp;
public String vid;
public int speed;
public int xway;
public int lane;
public int dir;
public int seg;
public int pos;
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
}
:
import org.apache.flink.api.java.tuple.Tuple8;
public class PositionEvent extends Tuple8<Integer, String, Integer,
Integer, Integer, Integer, Integer, Integer> {
public PositionEvent() {
}
public PositionEvent(int timestamp, String vid, int speed, int xway,
int lane, int dir, int seg, int pos) {
super(timestamp, vid, speed, xway, lane, dir, seg, pos);
}
public int getSpeed() {
return f2;
}
}
これは前に私のPOJOました。
ありがとうございました。それはうまくいったが、私は理由を理解できない。 POJOからすべてのメンバーを削除し、代わりにgetSpeed()を追加しました。 – fediazgon
私は、このドキュメントがこれに関してあまり具体的ではないことに同意します。 'TupleXX'クラスのフィールドが変更されない場合、タプルは有効なタプル型に過ぎません。それ以外の場合、オブジェクトはPOJO型になる追加のフィールドに対して異なるシリアル化ロジックを必要とします。 – twalthr