0

私が扱っているデータには、スキーマの変更がありました。古いデータと新しいデータを組み合わせた結果のdataFrameについては、以前のデータに存在しないように、&のフィルタを変換したい列を変換したいと思います。それは "ヌル"によって占められていません。可能であれば、いつでもカラムが終了するたびに&フィルタを変換したいのですが、&フィルタを変換したいと思います。そのような列のない以前のデータの場合は、すべての行を保持します。spark:スキーマの変更 - データフレームが終了した場合、そのデータフレームを変換してフィルタリングします。そうでない場合do not do

以前のデータに「ip」列がないため、次のコードの結果はjava.lang.NullPointerExceptionになります。

val filteredData = sqlContext.sql(
s"SELECT $fieldsString FROM data $filterTerm") 
.withColumn("ip",firstIp($"ip")) 
.filter("`ip` not in ('30.90.30.90', '70.80.70.80')") 
.filter("`ip` not like '10.%'") 

上記の「firstIp」関数は、配列から最初のIPアドレスを取得する単純なudfです。それはval firstIp = udf[String, String](_.split(",")(0))によって定義されます。私はスキーマでデータを2つの部分に分割したくないです - "ip"列のあるものとないもの...しかし、データを分割することなく達成できるのですか?

答えて

0

この回答は、異なるスキーマでfilterを使用することを求めていた以前のバージョンの質問に対応しています。質問が完全にに変更されたので、実際には意味をなさない。

列が存在する場合、あなたは、単にチェックすることができ:bluenote10 @

// let's create some test data 
case class SchemaA(host: String) 
case class SchemaB(host: String, ip: String) 

val testDataA = sc.parallelize(Seq(
    SchemaA("localhost"), 
    SchemaA("other") 
)).toDF() 
val testDataB = sc.parallelize(Seq(
    SchemaB("localhost", "127.0.0.1"), 
    SchemaB("other", "192.168.0.1") 
)).toDF() 

def doSomething(df: DataFrame) { 
    val filtered = if (df.columns.contains("ip")) { 
    df.filter("ip in ('127.0.0.1')") 
    } else { 
    df 
    } 
    // do whatever you want after filtering... 
    filtered.select($"host").show() 
} 

doSomething(testDataA) 
doSomething(testDataB) 
+0

感謝を!私は私の質問を編集しました。十分にはっきりしていなかった。おそらく誤解を招いている可能性があります。 – MichM

+0

要点は、古いログと新しいログを1つのデータフレームにまとめたものです。ログのスキーマは異なります。したがって、古いログにはないカラムには、 "null"が設定されます。おそらくwithColumn(udf)がjava.lang.NullPointerExceptionを引き起こしていると思います。またはおそらくフィルタリングはです。 – MichM

+0

@MichM:それで、質問するときに最小限の例を用意することは良い考えです。あなたの質問は、実際には欠けている値を扱う問題であり、異なるスキーマを扱う方法ではありません。 – bluenote10

関連する問題