2017-05-24 14 views
0

sparkを使用してcassandraの複数のテーブルを操作/反復/スキャンする方法の問題があります。私たちのプロジェクトではspark & spark-cassandra-connectorを使用して複数のテーブルをスキャンし、異なるテーブル内の関連する値を一致させ、一致する場合はテーブル挿入などの追加アクションを実行します。ユースケースは、以下のようなものです:スパークRDDはシリアライズないなどの問題がspark-cassandra-connectorを使用して複数のcassandraテーブルをスキャンするためにスパークを使用する

  1. ある

    sc.cassandraTable(KEYSPACE, "table1").foreach(
        row => { 
        val company_url = row.getString("company_url") 
    
        sc.cassandraTable(keyspace, "table2").foreach(
         val url = row.getString("url") 
         val value = row.getString("value") 
         if (company_url == url) { 
          sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value)) 
         } 
        ) 
    }) 
    

    、ネストされた検索がsc.cassandraTableはRDDを返す原因を失敗します。私が回避するために知っている唯一の方法は、sc.broadcast(sometable.collect())を使うことです。しかし、もしそれが大きければ、収集はすべての記憶を消費するでしょう。また、ユースケースでは、いくつかのテーブルがブロードキャストを使用する場合、メモリを使い果たします。

  2. ブロードキャストの代わりに、RDD.persistがケースを処理できますか?私の場合は、sc.cassandraTableを使用してRDD内のすべてのテーブルを読み込み、ディスクに保存し直してから処理するためにデータを取得します。それがうまくいくならば、どのようにしてrddの読み取りがチャンクによって行われることを保証できますか?

  3. スパークの他に、優雅にケースを扱うことができる他のツール(ハープなど)がありますか?

答えて

0

実際に一連のインナー結合を実行しようとしているようです。これは、あなたがカサンドラ表に直接クエリを実行するために一つのRDDの要素を使用することができます

joinWithCassandraTable方法

を参照してください。 Cassandraから読んでいるデータの割合によっては、これがあなたの最善の策かもしれません。端数が大きすぎる場合は、2つの表を別々に読んでから、RDD.joinメソッドを使用して行を整列させてください。

その他すべてが失敗した場合は、いつでも手動でCassandraConnectorオブジェクトを手動で使用して、Javaドライバに直接アクセスし、分散コンテキストからの生の要求を行うことができます。

+0

ほとんどの場合、文字列等価演算子ではなく、string.containsを使用して関連する列を比較する必要があるため、結合を行うことはできません。 – user8053367

+0

Solrのようなセカンダリインデックスがない限り、デカルト結合が必要です。 – RussS

+0

ありがとうございます。私がデカルト結合を行うと、その結果はメモリを使い果たしてしまうほど巨大になるでしょうか? そして、セカンダリインデックスを使って処理する方法は? – user8053367

関連する問題