2017-06-19 13 views
0

WindowsWordCount exampleプログラムのソースをテキストファイルからクラウドPub/Subに変更しました。シェイクスピアファイルのデータをPub/Subに公開しましたが、これは正しくフェッチされましたが、.groupByKey以降の変換は動作していません。Scio:コレクションソースとしてPub/Subを使用しているときにgroupByKeyが機能しない

sc.pubsubSubscription[String](psSubscription) 
    .withFixedWindows(windowSize) // apply windowing logic 
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) 
    .countByValue 
    .withWindow[IntervalWindow] 
    .swap 
    .groupByKey 
    .map { 
    s => 
     println("\n\n\n\n\n\n\n This never prints \n\n\n\n\n") 
     println(s) 
    } 

答えて

2

テキストファイルからPubSubへの入力を「無制限」に変更します。キーでグループ化するには、集約トリガーを定義する必要があります。それ以外の場合、グルーパーは永遠に待機します。それは、ここではデータフローのマニュアルに記載されます: https://cloud.google.com/dataflow/model/group-by-key

注:非グローバルウィンドウイングまたは凝集トリガいずれかが無制限PCollectionにGroupByKeyを実行するために必要とされます。これは、限定されたGroupByKeyが特定のキーを持つすべてのデータを収集するのを待たなければならないためです。無制限のコレクションでは、データは無制限です。ウィンドウ処理および/またはトリガを使用すると、無制限のデータストリーム内の論理的な有限のデータバンドルでグループ化を操作できます。

非グローバルウィンドウ戦略、トリガー戦略、またはその両方を設定せずにGroupByKeyを無制限のPCollectionに適用すると、パイプラインの構築時にDataflowによってIllegalStateExceptionエラーが生成されます。

残念ながら、ApacheのビームのPythonのSDKにトリガを(まだ)をサポートしていないようですので、私は解決策がPythonでどうなるかわかりません。

上記フランツさんのコメントに関しては

1

https://beam.apache.org/documentation/programming-guide/#triggersを参照してください)(StackOverflowのが私を聞かせている場合、私は!、特に彼のコメントに返信します)私はドキュメントがトリガが実装されていないと言うことがわかり...しかし、彼ら現在のプロジェクトが積極的にそれらを使用している間、リアルタイムデータベース機能は利用できないとも言います。彼らはまったく新しいものです。

を参照してくださいここでは、トリガ機能:これは、「リリース・レディ」のコードではないようhttps://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py

用心、APIは未完成です。しかしそれは利用可能です。

+1

これを答えに含めることができるようです。それは、あなたがそれを1つにしたいと言っているから、コメントとして出てくるからです。将来的には、質問に対するコメントの回答、規則に対する回答のコメント、回答がある場合はそれを投稿、回答しない場合は回答として投稿しないでください。この投稿の文言は、削除の可能性のある候補になります。 – snb

関連する問題