5

タスクマネージャ - > SQS - >スクレイパーワーカー(私のアプリ) - > AWS Firehose - > S3ファイル - > Spark - >(?)Redshiftのパイプラインを作成しました。SparkでインクリメンタルなS3ファイルを処理する方法

私が解決しようとしていますいくつかのものは/改善し、私が指導のために幸せになる:

  1. スクレーパーが潜在的に重複データを取得し、スパークでDUPのにつながる消防ホース、再びそれらをフラッシュすることができます。私の計算を開始する前にDistinct関数を使って火花の中でこれを解決するのでしょうか?
  2. 私はS3処理ファイルを削除しないので、データは大きくなり続けます。これは良い練習ですか? (入力データベースとしてs3を持っている)あるいは、スパークが終了した後に各ファイルを処理して削除しますか?現在、私はsc.textFile("s3n://...../*/*/*")を行っています。これは、すべてのバケットファイルを収集し、計算を実行します。
  3. 結果をRedshift(またはs3)に配置するにはどうすればよいですか?つまり、s3がますます大きくなると、赤方偏移はデータを重複してしまいます...私はいつも前にそれをフラッシュするでしょうか?どうやって?
+0

処理対象の要素用のバケットを持つことができます。プッシュされたバケットは別のバケットに移動し、必要に応じてコピーを保持しますが、2度目は再処理しません –

答えて

0

私はこれらの問題が単一のパイプラインではなく発生しています。ここに私がしたことがあります。

重複

削除

  1. 。ローカル複製を削除するにはBloomFilterを使用しました。ドキュメントは比較的不完全ですが、Bloomフィルタオブジェクトを簡単に保存/ロード/結合/交差できます。フィルター上でreduceを行うこともできます。

    b。 SparkからRedShiftに直接データを保存する場合、現在のバッチのBloomFilterを更新してブロードキャストし、フィルタリングして全体的に重複がないようにするために時間と労力を費やす必要があります。 RDSでUNIQUE制約を使用してエラーを無視する前に、残念ながらRedShift does not honour the constraint

  2. と3.データが大きく

を取得私は(小さなログファイルの多くは通常があるので、インパクトスパークのパフォーマンスを)&マージデータを移動するためにs3-dist-cp commandを実行するために、EMRクラスタを使用。 EMRを使用してSparkクラスタをホストする場合は、分析の前にステップを追加してデータをあるバケットから別のバケットに移動してください。ステップは、カスタムジャーとしてcommand-runner.jarを取り、コマンドが元distcpがマージファイルをサポートしていないことを

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess 

ノートのように見えます。

一般に、処理されたデータと処理されていないデータを同じバケット(または少なくともパス)で一緒に使用しないようにする必要があります。

関連する問題