2016-11-27 2 views
2

内カサンドラに保存する:クエリと私はスパーク/カサンドラ環境で次の操作を実行するための要件を持っているのforeachスパーク機能

  • は、私がやった別のテーブル2を(フィルタリングするための基準が含まれている表1に、最新の行を読みますそれはデータフレームを使用しています)、これまではうまくいきました。
  • table2の各フィルタ処理された行について、その行の状態を確認するためにtable3を照会する必要があります。
  • 前の手順で状態を取得した後、データを他の3つのテーブルに保存する必要があります。

問題は、私はので、私は、Javaスパークコンテキストを渡す必要がありますが、残念ながら、それは直列化可能ではありません(here関連の質問を参照)を意味し、カサンドラのテーブルを照会し、foreachの機能内の他のカサンドラのテーブルに保存する必要があるということです
シリアル化スタックorg.apache.spark.api.java.JavaSparkContext:
オブジェクト直列化可能ではない(クラス:org.apache.spark.apiを

java.io.NotSerializableException:私は有名な例外を取得しています.java.JavaSparkContext ....

implements ForeachFunction<Row>という新しいクラスを実装しましたが、Java Sparkのコンテキスト変数を作成しましたが、私はまだ同じ例外が発生しています。

foreach関数を静的にしなければならない人がいるかもしれませんが、cassandraテーブルを保存/照会するロジックを助けるためにオブジェクトを渡す必要があるため、これは不可能です。このシナリオ?

しかし、私はここで何が欠けているのか分かりません。

+0

論理の例を書けますか? 'foreach'を使うのではなく、おそらくそれをデータの変換として書く必要があります。 (結合されたデータフレームの結合、差分、結合など) – maasg

答えて

0

エグゼキュータでスパークコンテキストを使用することはできません。しかし、あなたの問題に少なくとも2つのソリューションがあります

  • は、データフレームを収集し、ドライバーに地元のforeachを実行します(ただし、これはカサンドラの実行への呼び出し一つずつようになりますと、おそらく非常に遅くなります)
  • すべての操作を組み合わせることは、ジョインを使用してテーブル1,2および3上の単一のデータフレームに変換します。次に、cassandra DFにこれらを組み込み、フィルタを実行してエグゼキュータ上のcassandraに保存します(これは最速かつ並列のソリューションですが、いくつかの特別なコーディングが必要です)。