2017-04-08 20 views
0

Scala Sparkフィルタにはテスターを使用し、テスターはjavaのPredicateインターフェースを実装し、引数で特定のクラス名を受け取る必要があります。 私はこのSpark Scala Serializableオブジェクトを動的に作成する

val tester = Class.forName(qualifiedName).newInstance().asInstanceOf[Predicate[T]] 
var filtered = rdd.filter(elem => tester.test(elem)) 

のようなものをやっている問題は、私の特定の述語クラスがシリアライズではないので、実行時に、私はスパーク「TaskNotSerializable例外」を持っているということです。

私は

val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T] with Serializable] 
var filtered = rdd.filter(elem => tester.test(elem)) 

をすれば、私は同じエラーを取得します。 私はrdd.filterにテスターを作成する場合はそれが動作を呼び出す:

var filtered = rdd.filter { elem => 
    val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T] with Serializable] 
    tester.test(elem) 
} 

しかし、私はテストのために(多分放送するために)単一のオブジェクトを作成します。どうすれば解決できますか?

答えて

0

クラスの実装には、Serializableが必要です。 asInstanceOf[Predicate[T] with Serializable]キャストは嘘です:実際に値がSerializableであることを確認していないので、キャスト中に2番目のケースでエラーが発生せず、最後のケースでは「成功」する理由があります。

しかし、テストのために1つのオブジェクト(ブロードキャストする可能性があります)を作成します。

できません。ブロードキャストであるかどうかにかかわらず、デシリアライゼーションは、という新しいワーカーノード上のオブジェクトを作成します。しかし、あなたは、各パーティションにのみ単一のインスタンスを作成することができます。

var filtered = rdd.mapPartitions { iter => 
    val tester = Class.forName(qualifiedName).newInstance() 
      .asInstanceOf[Predicate[T]] 
    iter.filter(tester.test) 
} 

をそれは実際にそれが厳密に小さい仕事だから、それを送信し、それを考えデシリアライズ、testerをシリアル化するよりもパフォーマンスが向上します。

+0

ああ、ありがとうございます!しかし、私は2つのエラーを受け取ります: 'デフォルトの引数を含むアプリケーションでTエラーが発生しました。 ' ' mapPartitionsのメソッドが十分ではありません:(暗黙的な証拠$ 6:scala.reflect.ClassTag [T])org。 apache.spark.rdd.RDD [T]。不特定の値パラメータの証拠$ 6。 preservePartitions booleanを(falseとして)追加しましたが、何が尋ねられているのかわかりません(証拠がAPIで文書化されていません)。 – Andrean

+0

いくつかのスパーク操作では暗黙の 'ClassTag'パラメータが必要です。 'ClassTag'の説明については、http://docs.scala-lang.org/overviews/reflection/typetags-manifests.htmlを参照するか、詳細な情報を検索してください。ここで、 'T'があなたのメソッド/クラスの型パラメータであると仮定すると、それに':ClassTag'を追加するだけでよい(つまり、このコードが 'def foo [T](...)'メソッドであれば、それは 'def foo [T:ClassTag]'である)。同様にコンパイルエラーを発生させる呼び出し元をコンパイルして更新します。 –

+0

mmhどのような関数foo [T] ..する必要がありますか?私はフィルタリングするjava.util.function.Predicateからtest(T)メソッドを使用して、フィルタリングされた要素もJavaクラスですので、どこでclassTagマーカを配置するのか分かりません – Andrean

関連する問題