私は、すべての種類の奇妙な予期しない動作を示しているhadoop 0.20.205.0 MapReduceジョブ(シングルスレッド、ローカル)を実行しようとしていました。私は最終的に理由を考え出した。これは私にはハープのバグのように見えますが、おそらく私が理解できないことがあります。誰かが私に助言をくれますか? 私のsetMapOutputKeyClassクラスはConfigurableを実装しています。 ReadFieldsメソッドは、setConfが最初に呼び出されない限り正しく読み込まれません(これはConfigurableインターフェイスのポイントだと思います)。 WritableComparatorのコードを見ると、フレームワークがソートするときに、内部キーオブジェクトをインスタンス化します。Hadoop 0.20.205.0 WritableComparatorが設定可能なキーを受け付けていません
70 key1 = newKey();
71 key2 = newKey();
とNEWKEYは()キーを構築するために、ヌルの設定を使用しています。私は、デバッガで実行したときに
83 public WritableComparable newKey() {
84 return ReflectionUtils.newInstance(keyClass, null);
85 }
は確かに私は
91 key1.readFields(buffer);
でいることを見つけます
conf内のkey1はnullなので、setConfは呼び出されていません。
これはhadoopのバグですか、私はConfigurable以外の何かを使ってキーを設定するはずですか? これがバグであれば誰でも回避策を知っていますか?
編集:ここでは、この理由で失敗したジョブの短い(やや不自然)の例です:
// example/WrapperKey.java
package example;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
/**
* This class wraps a WritableComparable class to add one extra possible value
* (namely null) to the range of values available for that class.
*/
public class WrapperKey<T extends WritableComparable> implements
WritableComparable<WrapperKey<T>>, Configurable {
private T myInstance;
private boolean isNull;
private Configuration conf;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
Class<T> heldClass = (Class<T>) conf.getClass("example.held.class",
null, WritableComparable.class);
myInstance = ReflectionUtils.newInstance(heldClass, conf);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(isNull);
if (!isNull)
myInstance.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
isNull = in.readBoolean();
if (!isNull)
myInstance.readFields(in);
}
@Override
public int compareTo(WrapperKey<T> o) {
if (isNull) {
if (o.isNull)
return 0;
else
return -1;
} else if (o.isNull)
return 1;
else
return myInstance.compareTo(o.myInstance);
}
public void clear() {
isNull = true;
}
public T get() {
return myInstance;
}
/**
* Should sort the KV pairs (5,0), (3,0), and (null,0) to [(null,0), (3,0), (5,0)], but instead fails
* with a NullPointerException because WritableComparator's internal keys
* are not properly configured
*/
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
conf.setClass("example.held.class", ByteWritable.class,
WritableComparable.class);
Path p = new Path("input");
Path startFile = new Path(p, "inputFile");
SequenceFile.Writer writer = new SequenceFile.Writer(
p.getFileSystem(conf), conf, startFile, WrapperKey.class,
ByteWritable.class);
WrapperKey<ByteWritable> key = new WrapperKey<ByteWritable>();
key.setConf(conf);
ByteWritable value = new ByteWritable((byte) 0);
key.get().set((byte) 5);
writer.append(key, value);
key.get().set((byte) 3);
writer.append(key, value);
key.clear();
writer.append(key, value);
writer.close();
Job j = new Job(conf, "Example job");
j.setInputFormatClass(SequenceFileInputFormat.class);
j.setOutputKeyClass(WrapperKey.class);
j.setOutputValueClass(ByteWritable.class);
j.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(j, p);
FileOutputFormat.setOutputPath(j, new Path("output"));
boolean completed = j.waitForCompletion(true);
if (completed) {
System.out
.println("Successfully sorted byte-pairs by key (putting all null pairs first)");
} else {
throw new RuntimeException("Failed to sort");
}
}
}
なぜConfigurableを実装していますか? –
鍵はゲームのボード状態です。私は解決されているゲームの幅と高さを指定する必要があります。次に読み込まれるバイト数はwidth * height(ボード上の各セルに1つ)です。私は幅と高さを渡すことができることを理解していますが、これは一般的な解決策ではありません。たとえば、私のキーが実際にジェネリック型を持ち、インスタンスを含むクラスがいくつかの設定パラメータに依存しているとします。 readFieldsを呼び出すたびにクラス名を効率的に読み込んで解析する方法はありません。私は各インスタンスに対して一度だけそれを知る必要があると予想する必要があります – dspyz
キーはWritableComparableである必要があるので、HDFSに書き込むことができ、Hadoopによってソートされ、reduceフェーズに入力されます。 Hadoopは作業を行うときにWritableComparableメソッドを使用します。新しいインスタンスを作成しますが、ConfigurableかsetConf()を呼び出す必要はありません。設定可能なのは、ジョブのconfクラスで、コードで使用する任意のクラスではありません。 –