私はMapReduceのコードから奇妙な出力を取得していますに影響を与えません。削減機能は、最終的な出力
入力:
aa bb
aa cc
bb aa
cc dd
dd bb
xx aa
ss rr
出力:
aa [email protected]
aa [email protected]
aa [email protected]
aa [email protected]
bb [email protected]
bb [email protected]
bb [email protected]
cc [email protected]
cc [email protected]
dd [email protected]
dd [email protected]
rr [email protected]
ss [email protected]
xx [email protected]
コード:
package org.mapreduce.userscore;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class UserScore {
public static class ScoreWritable implements Writable {
private IntWritable N;
private IntWritable M;
//Default Constructor
public ScoreWritable() {
this.N = new IntWritable();
this.M = new IntWritable();
}
//Custom constructor
public ScoreWritable(IntWritable N, IntWritable M){
this.N = N;
this.M = M;
}
//Setter method to set the values of ScoreWritable objects
public void set(IntWritable NN,IntWritable MM) {
this.N = NN;
this.M = MM;
}
//to get the first object from Score Record
public IntWritable getN() {
return N;
}
//to get the second object from Score Record
public IntWritable getM() {
return M;
}
@Override
//overriding default readFields method.
//It de-serializes the byte stream data
public void readFields(DataInput in) throws IOException {
N.readFields(in);
M.readFields(in);
}
@Override
//It serializes object data into byte stream data
public void write(DataOutput out) throws IOException {
N.write(out);
M.write(out);
}
//@Override
//public boolean equals(Object o) {
//if (o instanceof ScoreWritable) {
//ScoreWritable other = (ScoreWritable) o;
//return N.equals(other.N) && M.equals(other.M);
//}
//return false;
//}
@Override
public int hashCode() {
return N.hashCode();
}
}
public static class Map extends Mapper<LongWritable, Text, Text, ScoreWritable> {
private Text user = new Text();
private ScoreWritable score = new ScoreWritable();
private IntWritable NN = new IntWritable();
private IntWritable MM = new IntWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
int iterator = 1;
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
user.set(tokenizer.nextToken());
if (iterator == 1) {
NN = new IntWritable(1);
MM = new IntWritable(0);
iterator += 1;
} else {
NN = new IntWritable(0);
MM = new IntWritable(1);
}
score.set(NN,MM);
context.write(user, score);
}
}
}
public static class Reduce extends Reducer<Text, ScoreWritable, Text, IntWritable> {
private IntWritable resultf = new IntWritable();
public void reduce(Text key, Iterable<ScoreWritable> values, Context context) throws IOException, InterruptedException {
//int result = ((values.getN().get()) * (values.getM()).get());
resultf.set(result);
context.write(key, resultf = new IntWritable(2));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//Create a new Jar and set the driver class(this class) as the main class of jar:
Job job = new Job(conf, "userscore");
job.setJarByClass(UserScore.class);
//Set the map and reduce classes in the job:
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(ScoreWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(4);
//Set the input and the output path from the arguments
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Run the job and wait for its completion
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
テキストファイルから読み込むMapreduceコードを作成しようとしています。テキストファイルには、各行に1組の文字列があり、これらの文字列はソーシャルネットワーク内のユーザー名を表し、最初の文字列は2番目のユーザーに従います。フォロワーの総数を計算し、各ユーザーのユーザー名に続いて、これらの2つの数値を乗算して、各ユーザーのスコアの一種を作成しようとしています。
考えられるのは、値の書き込み可能なカスタムクラス(ScoreWritable)を作成し、ユーザー名をテキストキーとして、値をScoreWritableクラスとして送信することです。 Reduceの出力を変更して定数 "2"を出力したことに気付いた場合は、チェックしてみてください。しかし、出力は上記のようになります。
私は間違っていますか?
私は仮想マシンでClouderaイメージを使用してjarファイルをコンパイルして実行しています。
はあなたがデバッグ出力は何ですか値2?そしてあなたの出力をどのように出力しますか?UserScoreの列がtoString()呼び出しの結果と同じであると思われます – gtosto
@gtosto出力は4つのテキストファイルで構成されています。これらのファイルのいずれかからのテキストが含まれ、残りの3つは空です。私は4つの作業を減らしたので、出力はテキストファイルとして4つの部分に分けられるべきだと思っていました。私がマッパーから受け取ったのと同じ値を出力しようとすると、私はあなたが上に示したのと同じ出力を得ます。 – BlueTile