2016-10-31 5 views
0

私は、CloudStream DataflowにPCollectionをそのまま書き込んでいます。彼らのインデックスによって最初のコレクションのアイテムを参照する別のコレクションを構築したいと思います。例えばDataFlowでのPCollectionのインデックス付け

PC1:

strings go here 
some other string here 
more strings 

PC2:

0,1 
1,1 
0,2 

私は全体のパイプラインを書き込み、別の起動することなく、PC1内のインデックスを取得する方法がわからないんだ、とさえ私にはわからないんだけど読み込まれている行/レコード番号のレコードを保持する方法。単純に静的変数を使用するのは安全ですか?私は、プラットフォームの一般的な並列性に基づいていないと仮定します。

+0

インデックス化されたコレクションで何を行う予定ですか?おそらく、0からNの範囲の密な数値IDではなく、一意のIDを生成するだけで達成できます。 – jkff

+0

(密度の高い数値IDを生成することも可能ですが、計算量が多く、必要でない可能性もあります)。 – jkff

+0

ファイルに書き込むつもりです。このデータを作成するシステムには2つのファイルが必要です.1つは各行にレコードを持つテキストファイルで、もう1つは他のファイルのレコードの行番号を表す整数のペアです。 –

答えて

1

PCollection年代は、本質的に順序付けられていないので、 『コレクション内のアイテムのインデックス』のようなものはありません - しかし、あなたは、要素自体に行番号を含めることができます。PC1は整数であるPCollection<KV<Integer, String>>もいます行番号 - 基本的に行番号と対になっているテキストファイルから行を読み込みます。

現在のところ、これを行う組み込みソースを提供していません - ファイル名を入力にしてIOChannelFactoryを使用して行を1行ずつ読み込んでemitする単純なDoFn<String, KV<Integer, String>>を入力することをお勧めします内容は行番号でPC1になります。

関連する問題