2016-08-25 19 views
0

SparkContextとHiveで大きなデータセットを読み込んでいるとします。このデータセットはSparkクラスタに分散されます。たとえば、何千もの変数の観測値(値+タイムスタンプ)。は、Dataframe.toPandasは常にドライバノードまたはワーカーノードにありますか?

これで、データを整理/分析するためにいくつかのmap/reduceメソッドまたは集計を使用しました。たとえば、変数名でグループ化します。

一度グループ化すると、各変数のすべての観測(値)をtimeseries Dataframeとして取得できます。あなたは今DataFrame.toPandas

def myFunction(data_frame): 
    data_frame.toPandas() 

df = sc.load.... 
df.groupBy('var_name').mapValues(_.toDF).map(myFunction) 
  1. を使用している場合、これは各 ワーカーノード上で(可変あたり)パンダDATAFRAMEに変換、または
  2. は、ドライバのノード上で常にパンダデータフレームであり、データがゆえですワーカーノードからドライバに転送されますか?

答えて

4

このコンテキストでは、Pandas DataFrameに特別なことはありません。

  • DataFramepyspark.sql.dataframe.DataFramethis collects data and creates local Python object on the drivertoPandas方法を使用して作成された場合。
  • 実行者プロセス(for example in mapPartitions)内にpandas.core.frame.DataFrameが作成された場合は、単にRDD[pandas.core.frame.DataFrame]となります。パンダのオブジェクトには区別がなく、tupleとしましょう。
  • 最後に、例の擬似コードは、executorスレッド内でという意味のSpark DataFrame(これはあなたが意味するものと仮定しています)を作成することはできません。
+1

したがって、マップ機能内でPandas Dataframe APIを使用すると、ワーカーノードでもより多くのコンボ敵のメソッドを使用できます。たとえば、マップステップでその情報だけを分析して結果を返すなどです。 – Matthias

+0

はい、それはSparkRの 'dapply'と同様に可能です。望ましいパフォーマンスを得ることは難しいかもしれませんが、リソースの割り当てと並列性のバランスをとる必要があります。 – zero323

+0

あなたは私のヒーローです。私はまだ初心者ですが、改善しています。多分あなたは[その1つで]私を助けることができます(http://stackoverflow.com/questions/39155954/how-to-do-a-nested-for-each-loop-with-pyspark)。 – Matthias

関連する問題