2017-12-07 12 views
0

DataSet APIを使用して、flink-connectors(flink-hbase_2.11)のhbase TableInputFormatでFlink 1.3.2を使用しています。Hbaseテーブル入力形式のFlink DataSet API - 複数行の読み込み

Iは次のようにrowkeysがstucturedさHBaseのテーブルを有する:

| RowKey  | data | 
| 0-someuniqid | data | 
| 0-someuniqid | data | 
| 2-someuniqid | data | 
| 2-someuniqid | data | 
| 4-someuniqid | data | 
| 5-someuniqid | data | 
| 5-someuniqid | data | 
| 7-someuniqid | data | 
| 8-someuniqid | data | 

テーブルの接頭辞(これはHBaseのノードにホットスポットを防止するためである)0〜9であることができるが。私のテストテーブルでは誰もこのテーブルに書きません。私は、フォームの仕事を持っている

:多数のレコードが読み込まれたときに

tableInputFormat0 = new TableInputFormat("table", 0); 
tableInputFormat1 = new TableInputFormat("table", 1); 
... 
tableInputFormat9 = new TableInputFormat("table", 9); 


tableInputFormat0.union(tableInputFormat1).(...).union(tableInputFormat9) 
       .map(mapFunction()) 
       .rebalance() 
       .filter(someFilter()) 
       .groupBy(someField()) 
       .reduce(someSumFunction()) 
       .output(new HbaseOutputFormat()); 

問題は(2000万記録前後)で、ジョブは常にレコードの同じ量を読み取ることはありません。

ほとんどの場合、(正しく)20,277,161行が読み込まれます。しかし、たぶん20,277,221または20,277,171は常にそれほど少なくない。 (私はflink Webダッシュボードでこの数値を取得していますが、何が書かれているのか分かりません)。

小さなデータセットを使って問題を解決することはできません5百万レコードというテーブルに対してジョブを実行しているときには起こりません。ボリュームのために何回レコードが何度読み取られるのかを特定するのは難しいです。

この問題をどのようにデバッグ(および解決)できますか?

答えて

1

TableInputFormatは抽象クラスであり、サブクラスを実装する必要があります。

私は二つのことだろう:各入力分割は一度だけ処理されること

  • チェックを(この情報はJobManagerログファイルに書き込まれます)
  • 放出されたレコードの数をカウントするために、あなたの入力フォーマットを適応させます入力分割ごとにレコード数と分割IDは、(TaskManager)ログに書き込まれる必要があります。

これは問題が原因1つ(または複数)

  • であるかどうか、識別するのに役立つはずによる分割を処理するコードのバグに一度又は
  • 以上に割り当てられる分割します。
関連する問題