2016-10-13 13 views
2

FlinkでLeftOuterJoinを実行しようとしています。 私はそれがここCoGroupFunctionで に行われるようleftOuterJoin自分自身を実装しようとしないでください:私はFlatJoinFunctionを使用しようとしていますhttps://gist.github.com/mxm/c2e9c459a9d82c18d789FlinkのLeftOuterJoin(JAVA API)

public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{ 


    @Override 
    public void join(Tuple3<String, String, String> in1, 
      Tuple2<String, String> in2, 
      Collector<Tuple2<String, String>> out) throws Exception { 
     // TODO Auto-generated method stub 
     out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1)); 

    } 

} 

私は次のようにこの関数を呼び出す:

 input1.leftOuterJoin(input2).where(0) 
      .equalTo(1) 
      .with(new leftOuter()); 

残念ながら、私はout.collect行でNullPointerExceptionを取得します。

ありがとうございました!

答えて

1

これは、左外部結合の予想される動作です。両方の入力、input1input2は、同じ持つレコードがキーに参加している場合join()は、デカルトの各要素のために呼び出され、

  1. :あなたのプログラム、左の外側には呼び出しJoinFunction 2のケースに参加考える

    このキーの製品input1左入力は、右入力(input2)に存在しないキーを持つレコードを有する場合

  2. join()は、右入力用input1nullこのキーを持つレコードごとに呼び出されます。

JoinFunctionin2 == nullの小切手を追加する必要があります。

+0

ありがとうございました!私はそれが働いたと思う(結果を確認する必要がある)。 コードは http://pastebin.com/SjB27Qyt になります。 – SevenOfNine

関連する問題