sparkを使用してcassandraの複数のテーブルを操作/反復/スキャンする方法の問題があります。私たちのプロジェクトではspark & spark-cassandra-connectorを使用して複数のテーブルをスキャンし、異なるテーブル内の関連する値を一致させ、一致する場合はテーブル挿入などの追加アクションを実行します。ユースケースは、以下のようなものです:スパークRDDはシリアライズないなどの問題がspark-cassandra-connectorを使用して複数のcassandraテーブルをスキャンするためにスパークを使用する
ある
sc.cassandraTable(KEYSPACE, "table1").foreach( row => { val company_url = row.getString("company_url") sc.cassandraTable(keyspace, "table2").foreach( val url = row.getString("url") val value = row.getString("value") if (company_url == url) { sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value)) } ) })
、ネストされた検索がsc.cassandraTableはRDDを返す原因を失敗します。私が回避するために知っている唯一の方法は、sc.broadcast(sometable.collect())を使うことです。しかし、もしそれが大きければ、収集はすべての記憶を消費するでしょう。また、ユースケースでは、いくつかのテーブルがブロードキャストを使用する場合、メモリを使い果たします。
ブロードキャストの代わりに、RDD.persistがケースを処理できますか?私の場合は、sc.cassandraTableを使用してRDD内のすべてのテーブルを読み込み、ディスクに保存し直してから処理するためにデータを取得します。それがうまくいくならば、どのようにしてrddの読み取りがチャンクによって行われることを保証できますか?
スパークの他に、優雅にケースを扱うことができる他のツール(ハープなど)がありますか?
ほとんどの場合、文字列等価演算子ではなく、string.containsを使用して関連する列を比較する必要があるため、結合を行うことはできません。 – user8053367
Solrのようなセカンダリインデックスがない限り、デカルト結合が必要です。 – RussS
ありがとうございます。私がデカルト結合を行うと、その結果はメモリを使い果たしてしまうほど巨大になるでしょうか? そして、セカンダリインデックスを使って処理する方法は? – user8053367