2017-11-27 12 views
0

ここでお勧めします:Best Practices - Naming large TupleX types。私はデータストリームのためにタプルの代わりにPOJOを使用しています。Apache Flink:サブクラスタプルのデータストリームでwriteAsCsv()を使用できません

これは私のPOJOが定義する方法である:

public class PositionEvent extends Tuple8<Integer, String, Integer, 
    Integer, Integer, Integer, Integer, Integer> 

私はcsvファイルへPositionEventのデータストリームを保存しようとすると、例外がスローされます。

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED) 
      .writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE)) 

例外スレッドに"main" java.lang.IllegalArgumentException:writeAsCsv()メソッドは、タプルのデータストリームでのみ使用できます。

私は明示的にTuple8にPositionEventをキャストする場合は、それが動作します:

source.filter((PositionEvent e) -> e.speed > MAXIMUM_SPEED) 
      .map((PositionEvent e) -> 
        (Tuple8<Integer, String, Integer, Integer, 
          Integer, Integer, Integer, Integer>) e) 
      .writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE)) 

はFLINKは、データストリーム内のオブジェクトがTupleサブクラスであることを検出するべきではないでしょうか。

====================

編集:(おかげでtwalthrする)

[OK]を、これが今の私のPOJOです:

今、私は明示的に私のPOJOをキャストする必要はありません

public class PositionEvent extends Tuple8<Integer, String, Integer, 
     Integer, Integer, Integer, Integer, Integer> { 

    public int timestamp; 

    public String vid; 

    public int speed; 

    public int xway; 

    public int lane; 

    public int dir; 

    public int seg; 

    public int pos; 

    public PositionEvent() { 
    } 

    public PositionEvent(int timestamp, String vid, int speed, int xway, 
         int lane, int dir, int seg, int pos) { 
     super(timestamp, vid, speed, xway, lane, dir, seg, pos); 
    } 
} 

import org.apache.flink.api.java.tuple.Tuple8; 

public class PositionEvent extends Tuple8<Integer, String, Integer, 
     Integer, Integer, Integer, Integer, Integer> { 

    public PositionEvent() { 
    } 

    public PositionEvent(int timestamp, String vid, int speed, int xway, 
         int lane, int dir, int seg, int pos) { 
     super(timestamp, vid, speed, xway, lane, dir, seg, pos); 
    } 

    public int getSpeed() { 
     return f2; 
    } 
} 

これは前に私のPOJOました。

答えて

2

Tuple8を拡張するだけでなく、e.speedのような追加フィールドも追加したようです。これは暗黙のうちにあなたの型をPOJOにします。フィールドに名前を付け、効率的なタプル型のままにするには、単純にゲッターを実装するだけですが、フィールドを追加しないでください。それ以外の場合は、単純にタプルの代わりにPOJOを使用できます。

Flink's Table & SQL APIを調べる価値があるかもしれません。すべてのタイプを自動的に処理することで開発を容易にすることを目指しています。

+0

ありがとうございました。それはうまくいったが、私は理由を理解できない。 POJOからすべてのメンバーを削除し、代わりにgetSpeed()を追加しました。 – fediazgon

+0

私は、このドキュメントがこれに関してあまり具体的ではないことに同意します。 'TupleXX'クラスのフィールドが変更されない場合、タプルは有効なタプル型に過ぎません。それ以外の場合、オブジェクトはPOJO型になる追加のフィールドに対して異なるシリアル化ロジックを必要とします。 – twalthr

関連する問題