1

DataflowでビルトインデータフローAPIを使用してBigtableからBigqueryにデータをインポートするジョブがあります。Bigtable-BigQuery DataFlow経由でのインポート:テーブル分割とタイムスタンプに関する2つの質問

質問1:ソースデータがBigtableの中で一つの大きなテーブルにある場合は、どのように私が言う、与えられた、に基づいて動的にBigQuery内のサブまたは小さいテーブルのセットにそれを分割することができます私は2つの質問がありますBigtableの行キーは実行時にのみ認識されますか?

データフロー内のJavaコードは次のようになります。だから、

p.apply(Read.from(CloudBigtableIO.read(config))) 
     .apply(ParDo.of(new SomeDoFNonBTSourceData())) 
     .apply(BigQueryIO.Write 
        .to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName) 
        .withSchema(schema) 
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 
     p.run(); 

BQ_TableNameはコードレベルで供給する必要があるため、どのように私はそれがプログラムのように、SomeDoFNonBTSourceDataの内側に見られるものに基づいて提供することができます現在のRowKeyの値の範囲? rowKeyは「交流」であるならば、TableAの、「DF」をテーブルB、などの場合

質問2:最終的には人間が読める形式でそれを再構築するようにBigQueryにBigtableのタイムスタンプをエクスポートする正しい方法は何ですかBigQueryで? DoFn内

processElement機能は次のようになります。

List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("ColA").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP")); 
    schema = new TableSchema().setFields(fields); 

だから、Bigtableのタイムスタンプ:

public void processElement(ProcessContext c) 
{ 
    String valA = new String(c.element().getColumnLatestCell(COL_FAM, COL_NAME).getValueArray()); 
    Long timeStamp = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp(); 

    tr.put("ColA", valA); 
    tr.put("TimeStamp",timeStamp); 
    c.output(tr); 
} 

、パイプラインの建設中、タイムスタンプ列のBQスキーマの設定は次のようになりますLongのようですが、BQの宛先TimeStamp列には"TIMESTAMP""INTEGER"の両方のタイプを試しています(BQではLongがないようです)。最終的には、BQのTimeStamp列を 'order by'句と人間が読める形式(日付と時刻)で表示する必要があります。 'order by'部分はうまくいくようですが、最終結果を意味のあるものにキャストすることはできませんでした。キャストエラーやまだ読めないものがあります。

答えて

2

ここで、質問1と同様の問題への回答を探しています:)。 2番目の質問については

、私はあなたにロングタイムスタンプが実際にUNIXタイムスタンプであることを確認する最初の必要性を考えて、私はいつもBQが変換されずにタイムスタンプとしてそれを摂取できると仮定しました。あなたはこれを試すことができ

しかし...

Long longTimeStamp = 1408452095L; 

Date timeStamp = new Date(); 
timeStamp.setTime(longTimeStamp * 1000); 

tr.put("TimeStamp", timeStamp.toInstant().toString()); 
+0

申し訳ありませんがそこにそれが日付のどのタイプですか? java.util.Dateに '.toInstant()'がありません.Google APIのDateにはこれらのメソッドがありません。 –

+0

あなたの言語レベルを「8-lambdas、type annotation etc」に変更してみてください。 Intellijでプロジェクト構造に行き、モジュールを選択して言語レベルを変更します。私はEclipseでこれを行う方法についてはあまり知らない。 – Ken

+0

ああ、あなたが正しいです - 私はそこにその文書を参照してください。私のEclipseで何かあるはずです。それを理解するだろう。私が質問1で何かを見つけたらここに投稿します... –

関連する問題