2017-10-29 9 views
0

通常、私たちはjavaファイルへの入力として1つのテキストファイルを与えます(単純な単語カウントの問題の場合など)。その代わりに、私は与えたい100のcsvファイルを持っています(すべてのファイルを1つのファイルにマージすることはできません)。与えられた100株の最大/最小株価変動を予測しようとすると、各csvファイルは一意です。 したがって、csvファイルのフォルダ全体をjavaプログラムへの入力ストリームとして与える方法。Hadoop Dfsとmapreduceの入力として複数のファイル

+0

MapReduceはすでにMapReduceのフォルダを受け入れる作るためにどのようにフォルダ –

+0

理想的には、あなたはそのままHDFSにcsvファイルを置くだろう、そしてあなたはそれの上にハイブまたはスパークのクエリを使用して... –

+0

を受け入れますか? /*演算子を使用していますか? – user2336157

答えて

4

解決策1:これを解決するために、我々はFileInputFormat.addInputPaths()メソッドを使用することができ、複数の入力のカンマ区切りのリストを取ることができ、我々は

FileInputFormat.addInputPaths(“file0,file1,....”) 

または

と仮定としてそれを書くことができます2ファイルを分析する必要があり、FacebookやYouTubeのサービスを使用している人のリスト(これらのうち1つの出力ファイルが必要)

つのファイルがあります
Path YoutubePath = new Path(args[0]); 
Path FacebookPath = new Path(args[1]); 
Path outputPath = new Path(args[2]); 
MultipleInputs.addInputPath(job, FacebookPath, TextInputFormat.class, JoinFacebookMapper.class); 
MultipleInputs.addInputPath(job, YoutubePath, TextInputFormat.class, YoutubeMapper.class); 
FileOutputFormat.setOutputPath(job, outputPath); 

コードに次の行を追加すると、1回のマップ削減ジョブで複数のファイルが渡されます。あなたは、引数ここ

0

としてフォルダ全体を渡すことができます

または

は私のテストコードは、HDFSに多くのファイルをコピーして、それらをマージすることで、それはまた、他のファイル形式をフィルタリングすることができ、私はそれをするのに役立つかもしれないと思います君は !

public class FilesMergeToHDFS { 
private static FileSystem fs = null; 
private static FileSystem local = null; 

public static void main(String[] args) throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 
    list(); 
} 

private static void list() throws IOException, URISyntaxException { 
    // TODO Auto-generated method stub 

      Configuration conf = new Configuration(); 
      URI uri = new URI("hdfs://xxx:9000");//HDFS address 
      fs = FileSystem.get(uri,conf); 


      local = FileSystem.getLocal(conf); 

      FileStatus[] dirsStatus = local.globStatus(new Path("E://data/73/*"), new RegexExcludePathFilter("^.*svn$")); 
      Path[] dirs = FileUtil.stat2Paths(dirsStatus); 
      FSDataInputStream in = null; 
      FSDataOutputStream out = null; 
      for(Path p:dirs){ 
        //upload 
       String filename = p.getName(); 
       FileStatus[] localStatus = local.globStatus(new Path(p+"/*"),new RegexAcceptPathFilter("^.*txt$")); 
       Path[] listedPaths = FileUtil.stat2Paths(localStatus); 
       //set outputpath 
       Path block = new Path("hdfs://hadoop:9000/mergehdfs/filesmerge/"+filename+".txt"); 
       out =fs.create(block); 
       for(Path path:listedPaths){ 
        in = local.open(path); 
        IOUtils.copyBytes(in, out, 4096, false); // copydata 
        in.close(); 
       } 
       if (out != null) { 
        out.close(); 
       } 
      } 
} 

private static class RegexAcceptPathFilter implements PathFilter { 

private final String regex; 

    public RegexAcceptPathFilter(String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return flag; 
    } 

} 

private static class RegexExcludePathFilter implements PathFilter { 
private final String regex; 
    public RegexExcludePathFilter (String regex) { 
     this.regex = regex; 
    } 

    @Override 
    public boolean accept(Path path) { 
     // TODO Auto-generated method stub 
     boolean flag = path.toString().matches(regex); 
     return !flag; 
    } 
} 
} 
関連する問題