2017-10-29 17 views
0

ほとんどのスクリプトは次のような処理をしています。私の処理をwithingグループ化、一切の集約がないためスパークの使用s3から読み込み、s3から書き込むことはできますか?

spark.read().csv("s3://") 
    .filter(..).map(...) 
    .write().parquet("s3://"); 

火花を指定する方法は、私はメモリ内で行われたすべてのこの仕事をしたいということ、ありますか?これは、ディスクにまったく触れないレコードストリームプロセッサーによる単純な記録でなければなりません。

+0

あなたの質問を明確にすることはできますか? s3のファイルから読み込んで出力をs3に書き出すと、常にディスクの処理が行われます。あなたのフィルタとマップ関数はすでにメモリ上で実行されています(少なくとも、sparkがどのようにメモリを効率的に使用するかについては、少なくとも)。 – buubovich

+0

いいえこれは、ネットワーク処理からメモリ処理に直接移動し、ネットワークに戻る必要があります。ちょうどあなたがLinuxのパイプで行うことができるように。あなたがs3を読み書きするだけで、ディスクに触れなければならないわけではありません。 – ForeverConfused

+0

このシナリオでディスクに何かが格納されていると思いますか? –

答えて

1

EMRとそのs3コネクタは話せません。私はApache Hadoop自体とS3Aコネクタについて話すことができます

S3にアップロードする前に生成されたデータをバッファリングする必要があります。大きなファイルの場合は、アップロードを4GBのファイルに分割する必要があり、アップロードのサイズが小さい場合でも、生成するアプリケーションの一般的な条件に対処する必要があるため、stream()に続けてclose()を実行することはできませんデータをS3にアップロードするよりも高速に処理できます。

一時的なローカル一時記憶域を使用すると、S3アップロード帯域幅が処理できるデータよりも速くデータを生成でき、ブロックを再送信することでネットワークエラーに対処できます。

Apacheオリジナルのs3:およびs3n:クライアント(およびHadoop 2.8以前のs3a)はすべて、アップロードを開始する前にファイル全体をHDDに書き込みました。必要なストレージは、生成された#ofバイトと同じで、close()でのみアップロードされたので、そのクローズコールの時間はデータ/帯域幅です。

Hadoop 2.8以降のS3Aは、データが単一ブロック(5MB以上、デフォルト64MB)のサイズにバッファリングされる高速アップロード(オプション2.8+、自動3.0)をサポートします。サイズに達しました。これにより、より速い書き込みが可能になり、十分な帯域幅でclose()遅延(max:last-block-size/bandwidth)はほとんどありません。ヒープバイト配列またはオフヒープバイトバッファで使用するように構成することもできますが、生成速度とアップロード速度の不一致に対処するためにはまだストレージが必要です。これを行うと、メモリ割り当てとキューサイズについて非常に注意深く再生する必要があります。待機中のアップロードのキューが十分に大きいときにライターをブロックするようにクライアントを設定する必要があります。

更新 Johnathan Kelly @ AWSは、ブロックバッファごとに同じことを行い、ASF S3Aコネクタとしてアップロードすることを確認しました。これは、データの生成速度をバイト/秒で表した場合に、< = VMからアップロード帯域幅が必要な場合、必要なローカルディスクの量は最小です...データを高速に生成すると、さらに多くのデータが必要になりますまたはジェネレータスレッドをブロックするためにいくつかのキュー制限に達する)。私は、実際の帯域幅の数字をいつも見積もるつもりはないが、それはいつも前年比で改善しており、どんな声明もすぐに時代遅れになるだろう。その理由から、信じる前にベンチマークのポストの年齢を見てください。独自の作業負荷で独自のテストを行います。

+1

FWIW、EMRのS3コネクタ(別名EmrFS)もこれを行います。 (出力は、パーツサイズに達すると複数のパーツに分割され、ローカルストレージの一時ファイルに保存され、同時にS3にアップロードされます)。 –

関連する問題