2017-01-30 25 views
0

私はPubSubからのメッセージを聞いて、その後BigQueryからデータを読み込んでいるストリーミングジョブを持っています。データは、PubSUbから受信したデータを使用して照会されます。これは、クエリを動的に作成し、それをBigQueryIO.Read.fromQuery()関数に渡す必要があることを意味します。以下は、BigQueryからデータを読み込んでTableRowを返すコードですが、私のコードが読み込むデータを実行しているところでNullPointerExceptionを与えています。Google Cloud Dataflow BigQueryIO.Read nullポインタエラー

public class RequestDailyUsageTransform extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> { 

    private String mQuery; 

    private String mForDate; 
    private LocalDateTime billingDateTime; 

    @Override 
    public PCollection<TableRow> apply(PCollection<DailyUsageJob> input) { 

     TableReference tableReference = getRequestTableReference(); 

     return input 
       .apply(ParDo.named("RequestUsageQuery") 
         .of(new RequestUsageQueryStringDoFn())) 
       .apply(BigQueryIO.Read.named("RequestUsageReader") 
         .fromQuery(mQuery) 
         .from(tableReference).withoutValidation()) 
       .apply(ParDo.named("DailyRequestMapper").of(new DailyRequestMapperDoFn())) 
       .apply(ParDo.named("BillDailyRequestUsage") 
         .of(new DailyRequestsBillDoFn(mForDate, billingDateTime))); 
    }} 

私もBigQueryIO.Read.fromQueryでDoFn()関数で生成された文字列を渡す方法を知りたいと思いました。

+0

こんにちは、完全なstacktraceを提供することは可能でしょうか?また、どのバージョンのデータフローを使用していますか? DoFnで計算されたクエリを渡す場合。私はそれがデータフローモデルにうまく収まるとは思わない。あなたのユースケースをもう少し説明してください。なぜ実行時にクエリを構築したいのですか?要素/キーに基づいて特定の情報を検索しようとしていますか? BigQueryIOソースクエリは、すべてのキーでデータを検索し、そのデータがパイプラインを通過し、キーによって分割されます。各DoFnは、データの特定の断片/キーを操作します。 –

+0

@AlexAmato毎日、特定の時間にバックエンドアプリケーションがストリーミングジョブに通知を送信して、各ユーザーに対して何らかのタスクを実行します。各メッセージには、タスクが実行されなければならない時間とユーザーIDがあります。 私はメッセージ内で言及された時間の間に、そのユーザーのデータのみをクエリするクエリを動的に作成したいと考えています。 – ghost

+0

私はこの場合、最も良いことは、すべてのデータを照会する毎日のバッチ・ジョブを実行し、ユーザーIDによってキーが設定されることだと思います。これは、あなたが望むよりも多くのデータを取り込みますが、ユーザーごとの情報を見つけることができます。残念ながら、現在のところデータ依存の読み込みを実行する方法はありません –

答えて

0

私はこの場合、最も良いことは、すべてのデータを照会する毎日のバッチジョブを実行し、ユーザーIDによってキーが設定されていることだと思います。これは、あなたが望むよりも多くのデータを取り込みますが、ユーザーごとの情報を見つけることができます。 Unfortuately現在データ依存の読み込みを行う方法はありません

+0

毎日のジョブでテーブル全体を一度ロードし、次にuserIdに従ってGroupCellをグループ化することをお勧めしますか?また、私はシステム・ラグを考えますか? – ghost

+0

発生原因:java.io.IOException:クエリの実行を乾かしようとするとエラーが発生するSELECT DATETIME_TRUNC(DATETIME(REQUEST_TIME)、HOUR)AS TIME、CAST(COUNT(REQUEST_TIME)AS INT64)が要求されると、ROUND(SUM(CAST(EGRESS_IN_GB AS TIMESTAMP( '2017-01-31T00:00')とTIMESTAMP( '2017-01-31T23:59:59')の間でREQUEST_TIMEが発生する場所GROUP BY TIME、PROJECT_KEY ORDER時々刻々ASC.a これは静的クエリを実行しようとしたときに発生するエラーです – ghost

+0

このエラーは引き続き表示されますか? BigQueryのウェブページでクエリを実行できましたか?あなたは失敗した仕事の仕事のIDを持っていますか? BigQueryIO.Read.fromQueryを使用していますか? https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Read.html#fromQuery-java.lang.String- –

関連する問題