2017-09-16 11 views
3

私たちは3人の顧客を持っているとし、それぞれに同じ作業を並行して実行したいと考えています。ドライバから複数のスパークジョブを同時に実行する

def doSparkJob(customerId: String) = { 
    spark 
    .read.json(s"$customerId/file.json") 
    .map(...) 
    .reduceByKey(...) 
    .write 
    .partitionBy("id") 
    .parquet("output/") 
} 

我々は同時に(スパークドライバからの)このようにそれを実行します。

val jobs: Future[(Unit, Unit, Unit)] = for { 
    f1 <- Future { doSparkJob("customer1") } 
    f2 <- Future { doSparkJob("customer1") } 
    f3 <- Future { doSparkJob("customer1") } 
} yield (f1, f2, f3) 

Await.ready(jobs, 5.hours) 

は、私は、これは悪いアプローチであることを正しく理解していますか?多くのスパーク・ジョブは、エグゼクティブからお互いの文脈を押し出し、ディスクに溢れるデータが多数現れるようになります。どのようにスパークは並列ジョブからタスクを実行する管理ですか? 1人のドライバから3つの同時ジョブがあり、コアが1つのエグゼキュータが3人しかいない場合のシャッフルの表示方法。

良いアプローチは次のようになります: 私たちはすべての顧客のgroupByKeyすべてのデータをすべて読んで、私たちがしたいことをします。

答えて

2

これは悪いアプローチだと正しく理解していますか?

必ずしもそうである必要はありません。多くはコンテキストに依存し、Sparkはこのようなシナリオに対処するためにAsyncRDDActionsという独自のセットを実装しています(ただし、Dataset相当)。

最も単純なシナリオでは、静的割り当てでは、リソースの不足のためにSparkがすべてのジョブを順番にスケジュールするだけです。他に構成されていない限り、これは説明された構成で最も可能性の高い結果です。 Sparkは、複数の同時ジョブ間で限られたリソースを共有するために、FAIRスケジューラを使用したアプリケーション内スケジューリングを使用できます。 Scheduling Within an Applicationを参照してください。

複数のジョブを同時に開始するには十分な量のリソースがあれば、個々のジョブ間、特にIOとメモリ集約ジョブの競合が起こる可能性があります。すべてのジョブが同じリソース(特にデータベース)を使用している場合、Sparkがスロットリングとその後の失敗またはタイムアウトを引き起こす可能性があります。複数のジョブを実行することの影響がそれほど深刻でない場合は、キャッシュの削除を増やすことができます。

利用可能なリソース(Sparkクラスタおよび外部サービス)、APIの選択(RDDはSQLよりも貪欲になる傾向があるため、低レベルの管理が必要)とオペレータの選択。ジョブが逐次的であっても、ドライバの使用率を向上させ、レイテンシを短縮するために、非同期を使用することができます。これは、Spark SQLと複雑な実行計画(Spark SQLの一般的なボトルネック)で特に便利です。このようにして、Sparkは新しい実行計画をクランチし、他のジョブは実行されます。

関連する問題