2017-10-10 32 views
0

beam.Filterの一部として使用される単語のリストを含むGCSに格納された.txtファイルがある場合、このリストはApacheのビームパイプライン内で動的にアクセスできますか?私はこのリストをパイプライン内のグローバル変数として定義できることを知っていますが、ファイル全体をリストに読み込む方法と、これを達成するためのビームトリックがあるかどうかはわかりません。助言がありますか?ここで私は次のエラーを取得動作していない私の現在の実装..Google Cloud Dataflowアクセスクラウドストレージの.txtファイル

def boolean_terms(word, term_list): 
    if word in term_list: 
    return (word, 1) 
    else: 
    return (word, 0) 

# side table 
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file) 

words = ... 

filtered_words = words | beam.FlatMap(lambda x: 
    [boolean_terms(word, filter_terms) for word in x]) 

ある「例外TypeError:型の引数を 『_InvalidUnpickledPCollection』反復可能ではありません」

答えて

3

あなたはside inputとして単語のリストにアクセスすることができます。 beam.Filterトランスフォームは、そのリンクの例では、FlatMapParDoというまったく同じ方法で、フィルタ関数からのサイド入力の使用をサポートしていると思います。

のような何か:

words | beam.Filter(lambda x, filter_terms: word in filter_terms, 
        filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path))) 
+0

ありがとう!私は私が近くにいると思うが、それはまだ私のために働いているようには見えない。何か不足していますか? – reese0106

+0

ああ、私はそれを理解したと思う - これを正しく動作させるために 'pvalue.AsList(filter_terms)'を追加する必要があった – reese0106

関連する問題