私はPubSubからのメッセージを聞いて、その後BigQueryからデータを読み込んでいるストリーミングジョブを持っています。データは、PubSUbから受信したデータを使用して照会されます。これは、クエリを動的に作成し、それをBigQueryIO.Read.fromQuery()関数に渡す必要があることを意味します。以下は、BigQueryからデータを読み込んでTableRowを返すコードですが、私のコードが読み込むデータを実行しているところでNullPointerExceptionを与えています。Google Cloud Dataflow BigQueryIO.Read nullポインタエラー
public class RequestDailyUsageTransform extends PTransform<PCollection<DailyUsageJob>, PCollection<TableRow>> {
private String mQuery;
private String mForDate;
private LocalDateTime billingDateTime;
@Override
public PCollection<TableRow> apply(PCollection<DailyUsageJob> input) {
TableReference tableReference = getRequestTableReference();
return input
.apply(ParDo.named("RequestUsageQuery")
.of(new RequestUsageQueryStringDoFn()))
.apply(BigQueryIO.Read.named("RequestUsageReader")
.fromQuery(mQuery)
.from(tableReference).withoutValidation())
.apply(ParDo.named("DailyRequestMapper").of(new DailyRequestMapperDoFn()))
.apply(ParDo.named("BillDailyRequestUsage")
.of(new DailyRequestsBillDoFn(mForDate, billingDateTime)));
}}
私もBigQueryIO.Read.fromQueryでDoFn()関数で生成された文字列を渡す方法を知りたいと思いました。
こんにちは、完全なstacktraceを提供することは可能でしょうか?また、どのバージョンのデータフローを使用していますか? DoFnで計算されたクエリを渡す場合。私はそれがデータフローモデルにうまく収まるとは思わない。あなたのユースケースをもう少し説明してください。なぜ実行時にクエリを構築したいのですか?要素/キーに基づいて特定の情報を検索しようとしていますか? BigQueryIOソースクエリは、すべてのキーでデータを検索し、そのデータがパイプラインを通過し、キーによって分割されます。各DoFnは、データの特定の断片/キーを操作します。 –
@AlexAmato毎日、特定の時間にバックエンドアプリケーションがストリーミングジョブに通知を送信して、各ユーザーに対して何らかのタスクを実行します。各メッセージには、タスクが実行されなければならない時間とユーザーIDがあります。 私はメッセージ内で言及された時間の間に、そのユーザーのデータのみをクエリするクエリを動的に作成したいと考えています。 – ghost
私はこの場合、最も良いことは、すべてのデータを照会する毎日のバッチ・ジョブを実行し、ユーザーIDによってキーが設定されることだと思います。これは、あなたが望むよりも多くのデータを取り込みますが、ユーザーごとの情報を見つけることができます。残念ながら、現在のところデータ依存の読み込みを実行する方法はありません –