インキュベータビームリポジトリのword_counting.pyの例(Dataflowドキュメントからリンクされています)を見ていましたが、これを修正して最も多く出現したのはです。ここに私のパイプラインは次のとおりです。Dataflow:Python SDKでトップモジュールを使用する:単一要素のPCollection
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line
output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
私はTop.Of()メソッドを使用して行を追加したが、(私が注文したPCollectionを待っているが、見ていた、それは単一の要素として配列でPCollectionを返すようですPCコレクションが順序付けられていないコレクションであると思われる文書
パイプラインが実行されると、ビーム全体が1つの要素(配列全体)と 'format'でビームループされます。ラムダ関数はエラーを発生させます。配列全体をタプルにマップする(単語、c)
このシングルをどのように扱うべきか-elementこのステップでパイプラインを中断することなくPCollection?