2017-11-17 43 views
0

PubSubトピックに固定数の文字列(テストに使用された800,000個の1KB)を配置し、次のApache Beam(2.1.0)ジョブをデータフロー、正確に一度のセマンティクスは期待どおりに保持されます。 (データフローのコンソールに示すように)、そして、再び出力ファイルをキックオフ同じジョブが実行された場合は、すべての要素の前に排水PubSubから読み込んでGoogle Cloud Storageに書き込むデータフロージョブ

import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptions; 

public interface PubSubToGsPipelineOptions extends PipelineOptions { 
    @Description("PubSub subscription") 
    String getInput(); 
    void setInput(String input); 

    @Description("Google Cloud Storage output path") 
    String getOutput(); 
    void setOutput(String output); 
} 

が読み込まれ

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.transforms.windowing.FixedWindows; 
import org.apache.beam.sdk.transforms.windowing.Window; 
import org.joda.time.Duration; 

public class PubSubToGsSimpleJob { 

    public static void main(String[] args) { 
     PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
       .as(PubSubToGsPipelineOptions.class); 
     Pipeline p = Pipeline.create(options); 

     p.apply(PubsubIO.readStrings().fromSubscription(options.getInput())) 
       .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))) 
       .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput())); 
     p.run(); 
    } 

} 

PipelineOptions実装の下にPubSubトピックに公開された元のデータセットよりも少ないレコードしか持っていません。これは、排水とジョブの交換は、が少なくとも1回セマンティクスを持つべきであると述べているので、奇妙に思われるデータ損失を引き起こす可能性があることを示唆している。このパイプラインは、ジョブを排水して交換するときに、少なくとも一度セマンティクス(またはより正確で一度セマンティクス)を達成するように設計する必要がありますか?

答えて

0

私は、ドレインと交換のジョブが残りのウィンドウで上書きされる前にウィンドウが部分的に書き込まれている可能性があります。このログ行の排水されたジョブと交換ジョブからワーカーログを確認することができますin WriteFiles。 Beam HEADを使用すると、最終的な宛先が上書きされたときにも記録されます。

概念的に排水されたジョブと置換ジョブは全く異なるパイプラインです。同じ出力場所を使用することは、他の無関係の2つのジョブに対して同じ出力場所を使用することと同じです。別の方法として、2番目のジョブに異なる出力パスを使用して、両方のディレクトリにすべてのレコードが存在することを確認することができます。

+0

これは、TextIO can/theのような場合をよりよく処理する必要があります(上記の推測が正しい場合)。出力パスを変更するとすべてのレコードが保存されるかどうか確認してください。これはDataflowの保証を確認するものです。 –

+0

ログパターンに一致する行が見つかりませんでした。別の出力に書き込むように置換ジョブを設定すると、少なくとも一度セマンティクスが生成されます。セマンティクスがソースとしてPubSubを使用して維持されていないと、正確に何らかの重複レコードが存在しました。 カスタムソースが正確に1回の配信を保証し、ソース側のバッファリングを提供する場合、ドレインと置換は正確に1回のセマンティクスを提供できます.PubSubメッセージはACKで応答する前にバッファリングできるため、 PubSubは正確に一度のセマンティクスを提供できますか? – JonSim

+0

回避策をご確認いただきありがとうございます。私はTextIOのバグを報告します。私はあなたが重複を見ていることに驚いています。この場合、PubSubは正確に一度のセマンティクスを提供する必要があります。あなたができる情報をいくつか提供することができれば非常に便利です(あなたができるjob_id ID)。私は確かに重複のソースにもっと掘り起こすことに興味があります。 –

関連する問題