2011-12-21 19 views
1

私は、すべての種類の奇妙な予期しない動作を示しているhadoop 0.20.205.0 MapReduceジョブ(シングルスレッド、ローカル)を実行しようとしていました。私は最終的に理由を考え出した。これは私にはハープのバグのように見えますが、おそらく私が理解できないことがあります。誰かが私に助言をくれますか? 私のsetMapOutputKeyClassクラスはConfigurableを実装しています。 ReadFieldsメソッドは、setConfが最初に呼び出されない限り正しく読み込まれません(これはConfigurableインターフェイスのポイントだと思います)。 WritableComparatorのコードを見ると、フレームワークがソートするときに、内部キーオブジェクトをインスタンス化します。Hadoop 0.20.205.0 WritableComparatorが設定可能なキーを受け付けていません

70  key1 = newKey(); 
71  key2 = newKey(); 

とNEWKEYは()キーを構築するために、ヌルの設定を使用しています。私は、デバッガで実行したときに

83 public WritableComparable newKey() { 
84 return ReflectionUtils.newInstance(keyClass, null); 
85 } 

は確かに私は

91  key1.readFields(buffer); 
でいることを見つけます

conf内のkey1はnullなので、setConfは呼び出されていません。

これはhadoopのバグですか、私はConfigurable以外の何かを使ってキーを設定するはずですか? これがバグであれば誰でも回避策を知っていますか?

編集:ここでは、この理由で失敗したジョブの短い(やや不自然)の例です:

// example/WrapperKey.java 

package example; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configurable; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.ByteWritable; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
import org.apache.hadoop.util.ReflectionUtils; 

/** 
* This class wraps a WritableComparable class to add one extra possible value 
* (namely null) to the range of values available for that class. 
*/ 
public class WrapperKey<T extends WritableComparable> implements 
     WritableComparable<WrapperKey<T>>, Configurable { 
    private T myInstance; 
    private boolean isNull; 
    private Configuration conf; 

    @Override 
    public void setConf(Configuration conf) { 
     this.conf = conf; 
     Class<T> heldClass = (Class<T>) conf.getClass("example.held.class", 
       null, WritableComparable.class); 
     myInstance = ReflectionUtils.newInstance(heldClass, conf); 
    } 

    @Override 
    public Configuration getConf() { 
     return conf; 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeBoolean(isNull); 
     if (!isNull) 
      myInstance.write(out); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     isNull = in.readBoolean(); 
     if (!isNull) 
      myInstance.readFields(in); 
    } 

    @Override 
    public int compareTo(WrapperKey<T> o) { 
     if (isNull) { 
      if (o.isNull) 
       return 0; 
      else 
       return -1; 
     } else if (o.isNull) 
      return 1; 
     else 
      return myInstance.compareTo(o.myInstance); 
    } 

    public void clear() { 
     isNull = true; 
    } 

    public T get() { 
     return myInstance; 
    } 

    /** 
    * Should sort the KV pairs (5,0), (3,0), and (null,0) to [(null,0), (3,0), (5,0)], but instead fails 
    * with a NullPointerException because WritableComparator's internal keys 
    * are not properly configured 
    */ 
    public static void main(String[] args) throws IOException, 
      InterruptedException, ClassNotFoundException { 
     Configuration conf = new Configuration(); 
     conf.setClass("example.held.class", ByteWritable.class, 
       WritableComparable.class); 
     Path p = new Path("input"); 
     Path startFile = new Path(p, "inputFile"); 
     SequenceFile.Writer writer = new SequenceFile.Writer(
       p.getFileSystem(conf), conf, startFile, WrapperKey.class, 
       ByteWritable.class); 
     WrapperKey<ByteWritable> key = new WrapperKey<ByteWritable>(); 
     key.setConf(conf); 
     ByteWritable value = new ByteWritable((byte) 0); 
     key.get().set((byte) 5); 
     writer.append(key, value); 
     key.get().set((byte) 3); 
     writer.append(key, value); 
     key.clear(); 
     writer.append(key, value); 
     writer.close(); 

     Job j = new Job(conf, "Example job"); 
     j.setInputFormatClass(SequenceFileInputFormat.class); 
     j.setOutputKeyClass(WrapperKey.class); 
     j.setOutputValueClass(ByteWritable.class); 
     j.setOutputFormatClass(SequenceFileOutputFormat.class); 
     FileInputFormat.setInputPaths(j, p); 
     FileOutputFormat.setOutputPath(j, new Path("output")); 
     boolean completed = j.waitForCompletion(true); 
     if (completed) { 
      System.out 
        .println("Successfully sorted byte-pairs by key (putting all null pairs first)"); 
     } else { 
      throw new RuntimeException("Failed to sort"); 
     } 
    } 
} 
+1

なぜConfigurableを実装していますか? –

+0

鍵はゲームのボード状態です。私は解決されているゲームの幅と高さを指定する必要があります。次に読み込まれるバイト数はwidth * height(ボード上の各セルに1つ)です。私は幅と高さを渡すことができることを理解していますが、これは一般的な解決策ではありません。たとえば、私のキーが実際にジェネリック型を持ち、インスタンスを含むクラスがいくつかの設定パラメータに依存しているとします。 readFieldsを呼び出すたびにクラス名を効率的に読み込んで解析する方法はありません。私は各インスタンスに対して一度だけそれを知る必要があると予想する必要があります – dspyz

+0

キーはWritableComparableである必要があるので、HDFSに書き込むことができ、Hadoopによってソートされ、reduceフェーズに入力されます。 Hadoopは作業を行うときにWritableComparableメソッドを使用します。新しいインスタンスを作成しますが、ConfigurableかsetConf()を呼び出す必要はありません。設定可能なのは、ジョブのconfクラスで、コードで使用する任意のクラスではありません。 –

答えて

0

WrapperKeyは設定を実装し、setConfを実装しています。インターフェイスを実装するだけで、他のクラスがこれを呼び出すことになるわけではありません。 Hadoopフレームワークは、キーに対してsetConfメソッドを呼び出していない可能性があります。

これはバグではありません。私が見たすべてのタイプはWritableComparableのみを実装し、Configurableは実装していません。これを回避する方法が不明な場合は、キーに具体的な型を定義する必要があります。

+0

私は、コンフィギュラブルではコンフィギュレータが(リフレクションによってインスタンス化されているので)キーの中に設定オプションを渡すことができるようになっています。だから、ReflectionUtils.newInstanceがその設定を引数として取ります – dspyz

+0

RefectionUtils.newInstance()を使ってキーがインスタンス化されるコードを教えてください。 –

+0

ReflectionUtils.newInstance(クラス、コンフィグレーション) –

関連する問題