2016-05-23 10 views
0
Scalaのコードの私の作品がどのように見える

代わりにスパークScalaの

val orgIncInactive = orgIncLatest.filter("(LD_TMST != '' and LD_TMST is not null)").select("ORG_ID").rdd 

orgIncInactive.collect.foreach(p => DenormalizedTablesMethodsUtil.hbaseTablePurge(p(0).toString, tableName, connection)) 

に()を収集使用しての他のアプローチは、私が収集し使用を避けることができますどのような方法は何があります()ここでは? 私はさまざまな可能性を試しましたが、私はSerializableエラーで終わっています。

ありがとうございました。

答えて

0

何をしようとしているのか、最終的にはシリアル化エラーの原因に依存します。ある種のデータベース接続を無名関数に渡そうとしているようです。それは一般にいくつかの理由で失敗するだろう。たとえ接続オブジェクト自体を直列化可能にしたとしても、オブジェクトをサブクラス化してSerializableを実装することで、データベース接続はドライバとエグゼキュータの間で共有することはできません。

代わりに、それぞれのエグゼキュータで接続オブジェクトを作成し、ドライバで定義されている接続オブジェクトの代わりにローカル接続オブジェクトを使用する必要があります。これを達成するにはいくつかの方法があります。

mapPartitionsを使用するとロジックを実行する前にオブジェクトをローカルでインスタンス化できます。詳細はhereを参照してください。

もう一つの可能​​性は、初期化時に接続オブジェクトをnullまたはNoneに設定するシングルトンオブジェクトを作成することです。次に、接続が初期化されたかどうかを調べるgetConnectionのようなオブジェクト内のメソッドを定義します。そうでない場合は、接続を初期化します。それからどちらかの方法で有効な接続が返されます。

初期化をパーティションごとに1回強制するのではなく、エグゼキュータごとに1回だけに制限するため、2番目のアプローチは最初よりも多く使用します。

関連する問題