2016-09-17 17 views
5

によってpysparkのデータフレームにアクセスする方法(df.limitによると同様に(1)。私は新しいデータフレームにデータフレームの最初の行を得ることができます)。pyspark、スパーク:最後の行を選択する方法も最後の行を取得する方法</p> <pre><code>name age city abc 20 A def 30 B </code></pre> <p>ようpysparkのSQLのデータフレームからインデックス

index.like行番号でデータフレーム行にアクセスするにはどうすればよいですか。 12または200である。パンダで

私は

df.tail(1) # for last row 
df.ix[rowno or index] # by index 
df.loc[] or by df.iloc[] 

私は、このような方法または代替の方法でpysparkのデータフレームにアクセスする方法を単に好奇心行うことができます。最後の行を取得する方法

おかげ

答えて

3

ロングとすべての列がoderableていることを前提として醜い方法:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id 
) 

last_row = (df 
    .withColumn("_id", monotonically_increasing_id()) 
    .select(max(struct("_id", *df.columns)) 
    .alias("tmp")).select(col("tmp.*")) 
    .drop("_id")) 

すべての列が順番することができない場合は、あなたが試すことができます:

with_id = df.withColumn("_id", monotonically_increasing_id()) 
i = with_id.select(max_("_id")).first()[0] 

with_id.where(col("_id") == i).drop("_id") 

注意を。 がpyspark.sql.functions/`o.a.s.sql.functionsにありますが、description of the corresponding expressionsを考えるとここでは良い選択ではありません。

は、どのように私はあなたがすることはできませんindex.like

により、データフレームの行にアクセスすることができます。スパークDataFrameで、インデックスでアクセスできます。 You can add indices using zipWithIndexとフィルタを後で実行します。これを覚えておいてください。O(N)操作。

+0

こんにちはを使用して作成することができます行をautoincrement IDカラムで追加するか、または小さいdfのために、私はtoPandas()。tail(1)を使用していました。とにかくお返事いただきありがとうございます。私が尋ねたデータフレームの索引アクセスは、時には列の値を(col値の等価条件によって)置き換えなければならない場合があり、そのために私はudfの助けを借りて行っていました。しかし、1つのインスタンス(特定のインデックス番号行)だけを置き換えたい場合は、それを行う方法がありませんでした。今私は "zipWithIndex"を使うことができます。ありがとう。 – Satya

0

は単調ないどのようmonotonically_increasing_id()作品です連続する整数、、ユニークな、増加含まインデックス列を取得するには、以下を使用します。インデックスは、DataFrameのcolNameと同じオーダーで昇順になります。

import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 

df = df\ 
.withColumn('int', F.lit(1))\ 
.withColumn('index', F.sum('int').over(window))\ 
.drop('int')\ 

尾を見て、次のコードを使用し、またはデータフレームの最後rownums

rownums = 10 
df.where(F.col('index')>df.count()-rownums).show() 

end_row DATAFRAMEへstart_rowから行を見て、次のコードを使用します。

start_row = 20 
end_row = start_row + 10 
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show() 

zipWithIndex()は、ユニークな単調に戻る増加んRDD方式で、連続した整数が、あなたが戻って、id列に改正され、元のデータフレームを取得することができる方法で実装するのはるかに遅いように見えます。

2

最後の行を取得する方法。

あなたが最後のレコードを取得する一つの簡単な方法は、SQL使用され、例えばあなたがデータフレームを注文するために使用できる列、「インデックス」をお持ちの場合: 1)を降順と 2で、あなたのテーブルを注文)このため

df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec) 
latest_rec.show() 

そして、どのように私はノーindex.like行によってデータフレームの行にアクセスすることができますから、第一の値を取ります。 12または200である。

同様の方法で、あなたは、任意の行のレコードを取得することができます

row_number = 12 
df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number)) 
latest_rec.show() 

あなたが持っていない場合は、「インデックス」欄には、あなたは現在、私は最後の取り扱い午前、

from pyspark.sql.functions import monotonically_increasing_id 

df = df.withColumn("index", monotonically_increasing_id()) 
+0

よく説明してくれてありがとうございました。今のところ新しいアプローチです – Satya

+0

'monotonically_increasing_id()' [documentation](http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id)*** "現在のインプリメンテーションでは、パーティションIDが上位31ビットに、各パーティション内のレコード番号が下位33ビットに格納されます。」***したがって、異なるパーティションにまたがって格納される大きなDataFramesについて考えると、これは機能しません。すべてが1つのパーティションに含まれていない限り、DataFrameの最後の行を参照することはできません。 – Clay

+0

@Clay最後の部分はより補足的でした。しかし、大規模なDataFrameが本当に巨大である、つまり 'monotonically_increasing_id()'の想定に適合しない場合、** "データフレームのパーティション数は10億未満で、各パーティションのレコード数は80億未満です**"代わりに 'sql' ROW_NUMBER()OVER(PARTITION BY xxx ORDER BY yyy) 'を使用してください。 –

関連する問題

 関連する問題