PubSubトピックに固定数の文字列(テストに使用された800,000個の1KB)を配置し、次のApache Beam(2.1.0)ジョブをデータフロー、正確に一度のセマンティクスは期待どおりに保持されます。 (データフローのコンソールに示すように)、そして、再び出力ファイルをキックオフ同じジョブが実行された場合は、すべての要素の前に排水PubSubから読み込んでGoogle Cloud Storageに書き込むデータフロージョブ
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface PubSubToGsPipelineOptions extends PipelineOptions {
@Description("PubSub subscription")
String getInput();
void setInput(String input);
@Description("Google Cloud Storage output path")
String getOutput();
void setOutput(String output);
}
が読み込まれ
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGsSimpleJob {
public static void main(String[] args) {
PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PubSubToGsPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
p.run();
}
}
PipelineOptions
実装の下にPubSubトピックに公開された元のデータセットよりも少ないレコードしか持っていません。これは、排水とジョブの交換は、が少なくとも1回セマンティクスを持つべきであると述べているので、奇妙に思われるデータ損失を引き起こす可能性があることを示唆している。このパイプラインは、ジョブを排水して交換するときに、少なくとも一度セマンティクス(またはより正確で一度セマンティクス)を達成するように設計する必要がありますか?
これは、TextIO can/theのような場合をよりよく処理する必要があります(上記の推測が正しい場合)。出力パスを変更するとすべてのレコードが保存されるかどうか確認してください。これはDataflowの保証を確認するものです。 –
ログパターンに一致する行が見つかりませんでした。別の出力に書き込むように置換ジョブを設定すると、少なくとも一度セマンティクスが生成されます。セマンティクスがソースとしてPubSubを使用して維持されていないと、正確に何らかの重複レコードが存在しました。 カスタムソースが正確に1回の配信を保証し、ソース側のバッファリングを提供する場合、ドレインと置換は正確に1回のセマンティクスを提供できます.PubSubメッセージはACKで応答する前にバッファリングできるため、 PubSubは正確に一度のセマンティクスを提供できますか? – JonSim
回避策をご確認いただきありがとうございます。私はTextIOのバグを報告します。私はあなたが重複を見ていることに驚いています。この場合、PubSubは正確に一度のセマンティクスを提供する必要があります。あなたができる情報をいくつか提供することができれば非常に便利です(あなたができるjob_id ID)。私は確かに重複のソースにもっと掘り起こすことに興味があります。 –