2016-08-17 6 views
0

Hbaseにある2つのテーブルのマップサイドジョインをしようとしています。私の目的は、ハッシュマップに小さなテーブルのレコードを保持し、大きなテーブルと比較し、一度一致すると、再びhbaseのテーブルにレコードを書き込むことです。私は、MapperとReducerの両方を使用して結合操作のための同様のコードを書いて、うまくいき、両方のテーブルがマッパークラスでスキャンされます。しかし、サイド・ジョインを減らすことは効率的ではないので、マッパー側でのみ表に結合したい。次のコードでは、 "ブロックされているとコメントされています"は、常にfalseを返し、最初のテーブル(小さいもの)が読み込まれないことを確認することです。ヒントがあれば助かります。 HDPのサンドボックスを使用しています。Hbase mapside join-テーブルの1つが読み込まれませんか? hbaseから正しい結果をhbaseに読み込む

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
//import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper.Context; 
import org.apache.hadoop.util.Tool; 
import com.sun.tools.javac.util.Log; 
import java.io.IOException; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.client.*; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.TableMapper; 
import org.apache.hadoop.hbase.mapreduce.TableSplit; 

public class JoinDriver extends Configured implements Tool { 

    static int row_index = 0; 

     public static class JoinJobMapper extends TableMapper<ImmutableBytesWritable, Put> { 
     private static byte[] big_table_bytarr = Bytes.toBytes("big_table"); 
     private static byte[] small_table_bytarr = Bytes.toBytes("small_table"); 

     HashMap<String,String> myHashMap = new HashMap<String, String>(); 

     byte[] c1_value; 
     byte[] c2_value; 

     String big_table; 
     String small_table; 

     String big_table_c1; 
     String big_table_c2; 

     String small_table_c1; 
     String small_table_c2; 

     Text mapperKeyS; 
     Text mapperValueS; 
     Text mapperKeyB; 
     Text mapperValueB; 

     public void map(ImmutableBytesWritable rowKey, Result columns, Context context) { 
      TableSplit currentSplit = (TableSplit) context.getInputSplit(); 
      byte[] tableName = currentSplit.getTableName(); 

      try { 
       Put put = new Put(Bytes.toBytes(++row_index)); 


       // put small table into hashmap - myhashMap 
       if (Arrays.equals(tableName, small_table_bytarr)) { 

        c1_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1")); 
        c2_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2")); 
        small_table_c1 = new String(c1_value); 
        small_table_c2 = new String(c2_value); 

        mapperKeyS = new Text(small_table_c1); 
        mapperValueS = new Text(small_table_c2); 

        myHashMap.put(small_table_c1,small_table_c2); 


       } else if (Arrays.equals(tableName, big_table_bytarr)) { 
        c1_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1")); 
        c2_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2")); 
        big_table_c1 = new String(c1_value); 
        big_table_c2 = new String(c2_value); 

        mapperKeyB = new Text(big_table_c1); 
        mapperValueB = new Text(big_table_c2); 



      // if (set.containsKey(big_table_c1)){ 

        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(big_table_c1)); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 
        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(big_table_c2)); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 
        put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"),Bytes.toBytes((myHashMap.get(big_table_c1)))); 
        context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put); 

      //  } 

       } 

      } catch (Exception e) { 
       // TODO : exception handling logic 
       e.printStackTrace(); 
      } 
     } 

    } 

    public int run(String[] args) throws Exception { 

     List<Scan> scans = new ArrayList<Scan>(); 



     Scan scan1 = new Scan(); 
     scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("small_table")); 
     System.out.println(scan1.getAttribute("scan.attributes.table.name")); 
     scans.add(scan1); 

     Scan scan2 = new Scan(); 
     scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("big_table")); 
     System.out.println(scan2.getAttribute("scan.attributes.table.name")); 
     scans.add(scan2); 

     Configuration conf = new Configuration(); 
     Job job = new Job(conf); 
     job.setJar("MSJJ.jar"); 
     job.setJarByClass(JoinDriver.class); 

     TableMapReduceUtil.initTableMapperJob(scans, JoinJobMapper.class, ImmutableBytesWritable.class, Put.class, job); 
     TableMapReduceUtil.initTableReducerJob("joined_table", null, job); 
     job.setNumReduceTasks(0); 


     job.waitForCompletion(true); 

     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     JoinDriver runJob = new JoinDriver(); 
     runJob.run(args); 

    } 

} 

答えて

1

問題文を読むことによって、複数のHBaseテーブル入力の使用について間違った考えがあると思います。 マッパークラスのセットアップメソッドで、小さなテーブルをHashMapにロードすることをお勧めします。次に、大きなテーブルでマップ専用ジョブを使用します。マップメソッドでは、以前にロードしたHashMapから対応する値をフェッチできます。 これがどのように機能するか教えてください。

関連する問題