2017-12-19 12 views
1

は、私が(重要なステップを強調するためにフォーマットされた)このようなスパークパイプラインを持っていると仮定します。火花のパイプラインを分割する?

val foos1 = spark_session.read(foo_file).flatMap(toFooRecord) 
    .map(someComplicatedProcessing) 
    .map(transform1) 
    .distinct().collect().toSet 

私は似たパイプラインを追加している:

val foos2 = spark_session.read(foo_file).flatMap(toFooRecord) 
    .map(someComplicatedProcessing) 
    .map(transform2) 
    .distinct().collect().toSet 

その後、私は両方の結果で何かをします。

私はsomeComplicatedProcessingを2回実行しないようにしたいと思います(ファイルを解析するのは2度良いとは限りません)。

.map(someComplicatedProcessing)ステップの後にストリームを取得し、2つの並列ストリームを作成する方法はありますか?

私は中間結果をディスクに保存できるので、より多くのI/Oを犠牲にしてCPU時間を節約することができます。より良い方法がありますか?どのような言葉でウェブ検索しますか?

答えて

2

最初のオプション - cache中間結果:

val cached = spark_session.read(foo_file).flatMap(toFooRecord) 
    .map(someComplicatedProcessing) 
    .cache 
val foos1 = cached.map(transform1) 
    .distinct().collect().toSet 
val foos2 = cached.map(transform2) 
    .distinct().collect().toSet 

番目のオプション - RDDを使用して、単一パスを作るには:transform1transform2は互換性のないリターンを持っている場合

val foos = spark_session.read(foo_file) 
    .flatMap(toFooRecord) 
    .map(someComplicatedProcessing) 
    .rdd 
    .flatMap(x => Seq(("t1", transform1(x)), ("t2", transform2(x)))) 
    .distinct 
    .collect 
    .groupBy(_._1) 
    .mapValues(_.map(_._2)) 

val foos1 = foos("t1") 
val foos2 = foos("t2") 

2番目のオプションは、論争、いくつかの種類が必要な場合がありますタイプ。

関連する問題