私はreduceByKey
ステップの数を最小限に抑えて、より少ないステージで実行するようにZipkin Dependencies Spark jobを最適化しようとしています。データは以下の表から読み込まれます:あり1人のSparkワーカーでCassandraパーティションのすべての行を読み取ることはできますか?
CREATE TABLE IF NOT EXISTS zipkin.traces (
trace_id bigint,
ts timestamp,
span_name text,
span blob,
PRIMARY KEY (trace_id, ts, span_name)
)
は、単一のパーティションtrace_id
は、完全なトレースが含まれており、数百行にどこにでも数から含まれています。しかし、そのパーティション全体がSparkジョブによって非常に単純なRDD[((String, String), Long)]
に変換され、エントリ数が数十億から数百に減少します。
残念ながら、現在のコードは
sc.cassandraTable(keyspace, "traces")
を経由して独立してすべての行を読み込み、RDD[((String, String), Long)]
を思い付く2つのreduceByKey
の手順を使用してそれをやっています。 1つのSparkワーカープロセスでパーティション全体を読み取ってすべてをメモリ上で処理する方法があれば、速度が大幅に改善され、現在のものから出てくる巨大なデータセットを保存/ストリームする必要がなくなります最初の段階。
- 編集 -
明確にするために、ジョブがテーブルからすべてのデータ、パーティションの数十億を読み取る必要があります。
コメントを残したのは –
でしたが、Sparkの仕事は*すべての*パーティションを読むと予想されています。このアプローチは機能しません。 –
ああ、おそらくあなたはspanByKeyをしたいですか?基本的に、ReduceByKeyのシャッフルを避ける場合は、大丈夫です。 https://github.com/datastax/spark-cassandra-connector/blob/edba853b9630f60de2b3f1b0db2118792a5a5a89/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/PairRDDFunctions.scala – RussS