2017-11-16 3 views
0

env.readCsvFile(location).pojoType(dynClass, arr);で指定された動的リターンタイプを生成するカスタムCSVリーダを作成します。dynClassはByteBuddyで作成され、arrは配列です列名の org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: POJO type 'com.me.dynamic.FlinkPojo$ByteBuddy$zQ9VllB1' expected but was 'com.me.dynamic.I'. Apache Flink:変換(マップ、リダクション、ジョイントなど)でダイナミックタイプを消費する方法

関数宣言は、基本型を指定します。私はに実行しています課題は、このランタイムエラーフラグメントである

public class PojoToTupleRichMapFunction extends RichMapFunction<I, O> implements ResultTypeQueryable { 

    Class tupleClass = null; 
    Class pojoClass = null; 
    Config.Schema schema = null; 
    transient List<Field> fields = null; 

    PojoToTupleRichMapFunction(DynDataSet dynSet) { 
     this.schema = dynSet.dataDef.schema; 
     // Create a map from pojo to tuple 
     this.tupleClass = O.getTupleClass(schema.columns.size()); 
     this.pojoClass = dynSet.recType; 

    } 

    @Override 
    public void open(Configuration parameters) { 
     fields = new ArrayList<>(schema.columns.size()); 
     for (int i = 0; i < schema.columns.size(); i++) { 
      try { 
       fields.add(pojoClass.getField(schema.columns.get(i).name)); 
      } catch (NoSuchFieldException | SecurityException ex) { 
       Logger.getLogger(PojoToTupleRichMapFunction.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
    } 

    @Override 
    public TupleTypeInfo getProducedType() { 
     // build list of types 
     List<BasicTypeInfo<?>> types = new ArrayList<>(schema.columns.size()); 
     for (int i = 0; i < schema.columns.size(); i++) { 
      BasicTypeInfo bt = null; 
      String typeName = schema.columns.get(i).type.getName(); 
      switch (typeName) { 
       case "java.lang.Integer": 
        bt = BasicTypeInfo.INT_TYPE_INFO; 
        break; 
       case "java.lang.String": 
        bt = BasicTypeInfo.STRING_TYPE_INFO; 
        break; 
       case "java.lang.Long": 
        bt = BasicTypeInfo.LONG_TYPE_INFO; 
        break; 
       case "java.lang.Short": 
        bt = BasicTypeInfo.SHORT_TYPE_INFO; 
        break; 
       default: 
        Logger.getLogger(Config.class.getName()).log(Level.SEVERE, "Unknown type: {0}", typeName); 

      } 
      types.add(bt); 
     } 
     return new TupleTypeInfo(tupleClass, types.toArray(new BasicTypeInfo[0])); 
    } 

    @Override 
    public O map(I pojo) throws Exception { 
     O ret; 
     ret = (O) tupleClass.newInstance(); 
     for (int i = 0; i < schema.columns.size(); i++) { 
      ret.setField(fields.get(i).get(pojo), i); 
     } 
     return ret; 
    } 
} 

:私はその後でタプルに私のPOJOをマッピングしてみてください。実際の入力タイプは動的サブクラスです。出力タイプはgetProducedTypeによって提供されます。

MapFunctionで動的入力タイプを処理するにはどうすればよいですか?少なくとも一つの解決策(おそらくない最高の)を提供するために

+0

フィールド配列の作成は、 'field'データがシリアル化中に失われないように、' open'メソッドで行わなければならないことを示すために上記をいくつか編集しました。 –

答えて

0

、私はにクラス定義を変更:私は、その後、一般的なパラメータを含むコンパイル済みのクラスを再分類するByteBuddyを使用

public class PojoToTupleRichMapFunction<I extends FlinkPojo, O extends Tuple> extends RichMapFunction<I, O> implements ResultTypeQueryable { 
} 

static private DataSet<?> mapPojoToTuple(DataSet ds, DynDataSet dynSet) { 
    Class<?> clazz = new ByteBuddy() 
     .subclass(TypeDescription.Generic.Builder.parameterizedType(PojoToTupleRichMapFunction.class, dynSet.recType, Tuple.class).build()) 
     .make() 
     .load(PojoToTupleRichMapFunction.class.getClassLoader()) 
     .getLoaded(); 
    Constructor<?> ctr = clazz.getConstructors()[0]; 
    RichMapFunction fcn = null; 
    try { 
     fcn = (RichMapFunction) ctr.newInstance(dynSet); 
    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { 
     Logger.getLogger(dyn_demo.class.getName()).log(Level.SEVERE, null, ex); 
    } 
    return ds.map(fcn); 
} 

これはFlinkを満足するようです。

関連する問題