2017-09-03 16 views
0

参照値の範囲を持つ別の列と1つの列の値を比較したい。pyspark列の値と別の列の値の比較

私は、次のコードを使用して試してみました:私たちは、次のようなデータフレームを作成する場合

from pyspark.sql.functions import udf, size 
from pyspark.sql.types import * 
df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6,7])]).toDF(["value", "Reference_value"]) 
intersect = lambda type: (udf(
    lambda x, y: (
     list(set(x) & set(y)) if x is not None and y is not None else None), 
    ArrayType(type))) 
integer_intersect = intersect(IntegerType()) 

# df1.select(
#  integer_intersect("value", "Reference_value"), 
#  size(integer_intersect("value", "Reference_value"))).show() 
df1=df1.where(size(integer_intersect("value", "Reference_value")) > 0) 
df1.show() 

上記のコードは動作しますが:値とrefernce_value列がlong_type とarray_typeにはあるので、

しかし、私は読んでいた場合csvでデータフレームを作成すると、配列型に変換できません。ここでDF1は、私が「Reference_value」欄で「値」列を比較すると、1つのデータフレームは、値列がリファレンスのセット内にない場合は、行をフィルタリングすることである二つの新しいデータフレームを導出したいCSV

df1 is as follows df1= 

category value Reference value 

count   1  1 
n_timer  n20  n40,n20 
frames   54  56 
timer   n8  n3,n6,n7 
pdf   FALSE  TRUE 
zip   FALSE  FALSE 

から読み込まれます値。

出力DF2 =

category  value  Reference value 

    count   1  1 
    n_timer  n20  n40,n20 
    zip   FALSE  FALSE 

出力DF3 =

category value Reference value 

frames   54  56 
timer   n8  n3,n6,n7 
pdf   FALSE  TRUE 

array_containsような任意の簡単な方法があります。私はArray_containsを試してみましたが、動作していませんでした。

from pyspark.sql.functions import array_contains 
df.where(array_contains("Reference_value", df1["vale"])) 

答えて

-2
#One can copy paste the below code for direct input and outputs 

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql import Row 
from pyspark.sql.functions import udf, size 
from pyspark.sql.types import * 
from pyspark.sql.functions import split 
sc = SparkContext.getOrCreate() 
sqlContext = SQLContext.getOrCreate(sc) 
df1 = sc.parallelize([("count","1","1"), ("n_timer","n20","n40,n20"), ("frames","54","56"),("timer","n8","n3,n6,n7"),("pdf","FALSE","TRUE"),("zip","FALSE","FALSE")]).toDF(["category", "value","Reference_value"]) 
print(df1.show()) 
df1=df1.withColumn("Reference_value", split("Reference_value", ",\s*").cast("array<string>")) 
df1=df1.withColumn("value", split("value", ",\s*").cast("array<string>")) 
intersect = lambda type: (udf(
    lambda x, y: (
     list(set(x) & set(y)) if x is not None and y is not None else None), 
    ArrayType(type))) 
string_intersect = intersect(StringType()) 
df2=df1.where(size(string_intersect("value", "Reference_value")) > 0) 
df3=df1.where(size(string_intersect("value", "Reference_value")) <= 0) 
print(df2.show()) 
print(df3.show()) 

input df1= 
+--------+-----+---------------+ 
|category|value|Reference_value| 
+--------+-----+---------------+ 
| count| 1|    1| 
| n_timer| n20|  n40,n20| 
| frames| 54|    56| 
| timer| n8|  n3,n6,n7| 
|  pdf|FALSE|   TRUE| 
|  zip|FALSE|   FALSE| 
+--------+-----+---------------+ 

df2= 
+--------+-------+---------------+ 
|category| value|Reference_value| 
+--------+-------+---------------+ 
| count| [1]|   [1]| 
| n_timer| [n20]|  [n40, n20]| 
|  zip|[FALSE]|  [FALSE]| 
+--------+-------+---------------+ 

df3= 
+--------+-------+---------------+ 
|category| value|Reference_value| 
+--------+-------+---------------+ 
| frames| [54]|   [56]| 
| timer| [n8]| [n3, n6, n7]| 
|  pdf|[FALSE]|   [TRUE]| 
+--------+-------+---------------+ 
関連する問題