Scala Sparkフィルタにはテスターを使用し、テスターはjavaのPredicateインターフェースを実装し、引数で特定のクラス名を受け取る必要があります。 私はこのSpark Scala Serializableオブジェクトを動的に作成する
val tester = Class.forName(qualifiedName).newInstance().asInstanceOf[Predicate[T]]
var filtered = rdd.filter(elem => tester.test(elem))
のようなものをやっている問題は、私の特定の述語クラスがシリアライズではないので、実行時に、私はスパーク「TaskNotSerializable例外」を持っているということです。
私は
val tester = Class.forName(qualifiedName).newInstance()
.asInstanceOf[Predicate[T] with Serializable]
var filtered = rdd.filter(elem => tester.test(elem))
をすれば、私は同じエラーを取得します。 私はrdd.filterにテスターを作成する場合はそれが動作を呼び出す:
var filtered = rdd.filter { elem =>
val tester = Class.forName(qualifiedName).newInstance()
.asInstanceOf[Predicate[T] with Serializable]
tester.test(elem)
}
しかし、私はテストのために(多分放送するために)単一のオブジェクトを作成します。どうすれば解決できますか?
ああ、ありがとうございます!しかし、私は2つのエラーを受け取ります: 'デフォルトの引数を含むアプリケーションでTエラーが発生しました。 ' ' mapPartitionsのメソッドが十分ではありません:(暗黙的な証拠$ 6:scala.reflect.ClassTag [T])org。 apache.spark.rdd.RDD [T]。不特定の値パラメータの証拠$ 6。 preservePartitions booleanを(falseとして)追加しましたが、何が尋ねられているのかわかりません(証拠がAPIで文書化されていません)。 – Andrean
いくつかのスパーク操作では暗黙の 'ClassTag'パラメータが必要です。 'ClassTag'の説明については、http://docs.scala-lang.org/overviews/reflection/typetags-manifests.htmlを参照するか、詳細な情報を検索してください。ここで、 'T'があなたのメソッド/クラスの型パラメータであると仮定すると、それに':ClassTag'を追加するだけでよい(つまり、このコードが 'def foo [T](...)'メソッドであれば、それは 'def foo [T:ClassTag]'である)。同様にコンパイルエラーを発生させる呼び出し元をコンパイルして更新します。 –
mmhどのような関数foo [T] ..する必要がありますか?私はフィルタリングするjava.util.function.Predicateからtest(T)メソッドを使用して、フィルタリングされた要素もJavaクラスですので、どこでclassTagマーカを配置するのか分かりません – Andrean