2016-04-01 6 views
0

私は次の構造を持つカサンドラテーブルを持っている:スパークとカサンドラ、キークラスタリングに範囲クエリは

TABLEテーブルを作成します( キーはint、 タイムスタンプ、 対策フロート、 主キー(キー、時間) );

私は、タイムスタンプが戻ってカサンドラに何らかの処理、およびフラッシュ結果を行う指定された開始と終了の範囲内、前のテーブルからデータを読み込みますスパークジョブを作成する必要があります。

私のspark-cassandraコネクタは、クラスタリングのcassandraテーブルの列に対して範囲クエリを実行する必要があります。私がしなければ

は、パフォーマンスの違いがあります:

sc.cassandraTable(keyspace,table). 
as(caseClassObject). 
filter(a => a.time.before(startTime) && a.time.after(endTime)..... 

ので、私がやっていることはスパークにすべてのデータをロードし、フィルタリング

を適用したり、私はこれを行う場合はされています

sc.cassandraTable(keyspace, table). 
where(s"time>$startTime and time<$endTime)...... 

は、Cassandraのすべてのデータをフィルタリングし、小さなサブセットをSparkにロードします。

範囲クエリの選択性は約1%です。 クエリにパーティションキーを含めることはできません。

これらの2つのソリューションのどちらを優先しますか?

答えて

2
sc.cassandraTable(keyspace, table).where(s"time>$startTime and time<$endTime) 

もっと速くなります。基本的には、同じデータを得るために、最初のコマンドで完全グラブのパーセンテージ(全仕事の5%を5%だけ引く場合)を実行しています。あなたはカサンドラのすべてのデータを読み込み

  1. ある最初のケースで

  2. すべてのオブジェクトをシリアライズし、それをSparkに移動します。
  3. 最後にすべてをフィルタリングします。あなたが実際にはステップ3

はありませんこれだけの小さなサブセット

  • のシリアルC *
  • から必要なデータだけを読み込み

    1. ある第2のケースで

    追加のコメントとして、ケースクラスタイプをコールに直接入れることができます

    sc.cassandraTable[CaseClassObject](keyspace, table) 
    
  • +0

    私は1/20が誇張だと思います。主に時間がクラスタリングされているので、cassandraは常にテーブルスキャン全体を行います。しかし、実際には、キャッサンドラからスパークへのデータの移動に費やされる時間が節約されます。 –

    +0

    私は間違いなく読み取られたデータの量に比例削減することを期待します。 1レンジ・スキャンを行っていないでしょうしながら、レンジ・スキャンが読み取られるように、すべてのsstablesを必要としないので)読まれるために必要なsstablesの数が大幅に少なくなります。 2)C * JVMを通過する必要がオブジェクトの量がはるかに少ないことウィル、やはりこれが割り当てられ、デシリアライズされるかなりの量の少ないオブジェクトです。ガベージコレクションが少なくて済み、オブジェクトの割り当てが少なくて済みます。 3)のC *プロセスとスパークエグゼキュータ・プロセスの間のトラフィックの量が最小化されます。これはゼロコピーシステムではありません。 – RussS

    +0

    お返事ありがとうございます。私はこの振る舞いを期待しますが、必ずしたかった 。私の唯一の関心事は、cassandraは(私はパーティションキーの条件を指定していないため)完全なテーブルスキャンを行わなければならないということでしたが、とにかく、2番目の解決方法は道のりが良いです。 –