2016-09-22 7 views
1

私のプロジェクトでは、私のパイプラインで処理されるデータにいくつかのメタデータを追加しようとしています。メタデータは、src-folderの隣のresourcesというサブフォルダのDBFファイルにあります。Google DataflowでDataflowPipelineRunnerとして実行しているときにリソースファイルにアクセスする

src-folderにはメインクラスがあり、いくつかのパッケージ(IO、処理、集約、utils)があります。

パイプラインが定義されているメインクラスのメタデータでファイルを読み込んで処理します。次のように私は、ファイルにアクセスするために使用していたコードは次のとおりです。

File temp1 = new File("resources/xxx.dbf"); 

ファイルを使用して見つかった場合、私がチェック:

LOG.info(temp1.exists()) 

罰金実行されます。

PubSubIOを使用して読み込んだ文字列としてのメッセージがあります。このファイルの内容を使用して、キーと値を含むマップを埋めます。

Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations(); 

私はその後、私が作った「SensorValues」と呼ばれるカスタムクラスの静的変数を設定します。

SensorValue.setKeyToCoordinates(sensorToCoordinates); 

SensorValueクラスに文字列からの着信メッセージを解析するとき、私は(パルド機能を使用して作られましたPCollectionからPCollectionに向かう)マップは、SensorValueクラスのコンストラクタで使用されます。

このコードをDirectPipelineRunnerを使用して実行すると、完璧に動作します。しかし、DataflowPipelineRunnerを使用して、SensorValueコンストラクタでマップにアクセスしようとすると、NullPointerExceptionが発生します。

DataflowPipelineRunnerを使用しているときにsetterが動作しないのはなぜですか(私はそれがいくつかのワーカーに分散された実行と関係していると推測しています)、そして何らかの静的リソースファイルあなたのパイプラインを豊かにする?

答えて

1

ParDoの実行が複数のワーカーに分散されているため、問題は正しいと思います。彼らはローカルファイルを持っていないし、マップの内容を持っていない可能性があります。

いくつかのオプションがここにあります

  1. は、GCS内のファイルを入れて、パイプラインは、ファイル(TextIOを使用するか、またはそのような何か)の内容を読んでいると、あなたにside-inputとしてそれを使用します後の処理。

  2. パイプラインのリソースにファイルを含めて、それを必要とするDoFnstartBundleにロードします(将来、すべてのバンドルよりも頻度を低くする方法があります)。

  3. マップの内容をDoFnの引数にシリアル化するには、そのクラスのコンストラクタに渡される非静的フィールドとして渡すことができます。

オプション1オプション2は、ファイルを取得する可能性が少ないネットワークトラフィックがある一方で(それは検索をバラバラにそれを分割してやってサポートすることができますので)このファイルのサイズが増加するとして優れています。オプション3は、シリアル化されたDoFnのサイズを大幅に増やすため、ファイルが極端に小さい場合にのみ機能し、データフローサービスに送信するジョブが大きくなる可能性があります。

+0

私は最初の解決策を試してみました。 私は 'PCollectionView >'を使用してキーを右の座標(View.asMap())にマップしました。 –

関連する問題