2017-12-07 21 views
0

k-meansアルゴリズムを実行するmap reduceプログラムを作成しようとしています。私はmap reduceを使うのが反復アルゴリズムを実行する最善の方法ではないことを知っています。 私はマッパークラスとレデューサークラスを作成しました。 マッパーコードで入力ファイルを読みました。 map reduceが完了したら、結果を同じ入力ファイルに保存します。どのように出力ファイルをマッパーから入力ファイルを上書きするようにしますか?Hadoop Mapreduce、map reduce出力でマッパーに入力されたtxtファイルを書き直すにはどうすればよいですか?

import java.io.IOException; 
import java.util.StringTokenizer; 
import java.util.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import java.util.ArrayList; 


public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, 
DoubleWritable> { 
private final static String centroidFile = "centroid.txt"; 
private List<Double> centers = new ArrayList<Double>(); 

public void setup(Context context) throws IOException{ 
     BufferedReader br = new BufferedReader(new 
     FileReader(centroidFile)); 
     String contentLine; 
     while((contentLine = br.readLine())!=null){ 
      centers.add(Double.parseDouble(contentLine)); 
     } 
} 

public void map(Object key, Text input, Context context) throws IOException, 
InterruptedException { 

     String[] fields = input.toString().split(" "); 
     Double rating = Double.parseDouble(fields[2]); 
     Double distance = centers.get(0) - rating; 
     int position = 0; 
     for(int i=1; i<centers.size(); i++){ 
      Double cDistance = Math.abs(centers.get(i) - rating); 
      if(cDistance< distance){ 
       position = i; 
       distance = cDistance; 
      } 
     } 
     Double closestCenter = centers.get(position); 
     context.write(new DoubleWritable(closestCenter),new 
DoubleWritable(rating)); //outputs closestcenter and rating value 

     } 
} 
import java.io.IOException; 
import java.lang.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Reducer; 
import java.util.*; 

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, 
DoubleWritable, Text> { 

public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, 
Context context)// get count // get total //get values in a string 
      throws IOException, InterruptedException { 
      Iterator<DoubleWritable> v = values.iterator(); 
      double total = 0; 
      double count = 0; 
      String value = ""; //value is the rating 
      while (v.hasNext()){ 
       double i = v.next().get(); 
       value = value + " " + Double.toString(i); 
       total = total + i; 
       ++count; 
      } 
      double nCenter = total/count; 
    context.write(new DoubleWritable(nCenter), new Text(value)); 
} 
} 
import java.util.Arrays; 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class run 
{ 

public static void runJob(String[] input, String output) throws Exception { 

    Configuration conf = new Configuration(); 

    Job job = new Job(conf); 
    Path toCache = new Path("input/centroid.txt"); 
    job.addCacheFile(toCache.toUri()); 
    job.setJarByClass(run.class); 
    job.setMapperClass(kmeansMapper.class); 
    job.setReducerClass(kmeansReducer.class); 
    job.setMapOutputKeyClass(DoubleWritable.class); 
    job.setMapOutputValueClass(DoubleWritable.class); 

    job.setNumReduceTasks(1); 
    Path outputPath = new Path(output); 
    FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath,true); 
    job.waitForCompletion(true); 

} 

public static void main(String[] args) throws Exception { 
    runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]); 

} 

} 

おかげ

:また ので、私は古い入力ファイルと新しい入力ファイルからの値が値の差が0.1未満

私のコードがされている、すなわち、収束するまでの地図は反復を削減します

答えて

0

免責事項を記載していますが、Sparkまたはメモリ内の問題を解決できるその他のフレームワークに切り替えてください。あなたの人生はずっと良いでしょう。

本当にこのようにしたい場合は、runJobでコードを繰り返し実行し、入力に一時ファイル名を使用してください。これを実現するにはthis question on moving files in hadoopがあります。各反復が終了した後

大まか
FileSystem fs = FileSystem.get(new Configuration()); 
Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input'; 

に言えば、あなたが入力を設定する必要があり、非常に最初の繰り返しのためにもちろん

fs.delete(tempInputPath) 
fs.rename(outputPath, tempInputPath) 

の操作を行います。あなたは、ファイルシステムのインスタンスと入力のための一時ファイルが必要になりますパスは、ジョブの実行時に提供される入力パスになります。後続の反復ではtempInputPathを使用できます。これは以前の反復の出力になります。

+0

返信いただきありがとうございます。私はrunjobのコードを反復処理するにはどうすればよいですか? – th308

+0

通常のforループでrunJobのコードの必要な部分をラップするだけです。 –

関連する問題