2015-10-22 17 views
11

私はnullと空の文字列を同じ列に入れたSpark 1.5.0 DataFrameを持っています。私はすべての列のすべての空の文字列をnullNone、Python)に変換したいと思います。 DataFrameには何百もの列があるので、各列のハードコード化された操作を避けようとしています。空の文字列をDataFrameのNone/Null値に置き換えます

以下の試行を参照してください。エラーが発生します。

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

## Create a test DataFrame 
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')]) 
testDF.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## | | 2| 
## |null|null| 
## +----+----+ 

## Try to replace an empty string with None/null 
testDF.replace('', None).show() 
## ValueError: value should be a float, int, long, string, list, or tuple 

## A string value of null (obviously) doesn't work... 
testDF.replace('', 'null').na.drop(subset='col1').show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## +----+----+ 
+0

@palsch、いいえ、リストを返しません。 DataFrameを返します。 Sparkのドキュメントへのリンクで質問を更新しました。 – dnlbrky

+2

@palsch一般的なPythonの質問ではありません! Spark DataFramesは、大きなデータに対して重いデータ分析を可能にするために一般的に使用される分散データ構造です。だからあなたは解決策には合っていません。 – eliasah

+1

@eliasah真実Pythonic 'lambda x:' 'udf'でラップされたx else''はうまく動くでしょう:) – zero323

答えて

15

それはこのように簡単です:

from pyspark.sql.functions import col, when 

def blank_as_null(x): 
    return when(col(x) != "", col(x)).otherwise(None) 

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1")) 

dfWithEmptyReplaced.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## |null|null| 
## +----+----+ 

dfWithEmptyReplaced.na.drop().show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## +----+----+ 

あなたが複数の列を埋めるためにしたい場合は、たとえば減らすことができます。

to_convert = set([...]) # Some set of columns 

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF) 

や理解を使用する:

exprs = [ 
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns] 

testDF.select(*exprs) 

具体的には文字列フィールドのレートは、でthe answerを確認してください。

+0

Thanks @ zero323。多くの列を自動的かつ効率的に処理できるように答えを拡張できますか?おそらくすべての列名を列挙し、各列の答えと同じようなコードを生成し、コードを評価しますか? – dnlbrky

+0

できなかった理由はありません。 DataFramesは遅延評価され、残りは単なる標準のPythonです。編集にはいくつかのオプションがあります。 – zero323

+0

私はこの回答を受け入れますが、まずは@RobinLoxleyのビットを追加してください。あなたが気にしないなら、私はあなたの答えを編集することができます。 – dnlbrky

8

私のソリューションは、あなたが望む限り多くの分野に対処することができI'vは、これまで見て、すべてのソリューションよりもはるかに優れている、次のように少しの関数を参照してください。

// Replace empty Strings with null values 
    private def setEmptyToNull(df: DataFrame): DataFrame = { 
    val exprs = df.schema.map { f => 
     f.dataType match { 
     case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name) 
     case _ => col(f.name) 
     } 
    } 

    df.select(exprs: _*) 
    } 

あなたは簡単に書き換えることができます上記のPythonの関数

私は単にzero323さんとsoulmachineの答えの上に追加@liancheng

6

からこのトリックを学びました。すべてのStringTypeフィールドを変換します。

from pyspark.sql.types import StringType 
string_fields = [] 
for i, f in enumerate(test_df.schema.fields): 
    if isinstance(f.dataType, StringType): 
     string_fields.append(f.name) 
関連する問題