2017-09-23 18 views
0

Bigtableで開始行と終了行をスキャンしようとしています。 スキャン間の要素は約100Kです。 setCaching(500)を使用してHBaseで実行できたバッチでそれらを取得したいのですが。HbaseとGoogle Bigtable:多数の行をスキャンする

Bigtableでは、setCachingは無視され、1 RPCで全体の結果セットを取得しようとします。どのようにHBaseに似たように達成することができますか?

私はJavaドライバbigtable-hbase-1.1とバージョンを使用しています1.0.0-pre3

Bigtableの設定:

Configuration conf = new Configuration(); 
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false"); 
conf.set("google.bigtable.rpc.timeout.ms", "1500000"); 
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms","1500000"); 
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000"); 
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false"); 
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500"); 
conf.set("google.bigtable.bulk.max.row.key.count", "500"); 

Configuration conff = BigtableConfiguration.configure(conf,projectID,instanceID); 
connection = BigtableConfiguration.connect(conff); 

スキャナの設定:

byte[] start = "prefix".getbytes() ; 
byte[] end = Bytes.add("prefix".getbytes(),(byte))0xff); 
Scan scan = new Scan(start, end); 
出てくる行の

期待数は100KSのオーダーであります。

答えて

0

行を読み込むときにバッチ処理を心配する必要はありません。 Bigtableの応答はストリーミングされ、バックプレッシャーに対応しています。私たちはストリームのチャンクをバッファリングするためにGRPCに依存しています。 https://grpc.io/docs/guides/concepts.html#server-streaming-rpc

このサンプルコードを試す気にし、それが動作するかどうか私に教えてだろう(つまり、何の期限がエラーを超えていない。): ここGRPCストリーミングについて紹介へのリンクです。サンプルコードが機能する場合は、独自のデータをスキャンしてそれが動作することを確認してください。何かがない場合は、私に知らせてください。

のpom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.google.cloud.example</groupId> 
    <artifactId>row-write-read-example</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.google.cloud.bigtable</groupId> 
     <artifactId>bigtable-hbase-1.x</artifactId> 
     <version>1.0.0-pre3</version> 
    </dependency> 
    </dependencies> 

    <build> 
    <plugins> 
     <plugin> 
     <artifactId>maven-compiler-plugin</artifactId> 
     <version>3.6.2</version> 
     <configuration> 
      <source>1.8</source> 
      <target>1.8</target> 
     </configuration> 
     </plugin> 
    </plugins> 
    </build> 
</project> 

は、Java:

import com.google.cloud.bigtable.hbase.BigtableConfiguration; 
import java.io.IOException; 
import org.apache.hadoop.hbase.HColumnDescriptor; 
import org.apache.hadoop.hbase.HConstants; 
import org.apache.hadoop.hbase.HTableDescriptor; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.Admin; 
import org.apache.hadoop.hbase.client.BufferedMutator; 
import org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.ResultScanner; 
import org.apache.hadoop.hbase.client.Scan; 
import org.apache.hadoop.hbase.client.Table; 

public class WriteReadTest { 
    private static final String PROJECT_ID = "<YOUR_PROJECT_ID>"; 
    private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>"; 
    private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>"; 
    private static final String FAMILY = "cf"; 

    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID); 

    public static void main(String[] args) throws IOException { 
    try(Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID); 
     Admin admin = connection.getAdmin()) { 

     // Setup 
     admin.createTable(
      new HTableDescriptor(TABLE_NAME) 
       .addFamily(new HColumnDescriptor(FAMILY)) 
    ); 

     try { 
     // Write the rows 
     populateTable(connection, 2_000_000); 

     // Read the rows 
     readFullTable(connection); 
     } finally { 
     admin.disableTable(TABLE_NAME); 
     admin.deleteTable(TABLE_NAME); 
     } 

    } 
    } 

    private static void populateTable(Connection connection, int rowCount) throws IOException { 
    long startTime = System.currentTimeMillis(); 
    int buckets = 100; 
    int maxWidth = Integer.toString(buckets).length(); 

    try(BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) { 
     for (int i = 0; i < rowCount; i++) { 
     String prefix = String.format("%0" + maxWidth + "d", i % buckets); 
     String key = prefix + "-" + String.format("%010d", i); 
     String value = "value-" + key; 

     Put put = new Put(key.getBytes()) 
      .addColumn(
       FAMILY.getBytes(), 
       HConstants.EMPTY_BYTE_ARRAY, 
       value.getBytes() 
      ); 

     bufferedMutator.mutate(put); 
     } 
    } 

    long endTime = System.currentTimeMillis(); 
    System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime)/1000, rowCount); 
    } 

    private static void readFullTable(Connection connection) throws IOException { 
    long startTime = System.currentTimeMillis(); 

    int count = 0; 
    try(Table table = connection.getTable(TABLE_NAME); 
     ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) { 

     for(Result row = scanner.next(); row != null; row = scanner.next()) { 
     count++; 
     } 
    } 

    long endTime = System.currentTimeMillis(); 

    System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime)/1000, count); 
    } 
} 
+0

私は5分の長いRPCタイムアウトを保持しているし、まだ取得DEADLINEがエラーを上回りました。ほんの100k行だけ? – Peter

+0

バッファチャンクサイズをチューニングする方法はありますか?小さすぎるかもしれない。 私の場合、クライアントはシンガポールと台湾のbigtableにいるので、1回の往復には50ミリ秒かかります – Peter

+0

Bigtableはまずサーバ上で結果全体を取得してからクライアントにストリームするためですか?私はその後、全体の結果セットを得るのは時間がかかるかもしれないと思いますか? – Peter

関連する問題