2016-03-09 10 views
16

を持っている場合どのように私は与えられた列は、例えば、JSONスキーマ私はスパークSQLでJSONファイルからデータフレームを作成すると、火花データフレームが列

ため .select

例を呼び出す前に存在している場合どのように私が言うことができ、検出しません

potential_columns = Seq("b", "c", "d") 
df = sqlContext.read.json(filename) 
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column")) 

が、私はhasColumnための良い機能を見つけることができません。

{ 
    "a": { 
    "b": 1, 
    "c": 2 
    } 
} 

は、これは私が何をしたいです。

scala> df.select("a.*").columns 
res17: Array[String] = Array(b, c) 

答えて

7

実際に、あなたもあなただけのデータフレームの上にそれを呼び出すことができ、列を使用するために選択呼び出す必要はありません:私が得ている最も近い列がこのややぎこちない配列であるかどうかをテストすることです自身

// define test data 
case class Test(a: Int, b: Int) 
val testList = List(Test(1,2), Test(3,4)) 
val testDF = sqlContext.createDataFrame(testList) 

// define the hasColumn function 
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) 

// then you can just use it on the DF with a given column name 
hasColumn(testDF, "a") // <-- true 
hasColumn(testDF, "c") // <-- false 

hasColumn方法が利用可能であるように、またあなたがCその後、あなたのデータフレームの上にポン引き私のライブラリパターンを使用して直接

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { 
    def hasColumn(colName: String) = df.columns.contains(colName) 
} 

を暗黙のクラスを定義することができます使用すること:

testDF.hasColumn("a") // <-- true 
testDF.hasColumn("c") // <-- false 
+4

ブール値を返していますネストされた列。 from json '{" a ":{" b ":1、" c ":0}}' – ben

2

このためのあなたの他のオプションは、df.columnsとあなたのpotential_columnsに(この場合はintersectで)いくつかの配列操作を行うことであろう。

// Loading some data (so you can just copy & paste right into spark-shell) 
case class Document(a: String, b: String, c: String) 
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF 

// The columns we want to extract 
val potential_columns = Seq("b", "c", "d") 

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show 

これは、上記の内部オブジェクトのシナリオではうまくいきません。そのためのスキーマを調べる必要があります。

私はあなたがより多くの仕事をしなければならないことは、一般的な作るために、あなたのpotential_columnsへの完全修飾列名

val potential_columns = Seq("a.b", "a.c", "a.d") 

// Our object model 
case class Document(a: String, b: String, c: String) 
case class Document2(a: Document, b: String, c: String) 

// And some data... 
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF 

// We go through each of the fields in the schema. 
// For StructTypes we return an array of parentName.fieldName 
// For everything else we return an array containing just the field name 
// We then flatten the complete list of field names 
// Then we intersect that with our potential_columns leaving us just a list of column we want 
// we turn the array of strings into column objects 
// Finally turn the result into a vararg (: _*) 
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show 

これが唯一のレベルの深なっ変更するつもりです。

38

Tryが存在すると仮定して失敗させてください。無地でシンプルかつサポートし、任意のネスト:

import scala.util.Try 
import org.apache.spark.sql.DataFrame 

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess 

val df = sqlContext.read.json(sc.parallelize(
    """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) 

hasColumn(df, "foobar") 
// Boolean = false 

hasColumn(df, "foo") 
// Boolean = true 

hasColumn(df, "foo.bar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobar") 
// Boolean = true 

hasColumn(df, "foo.bar.foobaz") 
// Boolean = false 

それとももっと簡単:

val columns = Seq(
    "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") 

columns.flatMap(c => Try(df(c)).toOption) 
// Seq[org.apache.spark.sql.Column] = List(
// foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13) 

Pythonの同等:それはTry内の式を評価するよう

from pyspark.sql.utils import AnalysisException 
from pyspark.sql import Row 


def has_column(df, col): 
    try: 
     df[col] 
     return True 
    except AnalysisException: 
     return False 

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() 

has_column(df, "foobar") 
## False 

has_column(df, "foo") 
## True 

has_column(df, "foo.bar") 
## True 

has_column(df, "foo.bar.foobar") 
## True 

has_column(df, "foo.bar.foobaz") 
## False 
+1

これは構造化フィールドでも機能します。 'contains'関数を使う解決法はありません! +1 –

+1

ありがとう、私はこの答えを受け入れるだろう! – sparker

1

Tryは最適ではありませんそれが決定を下す前に。大規模なデータセットの場合

Scalaで以下を使用します。

df.schema.fieldNames.contains("column_name") 
+0

ネストされたデータでは機能しません。 – user8371915

2

私が通常使用する別のオプションが

df.columns.contains("column-name-to-check") 

これは、これは動作しません

+2

はネストされた列では機能しません。 – Sindhu

関連する問題