2017-09-25 4 views
0

私はApache Beamを数日間使っています。私は作業中のアプリケーションを迅速に反復し、私が構築しているパイプラインにエラーがないことを確認したかったのです。火花ではsc.paralleliseを使用することができ、何らかのアクションを適用するときには、検査できる値が得られます。Apache Beamパイプラインからの出力を収集してコンソールに表示する

は同様に私は、Apacheビームについて読んでいたとき、私は私が実際にコンソールに結果を印刷したかった私たちはPCollectionを作成し、次の構文

with beam.Pipeline() as pipeline: 
    lines = pipeline | beam.Create(["this is test", "this is another test"]) 
    word_count = (lines 
        | "Word" >> beam.ParDo(lambda line: line.split(" ")) 
        | "Pair of One" >> beam.Map(lambda w: (w, 1)) 
        | "Group" >> beam.GroupByKey() 
        | "Count" >> beam.Map(lambda (w, o): (w, sum(o)))) 
    result = pipeline.run() 

を使用してそれに取り組むことを見出しました。しかし、私はその周りのドキュメントを見つけることができませんでした。

結果を毎回ファイルに保存するのではなくコンソールに出力する方法はありますか?

答えて

0

私のアプリケーション用のテストケースを作成する方法を理解した後、結果をコンソールに出力する方法を理解しています。私は現在、すべてのものを単一のノードマシンに実行しており、Apache Beamによって提供される機能を理解しようとしているのではなく、業界のベストプラクティスを損なうことなくどのように採用することができますか。

ここに私の解決策があります。当社のパイプラインの非常に最後の段階で、私たちは、私が知っている

import apache_beam as beam 

# lets have a sample string 
data = ["this is sample data", "this is yet another sample data"] 

# create a pipeline 
pipeline = beam.Pipeline() 
counts = (pipeline | "create" >> beam.Create(data) 
    | "split" >> beam.ParDo(lambda row: row.split(" ")) 
    | "pair" >> beam.Map(lambda w: (w, 1)) 
    | "group" >> beam.CombinePerKey(sum)) 

# lets collect our result with a map transformation into output array 
output = [] 
def collect(row): 
    output.append(row) 
    return True 

counts | "print" >> beam.Map(collect) 

# Run the pipeline 
result = pipeline.run() 

# lets wait until result a available 
result.wait_until_finish() 

# print the output 
print output 
0

はそれがにISN値を参照するために、後に我々は変数を印刷することができ、コンソールに結果を印刷または変数に結果を蓄積するマップ機能を導入することができますあなたが求めたものだが、なぜそれをテキストファイルに保存しないのか?それはstdoutで印刷するよりも常に優れており、揮発性ではありません

関連する問題