2016-11-16 14 views
0

私は2つのデータフレーム(tx_dfとlogin_df)を持っています。 最初の列にはplayer_id、tx_id、およびtx_timeの列があり、2番目の列にはplayer_idとlogin_timeがあります。PySparkが最も近い時間値で2つのデータフレームを結合する

私がしたいのは、player_idカラムを使用して2つのデータフレームを結合することですが、それに加えて、login_dfからの最新のログインローだけを結合することです。 このようなtx_dfがある場合たとえば、このような

pid_1, txid_1, '2016-11-16 00:01:00' 
pid_1, txid_2, '2016-11-16 00:01:02' 
pid_1, txid_3, '2016-11-16 00:02:15' 
pid_1, txid_4, '2016-11-16 00:02:16' 
pid_1, txid_5, '2016-11-16 00:02:17' 

とlogin_df:

pid_1, '2016-11-16 00:02:10' 
pid_1, '2016-11-16 00:00:55' 
pid_1, '2016-11-13 00:03:00' 
pid_1, '2016-11-10 16:30:00' 

私はこのように見えるために、データフレームを結果たい:

pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10' 

が、私はないですデータフレームへの義務的なバインディングなので、RDDや他の方法を使ってうまくいく方法を理解することができます。

tx_dfにはすべてのプレーヤーID(さらには数千のプレーヤーID)のトランザクションエントリが数千件あり、login_dfには未知数のプレイヤーログイン情報が含まれている可能性があるため、データが爆発することがあります。 player_idでこれら2つを結合するだけで、受け入れられないデカルト積の結果として膨大なデータフレームが作成されます。

注:私はPython API for Sparkを使用しています。

答えて

0

今後の参考として、これを少しずつ異なる方法で解決することができました。 私は、第2のデータフレームがそれを放送するのに十分小さいほど十分に幸運でした。より正確には、私は値のハッシュマップを放送しましたが、それはちょうどそれが目的のためにうまく合っていることが分かったからです。 (参照:broadcast variables in Spark

をその後、私はこの

tx_df.rdd.map(my_map_function) 

ような最初のデータフレームの行を反復処理し、ソートやその他の操作、そして最後にI値た選ん必要なかっmy_map_functionでIは、放送hasmapアクセス最初のデータフレームの行に追加します。

値のハッシュマップをブロードキャストすることの良い副作用として、データフレームの結合を削除して処理を高速化できました。 これを行う前に、スクリプトは、

  • は、このブロードキャスト・ソリューションの後ビッグデータフレームの必要のない行
  • をフィルタリング大きなもの

  • にデータフレームを結合したデータフレームに

    • データをロードしていましたスクリプトは

      • データフレームにデータをロード
      • 適切な値が既にあるので、直接第1の値にアクセスし、

      フィルタリング第二のアプローチで必要とされない現在の行にそれらを追加し、最初のものだけを反復二番目

    • のの
    • 放送値スクリプトの実行が速くなるようにピックアップしました。

  • 関連する問題