0

Beam(Dataflow 2.0.0)では、PubSubトピックを読み込み、トピックからのメッセージに基づいてBigtableから少数の行をフェッチしようとしています。 Beamのドキュメントからpubsubメッセージに基づいてBigTableをスキャンする方法が見つかりませんでした。私は、ParDo関数を書いてビームパイプラインにパイプすることを試みましたが、無駄でした。Google PubSubから読み込み、PubSubメッセージに基づいてBigtableから読み込みます。トピック

BigTableIOには読み込みのオプションがありますが、これはパイプラインの外にあり、私のユースケースとして水蒸気で動作するかどうかはわかりません。

これはPubSubをストリーミングする場合のように実行可能であり、メッセージの内容に基づいてBigTableを読むことができます。

P.S:Beam 2.0でJava APIを使用しています。

PCollection<String> keyLines = 
       pipeline.apply(PubsubIO.readMessagesWithAttributes() 
       .fromSubscription("*************")) 
       .apply("PubSub Message to Payload as String", 
        ParDo.of(new PubSubMessageToStringConverter())); 

ここで、keyLinesがBigTableをスキャンするための行キーとして機能するようにします。 BigTableのコードスニペットを使用しています。私は 'RowFilter.newBuilder()'と 'ByteKeyRange'を見ることができますが、どちらもストリーミング方式ではなくバッチモードで動作するようです。

pipeline.apply("read", 
       BigtableIO.read() 
        .withBigtableOptions(optionsBuilder) 
        .withTableId("**********"); 

    pipeline.run(); 

助言してください。

答えて

1

ParToでBigTableを読むことができるはずです。 Cloud Big TableまたはHBase APIを直接使用する必要があります。 DoFn(example)の@Setupメソッドでクライアントを初期化する方が良いです。それが動作しない場合は、詳細を投稿してください。

+0

ありがとうございました。コードスニペットを追加しました。あなたが明確にすることができればもっと良いでしょう。 –

+0

あなたは 'BigtableIO.read()'ソースをそのように使うことはできません。あなたの場合、それぞれの 'keyLine'は検索する単一のキーを提供するのでしょうか、それともBigtableIO.read()を使ってスキャンする可能性のある潜在的に大きなスキャンを表していますか?前者の場合、 'ParDo'の明示的なbigtableルックアップが良いです(Bigtable APIを直接使用しています)。後でBeamでソース用の新しいAPI([Splittable DoFn](https://s.apache.org/splittable-do-fn)))を使用して実行できますが、まだあなたのユースケースを処理する準備が整った実装はありませんつまり、各行ごとにBitatbleリーダーを起動します)。 –

関連する問題