2016-11-01 2 views
1

インキュベータビームリポジトリのword_counting.pyの例(Dataflowドキュメントからリンクされています)を見ていましたが、これを修正して最も多く出現したのはです。ここに私のパイプラインは次のとおりです。Dataflow:Python SDKでトップモジュールを使用する:単一要素のPCollection

counts = (lines 
     | 'split' >> (beam.ParDo(WordExtractingDoFn()) 
         .with_output_types(unicode)) 
     | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) 
     | 'group' >> beam.GroupByKey() 
     | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))) 
     | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line 

    output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) 
    output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) 

私はTop.Of()メソッドを使用して行を追加したが、(私が注文したPCollectionを待っているが、見ていた、それは単一の要素として配列でPCollectionを返すようですPCコレクションが順序付けられていないコレクションであると思われる文書

パイプラインが実行されると、ビーム全体が1つの要素(配列全体)と 'format'でビームループされます。ラムダ関数はエラーを発生させます。配列全体をタプルにマップする(単語、c)

このシングルをどのように扱うべきか-elementこのステップでパイプラインを中断することなくPCollection?

答えて

1

あなたはこれらのイテレート可能オブジェクトの要素のPCollectionにイテラブルのPCollectionを展開したい場合は、引数結果の反復可能に構成要素から機能ですFlatMapを、使用することができますあなたの場合には、要素が自身イテレート可能オブジェクトですしたがって、恒等関数を使用します。

counts = ... 
     | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) 
     | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic! 

    output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) 
    ... 
関連する問題