2017-04-10 12 views
0

I'amの数に基づいて、私はこのようなテーブルを有する:カラムに基づいPyspark:フィルタリングDATAFRAME pysparkを使用して行ごとにnull値

id | ClientNum | Value |  Date  | Age | Country | Job 
1 |  19  | A | 1483695000 | 21 | null | null 
2 |  19  | A | 1483696500 | 21 | France | null 
3 |  19  | A | 1483697800 | 21 | France | Engineer 
4 |  19  | B | 1483699000 | 21 | null | null 
5 |  19  | B | 1483699500 | 21 | France | null 
6 |  19  | B | 1483699800 | 21 | France | Engineer 
7 |  24  | C | 1483699200 | null | null | null 
8 |  24  | D | 1483699560 | 28 | Spain | null 
9 |  24  | D | 1483699840 | 28 | Spain | Student 

、iは異なる各ClientNumために残しておきたいですほとんどの情報(Age、Country、Job)が指定されている値。

ClientNum | Value |  Date  | Age | Country | Job 
     19  | A | 1483697800 | 21 | France | Engineer 
     19  | B | 1483699800 | 21 | France | Engineer 
     24  | C | 1483699200 | null | null | null 
     24  | D | 1483699840 | 28 | Spain | Student 

ありがとう:

結果はこのようなものになるはずです!

+0

を[この回答を試してみてください](http://stackoverflow.com/questions/38649793/how-to-get-distinct-rows-in-dataframe-using -pyspark)と[これも参照してください](http://stackoverflow.com/questions/39287729/filterrows-by-distinct-values-in-one-column-in-pyspark) – ARr0w

+0

できませんdf.distinct()またはdf.drop_duplicates()を使用すると、すべての行が私のexemple上で区別されます。私は明確な値だけを保持したい。 – Omar14

+0

これはこれらの回答についてです。あなたが保持したい明確な価値を得るために。 – ARr0w

答えて

1

はここで行あたりの非NULL値の数を計算し、その後Window機能を使用してデータをフィルタリングするudfを使用してのアプローチです:

ましょう最初のudfを定義しますこれは引数としてカラムのarrayをとり、結果としてnullでない値の数を返します。

df = df.withColumn("counter", nullcounter_udf(array(df.columns))) 

は、今、私たちはClientNumValueしてデータを分割し、最高counter値で行を保つことができます:

from pyspark.sql.window import Window 
from pyspark.sql.functions import rank, col 

window = Window.partitionBy(df['ClientNum'], df['Value']).orderBy(df['counter'].desc()) 

df.select('*', rank().over(window).alias('rank')) \ 
    .filter(col('rank') == 1) \ 
    .sort('Value') \ 
    .show() 
+---+---------+-----+----------+----+-------+--------+-------+----+ 
| id|ClientNum|Value|  Date| Age|Country|  Job|counter|rank| 
+---+---------+-----+----------+----+-------+--------+-------+----+ 
| 3|  19| A|1483697800| 21| France|Engineer|  8| 1| 
| 6|  19| B|1483699800| 21| France|Engineer|  8| 1| 
| 7|  24| C|1483699200|null| null| null|  5| 1| 
| 9|  24| D|1483699840| 28| Spain| Student|  8| 1| 
+---+---------+-----+----------+----+-------+--------+-------+----+ 

from pyspark.sql.functions import array 

def nullcounter(arr): 

    res = [x for x in arr if x != None] 
    return(len(res)) 

nullcounter_udf = udf(nullcounter) 

のは、あなたのデータにこの列を追加してみましょう

データ

df = sc.parallelize([(1, 19, "A", 1483695000, 21, None, None), 
(2, 19, "A", 1483696500, 21, "France", None), 
(3, 19, "A", 1483697800, 21, "France", "Engineer"), 
(4, 19, "B", 1483699000, 21, None, None), 
(5, 19, "B", 1483699500, 21, "France", None), 
(6, 19, "B", 1483699800, 21, "France", "Engineer"), 
(7, 24, "C", 1483699200, None, None, None), 
(8, 24, "D", 1483699560, 28, "Spain", None), 
(9, 24, "D", 1483699840, 28, "Spain", "Student")]).toDF(["id","ClientNum","Value","Date","Age", "Country", "Job"]) 
+1

ありがとうが、配列のすべての列は、私が思うタイプを持っている必要があります。 データ型の不一致による:関数配列への入力はすべて同じ型でなければなりません。 – Omar14

+0

それはあなたの値を文字列に変換しますが、null以外の値の長さを計算するための中間ステップとしてのみ使用するため、ユースケースにとっては重要ではありません。どのようなスパークバージョンがありますか? – mtoto

+0

私は@ Omar14(pyspark 2.2.0)によって記述された同じ問題を持っています。 –

0

はこれを試してみてください:

val df = Your_data_frame.registerTempTable("allData") // register your dataframe as a temp table 

// we are finding max of date for each clientNum and value and join back to the original table. 

    sqlContext.sql("select a.ClientNum, a.Value, a.Date, a.Age, a.Country, a.Job from allData a 
    join 
    (select ClientNum, Value, max(Date) as max_date from allData group by ClientNum, Value) b 
    on a.ClientNum = b.ClientNum and a.Value = b.Value and a.Date = b.max_date").show 
0

私のように、あなたは他の回答とのトラブルがあったが、ここではUDF(火花2.2.0)を使用してPythonで私の解決策である場合:

はのはダミーのデータセットを作成してみましょう:

llist = [(1, 'alice', 'some_field', 'some_field', 'some_field', None), (30, 'bob', 'some_field', None, None, 10), (3, 'charles', 'some_field', None, 'some_other_field', 1111)] 
df = sqlContext.createDataFrame(llist, ['id', 'name','field1','field2', 'field3', 'field4']) 

df.show() 

+---+-------+----------+----------+----------------+------+ 
| id| name| field1| field2|   field3|field4| 
+---+-------+----------+----------+----------------+------+ 
| 1| alice|some_field|some_field|  some_field| null| 
| 30| bob|some_field|  null|   null| 10| 
| 3|charles|some_field|  null|some_other_field| 1111| 
+---+-------+----------+----------+----------------+------+ 

さんはNone値をカウントするため、当社のUDFを定義してみましょう:

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import struct, udf 

count_empty_columns = udf(
         lambda row: len([x for x in row if x is None]), 
         IntegerType() 
        ) 

私たちは、そのUDFに基づいて新しい列null_countを追加することができます。

df = df.withColumn('null_count', 
     count_empty_columns(struct([df[x] for x in df.columns]))) 

df.show() 

+---+-------+----------+----------+----------------+------+----------+ 
| id| name| field1| field2|   field3|field4|null_count| 
+---+-------+----------+----------+----------------+------+----------+ 
| 1| alice|some_field|some_field|  some_field| null|   1| 
| 30| bob|some_field|  null|   null| 10|   2| 
| 3|charles|some_field|  null|some_other_field| 1111|   1| 
+---+-------+----------+----------+----------------+------+----------+ 

そして最後にフィルタリング:

df = df.filter(df['null_count'] <= 1) 
関連する問題