2017-03-29 8 views
1

私は、テキストファイルとmysqlの両方のレコードを読み込み、それらを照合しようとする単純なパイプラインを持っています。つまり、DBに存在しないレコードを挿入し、DBのレコードをファイルで更新し、ファイルに存在しないDB内のレコードに追加します。SparkにBeam Tasksを均等に分配する方法は?

スパークで2Mレコードを実行したときに生じる問題は以下の通りです:

enter image description here

私の勘では、次のコードは、ここではその不均衡

 final TupleTag<FileRecord> fileTag = new TupleTag<>(); 
     final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>(); 
     PCollection<KV<Integer, CoGbkResult>> joinedRawCollection = 
       KeyedPCollectionTuple.of(fileTag, fileRecords) 
         .and(mysqlTag, mysqlRecords) 
         .apply(CoGroupByKey.create()); 

を生産していることであるスパークです実行者DAG可視化

enter image description here

最終的に、1人のワーカーはメモリ不足になります。私はSparkでネイティブに、作業者間で作業負荷を分散させるのに役立つPartitionersを指定できます。しかし、どのようにビームでそれを行うのですか?

EDIT:

私は、複数のPCollectionsにそれを分割し、後でそれらを平らになるようJDBCIoが正しく1つのクエリを配布することができなかったことを疑いました。私はMysqlからはるかに速く読んだが、最終的には同じ問題に遭遇した。 enter image description here

しかし、各ステージはまだそのアンバランスに苦しんでいる?: enter image description here

+0

実際には、この不均衡の原因は、多くのレコードを持つMySQLからの読んだステップだからです。 JDBCIOはおそらく1つのSELECTクエリを配布しないので、その競合を参照します。私はそれを分割しようとしましょう。 – nambrot

答えて

0

区別するために私自身の失敗の実現と自分の質問に答えるために:ここで

は、作業中している段階ですスパーク・ステージとタスクの間。タスクは実際に広がっていた、私は実際にドライバプログラムに十分なメモリを割り当てていない。

関連する問題