DRPCを使用してHBaseからデータを読み取るためのStormトポロジを作成しています。基本的には、データを取得するスキャンを実行し、データを豊かにして返します。DRPCで失敗するストームhbaseのボルト
私は簡単に(http://storm.apache.org/releases/current/Distributed-RPC.htmlに基づいて)働く基本的なDRPCの例を得ることができます。しかし、スキャンのコードを挿入すると、処理に非常に時間がかかります。 backtype.storm.daemon.drpc $ service_handler $ reify__8688.failRequest(drpc.clj:136)で
backtype.storm.generated.DRPCExecutionException
:〜[storm-分後、私は次のエラーを取得しますコア-0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258]
backtype.storm.drpc.DRPCSpout.fail(DRPCSpout.java:241)〜[storm-core -0.10.0.2.4.2.0-258.jar:0.10.0.2.4.2.0-258
少し後で、私はorg.apache.hadoop.hbase.client.RetriesExhaustedExceptionを取得します。これは常に起こるとは限りませんが、非常に一般的です。これに基づく私の前提は2つの可能性のうちの1つです:
スキャンがタイムアウトしています。ただし、HBase ShellまたはRESTによるスキャンの実行は1秒未満で終了します テーブルが矛盾しているため、特定の領域が欠落しています。私はhbase hbckを実行しており、0の矛盾を示しています。 私はHBaseへの接続が問題ないことを知っています:デバッグ出力を追加し、ボルトが結果を取得しました。ただし、DRPCExecutionExceptionのために、これらの結果はDRPCで返されることはありません。
私はDRPCタイムアウトが問題でしたが、DRPCタイムアウトを大幅に増やしました。同じ時間で同じ結果が得られました。グーグルでは、私は同じ問題([Storm][DRPC] Request failed)の他の誰かを見つけましたが、これを修正する方法の兆候はありません。参考のため
私は以下の私のコードを追加してい:助けを事前に
try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI")))
{
List<Filter> filters = new ArrayList<>();
String startRowString = "start";
String endRowString = "end";
RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
filters.add(startRow);
RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
filters.add(endRow);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
Scan scan = new Scan();
scan.addFamily("f1".getBytes());
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner)
{
hbaseValues.add(result);
}
}
}
感謝を。