2017-07-10 4 views
0

私は誰かが私にこれら2つのAPI呼び出しの違いを教えてくれることを願っています。私は2人の間で奇妙な結果を得ている。これは、hbase-client/hbase-serverバージョン1.0.1および1.2.0-cdh5.7.2で発生しています。Hbaseでは、ResultScannerとinitTableMapperJobの間のスキャンの違いは何ですか

まず、私の行キーの形式は、hash_name_timestampの形式である です。 100_servername_1234567890 hbaseテーブルのTTLは30日なので、30日以上経過したものは圧縮後に消えてしまいます。

次は、ResultScannerを使用するためのコードです。 MapReduceを使用しないため、完了には非常に時間がかかります。私はこの方法で仕事をすることができません。なぜなら、時間がかかりすぎるからです。しかし、デバッグの目的で、私はこのメソッドに問題はありません。

Scan scan = new Scan(); 
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType)); 
scan.setCaching(500); 
scan.setCacheBlocks(false); 
scan.setTimeRange(start, end); 

Connection fConnection = ConnectionFactory.createConnection(conf); 
Table table = fConnection.getTable(TableName.valueOf(tableName)); 
ResultScanner scanner = table.getScanner(scan); 
for (Result result = scanner.next(); result != null; result = scanner.next()) { 
    System.out.println("Found row: " + Bytes.toString(result.getRow())); 
} 

次のコードは動作しません:それは、返されたキーのすべてのタイムスタンプが過去30日以内に、指定した時間の範囲内にあるので、私には有効になり、指定した時間範囲のすべてのキーが、一覧表示されますMapReduceはResultScannerの方法よりも高速に動作します。これは、マップを1200個のマップに分割するためです。問題私は期限切れによるTTLに消えているはずですrowkeys取得していますされています。ここでは

Scan scan = new Scan(); 
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType)); 
scan.setCaching(500); 
scan.setCacheBlocks(false); 
scan.setTimeRange(start, end); 
TableMapReduceUtil.initTableMapperJob(tableName, scan, MTTRMapper.class, Text.class, IntWritable.class, job); 

はマッパーの25%以上が失敗したため、最終的には、後に全体のMRジョブを殺す、私が取得エラーです。

Error: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Wed Jun 28 13:46:57 PDT 2017, null, java.net.SocketTimeoutException: callTimeout=120000, callDuration=120301: row '65_app129041.iad1.mydomain.com_1476641940' on table 'server_based_data' at region=server_based_data

私はHBaseのクライアントとのHBase・サーバーのjarファイルのコードを勉強しようとするでしょうが、うまくいけば、誰かがinitTableMapperJob呼び出しが失敗する原因となっている方法の違いがあり、どのようなぶっきらぼう知っているだろう。

編集:ここに私が使用しているテーブルの説明である:私はまだのでinitTableMapperJob方法で何かがあると思う

public void map(ImmutableBytesWritable rowkey, Result columns, Context context) throws IOException, InterruptedException { 
    Configuration conf = context.getConfiguration(); 
    startMS = conf.getLong("startTime", 0); 
    endMS = conf.getLong("endTime", 1); 
    System.out.println(startMS); 
    System.out.println(endMS); 

    // extract the ci as the key from the rowkey 
    String pattern = "\\d*_(\\S*)_(\\d{10})"; 
    String ciname = null; 
    Pattern r = Pattern.compile(pattern); 
    String strRowKey = Bytes.toString(rowkey.get()); 
    // check the time here to see if we count it or not in the counts 

    Matcher m = r.matcher(strRowKey); 
    long ts = 0; 

    if (m.find()) { 
     ts = Long.valueOf(m.group(2)).longValue(); 
     ciname = m.group(1); 
     if ((ts >= startMS) && (ts <= endMS)) { 
      context.write(new Text(ciname), ONE);   
     }   
    }  
} 

describe 'server_based_data' 
Table server_based_data is ENABLED            
server_based_data                
COLUMN FAMILIES DESCRIPTION              
{NAME => 'raw_data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLIC 
ATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0 
', TTL => '2592000 SECONDS (30 DAYS)', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE 
=> '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}       
1 row(s) in 0.5180 seconds 
ここ

は私マッパーコードがあります私が上に投稿したエラーは、テーブルのTTLから期限切れになったはずの行のタイムスタンプを示していますが、何らかの理由でinitTableMapperJobがまだそれを見つけてそれを探してみるのですが、 。

+0

あなたは縮小ジョブをマップし、Hbaseテーブルを記述してください。興味があれば、HbaseのSparkでアプローチすることをお勧めします。 – gorros

+0

@gorros、ご協力いただきありがとうございます。はい、あなたがスパークの提案をしているなら、私は聞きます。うまくいけば実装が簡単で、私はあなたを理解するのに十分なほどスマートです。私はまたあなたが要求したようにいくつかの詳細で質問を更新しました。 – Classified

答えて

1

いくつかの提案をしたいと思います。

  1. Spark on HBase examplesをご覧ください。具体的には、Sparkでスキャンを実行する方法です。これはScalaで書かれていますが、あなたはJavaで同じことを達成するかもしれません。その場合でも、MapReduceコードよりも簡潔になります。
  2. 行キーのみが必要な場合は、FirstKeyOnlyFilterなどの対応するフィルタを追加します。これにより、HBaseから取得された不要なデータの量が削減されます。

initTableMapperJobの奇妙な動作の原因がわかりません。しかし、私は上記の提案が役立つことを願っています。

関連する問題