2016-05-05 3 views
0

ここではドライバのコードがあるが、私はこのドライバを実行すると、私はから来て、次の例外を取得AvroMultipleInputs - 問題複数のパスを追加すること

AvroMultipleInputs.addInputPath(jobConf, new Path(args[0]), IncrementalDataMapper.class, incrSchema); 
AvroMultipleInputs.addInputPath(jobConf, new Path(args[1]), BaseDataMapper.class, incrSchema); 

AvroJob.setMapOutputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema)); 

AvroJob.setReducerClass(jobConf, DeltaCaptureReducer.class); 
AvroJob.setInputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema)); 
AvroJob.setOutputSchema(jobConf, incrSchema); 

異なるマッパークラスで、私は複数のアブロ入力パスを追加するために使用することスニペットAvroMultipleInputs

java.lang.RuntimeExceptionの方法getInputSchemaMap(...):org.apache.avro.SchemaParseException: は再定義することはできません:com.sample.Test

今私が行ったことは、同じ問題を引き起こすスタンドアロンプ​​ログラムのAvroMultipleInputsのメソッドgetInputSchemaMap(...)をシミュレートすることです。

Schema.Parser schemaParser = new Schema.Parser(); 
    String m1 = "path1;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }"); 
    String m2 = "path2;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }"); 
    String[] schemaMappings = (m1 + "," + m2).split(","); 
    for (String schemaMapping : schemaMappings) { 
     String[] split = schemaMapping.split(";"); 
     String schemaString = fromBase64(split[1]); 
     System.out.println(schemaString); 
     Schema inputSchema; 
     try { 
      inputSchema = schemaParser.parse(schemaString); 
     } catch (SchemaParseException e) { 
      throw new RuntimeException(e); 
     } 
    } 

スタンドアロンコード

失敗したコードは、今私は、次のようにすべてのマッピングのために作成されたパーサによってこれを修正しました。

for (String schemaMapping : schemaMappings) { 
     String[] split = schemaMapping.split(";"); 
     String schemaString = fromBase64(split[1]); 
     System.out.println(schemaString); 
     Schema inputSchema; 
     try { 
      Schema.Parser schemaParser = new Schema.Parser(); 
      inputSchema = schemaParser.parse(schemaString); 
     } catch (SchemaParseException e) { 
       throw new RuntimeException(e); 
     } 
} 

誰でもこれを試しましたか?修正するアイデアはありますか?

私も自分のプロジェクトにAvroMultipleInputsをコピーして、上記のような別のパーサーを使用するためにコードを変更しようとしましたが、私は

java.lang.NullPointerExceptionが スレッドの例外「メイン」次の例外を取得しますorg.apache.hadoop.mapred.lib.MultipleInputs.getInputFormatMap(MultipleInputs.java:93) at org.apache.hadoop.mapred.lib.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:55) at org.apache.hadoop .mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328) at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320) org.apache.hadoop.mapreduce.Job $ 10.runでorg.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196) (Job.java:1290)実際には

答えて

0

で、私はそれを動作させるために多くのファイルをカスタマイズする必要があります。何かが
DelegatingInputFormat.java

DelegatingMapper.java
MapCollector.java
を(私は知らないのか分からない)

AvroMultipleInputs.java場合、私はまだ影響について確認していません TaggedInputSplit.java

関連する問題