2
FlinkでLeftOuterJoinを実行しようとしています。 私はそれがここCoGroupFunctionで に行われるようleftOuterJoin自分自身を実装しようとしないでください:私はFlatJoinFunctionを使用しようとしていますhttps://gist.github.com/mxm/c2e9c459a9d82c18d789FlinkのLeftOuterJoin(JAVA API)
:
public static final class leftOuter implements FlatJoinFunction<Tuple3<String,String,String>, Tuple2<String,String>, Tuple2<String,String>>{
@Override
public void join(Tuple3<String, String, String> in1,
Tuple2<String, String> in2,
Collector<Tuple2<String, String>> out) throws Exception {
// TODO Auto-generated method stub
out.collect(new Tuple2<String,String>(in1.f0, in2.f1 == null ? "null" : in2.f1));
}
}
私は次のようにこの関数を呼び出す:
input1.leftOuterJoin(input2).where(0)
.equalTo(1)
.with(new leftOuter());
残念ながら、私はout.collect行でNullPointerExceptionを取得します。
ありがとうございました!
ありがとうございました!私はそれが働いたと思う(結果を確認する必要がある)。 コードは http://pastebin.com/SjB27Qyt になります。 – SevenOfNine