0

私はreduceByKeyステップの数を最小限に抑えて、より少ないステージで実行するようにZipkin Dependencies Spark jobを最適化しようとしています。データは以下の表から読み込まれます:あり1人のSparkワーカーでCassandraパーティションのすべての行を読み取ることはできますか?

CREATE TABLE IF NOT EXISTS zipkin.traces (
    trace_id bigint, 
    ts  timestamp, 
    span_name text, 
    span  blob, 
    PRIMARY KEY (trace_id, ts, span_name) 
) 

は、単一のパーティションtrace_idは、完全なトレースが含まれており、数百行にどこにでも数から含まれています。しかし、そのパーティション全体がSparkジョブによって非常に単純なRDD[((String, String), Long)]に変換され、エントリ数が数十億から数百に減少します。

残念ながら、現在のコードは

sc.cassandraTable(keyspace, "traces") 

を経由して独立してすべての行を読み込み、RDD[((String, String), Long)]を思い付く2つのreduceByKeyの手順を使用してそれをやっています。 1つのSparkワーカープロセスでパーティション全体を読み取ってすべてをメモリ上で処理する方法があれば、速度が大幅に改善され、現在のものから出てくる巨大なデータセットを保存/ストリームする必要がなくなります最初の段階。

- 編集 -

明確にするために、ジョブがテーブルからすべてのデータ、パーティションの数十億を読み取る必要があります。

答えて

1

シャッフルを行うことなく、同じ火花ワーカー上のすべてのパーティションのデータを保持するための鍵は、変更のすべてがで行われますよりも、何のシャッフルがない場合spanByKey

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key

CREATE TABLE events (year int, month int, ts timestamp, data varchar, PRIMARY KEY (year,month,ts)); 

sc.cassandraTable("test", "events") 
    .spanBy(row => (row.getInt("year"), row.getInt("month"))) 

sc.cassandraTable("test", "events") 
    .keyBy(row => (row.getInt("year"), row.getInt("month"))) 
    .spanByKey 

を使用することですイテレータとして一緒にパイプライン化されています。

警告に注意することを確認します。

注:順次これだけの作品は、データを命じました。データはクラスタリングキーによってCassandraで注文された なので、実行可能なすべてのスパンは、 の自然なクラスタリングキーの順序に従わなければなりません。

+0

コメントを残したのは –

+0

でしたが、Sparkの仕事は*すべての*パーティションを読むと予想されています。このアプローチは機能しません。 –

+0

ああ、おそらくあなたはspanByKeyをしたいですか?基本的に、ReduceByKeyのシャッフルを避ける場合は、大丈夫です。 https://github.com/datastax/spark-cassandra-connector/blob/edba853b9630f60de2b3f1b0db2118792a5a5a89/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/PairRDDFunctions.scala – RussS

関連する問題