2017-10-30 3 views
0

s3に格納された一連のテキストファイルに対して、ファイルごとに1行のスパークRDDを作成しようとしています。テキストファイルの内容を取得するためにAmazonS3のインスタンスを作成してこれを行います。それはスパークのマップ機能で使用できるように、私は、このS3クライアントの周りにシリアライズ可能なラッパーを作成しました:すべてのマップ呼び出しでオブジェクトを逆シリアル化する

class SerializableAmazonS3 implements java.io.Serializable { 

    public transient AmazonS3 client; 
    public AmazonS3 create() 
    { 
     AmazonS3ClientBuilder builder = AmazonS3Client.builder().withRegion(REGION).withCredentials(new ProfileCredentialsProvider()); 
     return builder.build(); 
    } 

    private void readObject(ObjectInputStream ois) { 
     this.create(); 
    } 
} 

私がいる問題は、火花が機能(行ごとに毎回readObjectを呼び出すことです

答えて

0

map()の変換はテキストファイル内の行ごとに実行されますが、あなたのソースではmapPartition()を使用していますが、この変換はパーティションごとに1回呼び出されます。また、特定のケースでは、coalesce(num_of_part ition)と指定するだけで多くのパーティションが作成され、このステップの後でmapPartition()を呼び出すことができます。

はそれが役に立てば幸い:

mapPartitionは()の仕組み

は、以下を参照してください:

http://www.mishudi.com/general-spark-transformation/#mapPartitions

0

はMapReduceのとは異なり、スパーク地図セットアップのクリーンアップとマップのフェーズを提供しません。

カスタムロジックを適用してセットアップとクリーンアップを実行する必要があります。

少数の論理が(ちょうど削減地図のセットアップ、マップとクリーンアップを模倣しようとして)されている

  1. は、tryキャッチループを持って、最後のブロックでcleaup呼び出します。シャットダウンフックも同じ
  2. ブールのためのトライキャッチチェックの最初の行は、変数(最初はfalseに設定)を初期化トライキャッチでマップの呼び出しを持ってacheieveするための代替であると設定メソッドを呼び出し、 を設定し、この変数をtrueに設定します。

マップタスクのSpark writing to Aerospikeの例をご覧ください。

内部マップタスクでは、sparkはエアロシーククライアントを初期化し、入力行ごとに何らかのアクションを行います。マップタスクが完了したときに接続を閉じます。

Sparkのすべてのタスクはスレッドでありプロセスではないので、静的変数を賢明に使用するように注意してください。

0

Hadoop/EMR S3ファイルシステムクライアントを使用する場合は、FSインスタンス&のキャッシュ機能を利用できるため、(URI、ユーザー)ごとに1つのFSインスタンス(およびAWS s3クライアント)ペア。 AWSライブラリ&は、低コストのオブジェクトインスタンスではなく、正しく動作するスレッドプールを必要とするため重要です。

+0

S3ファイルシステムクライアントの使用に関するいくつかのパフォーマンス上の問題があると聞いていました。特に小さなファイルをたくさんダウンロードすることで。 [例についてはこちらを参照](http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219) –

+0

私はそれを打つつもりです。 –

+0

小さなファイルの場合、GETを介してファイルを開くために費やされた時間>読み込み時間。 0.4c/GETのようなものも請求されますので、何千もの小さなファイルは必要ありません。パーティション化可能なファイル形式にマージすることをお勧めします。 Hadoop harは処理可能な.tarファイルのようなものですが、Spark&S3でやったことはありません –

関連する問題