2017-03-13 14 views
0

私はSparkのデータフレームに適用する必要のあるいくつかのフィルタを持っていますが、最初は実行時にどのフィルタを使用するかを知っています。現在、私は本当にそれが必要とされていない場合は、実行時に動的にFX _filter2を除外する方法を見つけることができません?私は、個々のfilter機能でそれらを追加していますが、filtesの一つはデータフレーム上の任意の数のフィルタ

myDataFrame 
    .filter(_filter1) 
    .filter(_filter2) 
    .filter(_filter3)... 

が定義されていない場合には失敗しましたか

var filter = _filter1 
if (_filter2 != null) 
    filter = filter.and(_filter2) 
... 

または私は見つかっていないスパークで、このための良好なパターンがあります:

は、私は1つの大きなフィルタを作成することによって、それを行うべきか?

答えて

1

一つの可能​​な解決策はlit(true)にすべてfiltersをデフォルトにある:

import org.apache.spark.sql.functions._ 

val df = Seq(1, 2, 3).toDF("x") 

val filter_1 = lit(true) 
val filter_2 = col("x") > 1 
val filter_3 = lit(true) 

val filtered = df.filter(filter_1).filter(filter_2).filter(filter_3) 

これはコードの中でnullを保持し、真の述語はプルーンになります実行計画からD:あなたは、もちろん、それはさらに簡単と述語のシーケンスにすることができ

filtered.explain 
== Physical Plan == 
*Project [value#1 AS x#3] 
+- *Filter (value#1 > 1) 
    +- LocalTableScan [value#1] 

import org.apache.spark.sql.Column 

val preds: Seq[Column] = Seq(lit(true), col("x") > 1, lit(true)) 
df.where(preds.foldLeft(lit(true))(_ and _)) 

と、右実装されている場合、完全にプレースホルダをスキップします。

0

なぜ:フィルタのリストを作成

また
var df = // load 
if (_filter2 != null) { 
    df = df.filter(_filter2) 
} 
etc 

、:

var df = // load 
val filters = Seq (filter1, filter2, filter3, ...) 
filters.filter(_ != null).foreach (x => df = df.filter(x)) 

//申し訳ありませんが、コード内のいくつか間違いがある場合、それはより多くのアイデアだ - 現在、私はできませんがまずテストコード

1

私はヌルフィルタを取り除くでしょう:

val filters:List[A => Boolean] = nullableFilters.filter(_!=null) 

そして、チェーンフィルタに関数を定義します。

def chainFilters[A](filters:List[A => Boolean])(v:A) = filters.forall(f => f(v)) 

今、あなたは単にあなたのDFにフィルタを適用することができます。

df.filter(chainFilters(nullableFilters.filter(_!=null)) 
関連する問題