2016-10-18 18 views
5

をarray-(データフレーム)の配列を爆発:私はこのようなデータフレーム持ってpySpark

+-----+--------------------+ 
|index|    merged| 
+-----+--------------------+ 
| 0|[[2.5, 2.4], [3.5...| 
| 1|[[-1.0, -1.0], [-...| 
| 2|[[-1.0, -1.0], [-...| 
| 3|[[0.0, 0.0], [0.5...| 
| 4|[[0.5, 0.5], [1.0...| 
| 5|[[0.5, 0.5], [1.0...| 
| 6|[[-1.0, -1.0], [0...| 
| 7|[[0.0, 0.0], [0.5...| 
| 8|[[0.5, 0.5], [1.0...| 
+-----+--------------------+ 

を私は

+-----+-------+-------+ 
|index|Column1|Column2| 
+-----+-------+-------+ 
| 0| 2.5| 2.4 | 
| 1| 3.5| 0.5| 
| 2| -1.0| -1.0| 
| 3| -1.0| -1.0| 
| 4| 0.0 | 0.0 | 
| 5| 0.5| 0.74| 
+-----+-------+-------+ 

各タプルにマージされた列を爆発したい[[2.5、2.4] 、[3.5,0,5]]は2列を返します.2,5と3,5が列1に格納され、(2.4,0,5)が2列目に格納されることがわかります

df= df.withColumn("merged", df["merged"].cast("array<array<float>>")) 
df= df.withColumn("merged",explode('merged')) 

、私は別のDF

を作成するためのUDFを適用しますが、私は、データをキャストまたは爆発適用することはできません、と私は

pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true) 

エラーを受け取った私はまた

df= df.withColumn("merged", df["merged"].cast("array<string>")) 
を試してみました

でも何も動作しません キャストなしで爆発を適用すると、

+0

あなたはDFのスキーマを与えることができますか?マージされたように見えるのは、実際には文字列であり、引数ではありません。 'split'を使って文字列を区切り文字で分割することができます。また、あなたの質問にはタイプミスがあるようです:予期した結果の例では、分解された値のインデックスが同じではありませんか?それともあなたが本当に欲しいものを与えたのですか? – Wilmerton

+0

Thx、私は自分のコードを読んで、私は私のラムダ関数(私の列をマージする人)で戻り値の型ArrayType(ArrayType(FloatType())を追加するのを忘れていたことを発見しました – MrGildarts

+0

...問題は解決しましたか? – Wilmerton

答えて

0

あなたは、次のコードを試みることができる

pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType; 

from pyspark import SparkConf, SparkContext       
from pyspark.sql import SparkSession        

from pyspark.sql.types import FloatType, StringType, IntegerType 
from pyspark.sql.functions import udf, col       


def col1_calc(merged):            
    return merged[0][0]            

def col2_calc(merged):            
    return merged[0][1]            

if __name__ == '__main__':           
    spark = SparkSession \           
     .builder \             
     .appName("Python Spark SQL Hive integration example") \  
     .getOrCreate()            

    df = spark.createDataFrame([         
     (0, [[2.5,2.4],[3.5]]),          
     (1, [[-1.0,-1.0],[3.5]]),         
     (2, [[-1.0,-1.0],[3.5]]),         
    ], ["index", "merged"])           

    df.show()              

    column1_calc = udf(col1_calc, FloatType())      
    df = df.withColumn('Column1', column1_calc(df['merged']))  
    column2_calc = udf(col2_calc, FloatType())      
    df = df.withColumn('Column2', column2_calc(df['merged']))  

    df = df.select(['Column1', 'Column2', 'index'])     
    df.show()   

出力:

+-------+-------+-----+ 
|Column1|Column2|index| 
+-------+-------+-----+ 
| 2.5| 2.4| 0| 
| -1.0| -1.0| 1| 
| -1.0| -1.0| 2| 
+-------+-------+-----+ 
関連する問題