2017-06-09 1 views
1

私が知る限り、sparkは、cassandraから読み込むときに、最大1つのタスクをcassandraパーティションごとに使用します。残念ながら、私は非常にアンバランスな(初期のテーブルデザインが悪い)いくつかのパーティションをcassandraに持っています。そのデータを新しいテーブルに読み込む必要があります。これはホットスポットを処理するために設計されていますが、通常のスパーク・アベニューを使用すると効果的に機能しません。私は永遠に動作するいくつかのタスク(10+)を残して、それらのいくつかの巨大なパーティションキーに取り組んでいます。パーティションホットスポットを持つcassandraデータを読み出すためにsparkを効果的に使用するにはどうすればよいですか?

スケールの考え方を示すために、これはサイズが約1.5TBで、レプリケーションファクタが3の5台のサーバーにまたがっています。ノードあたり〜500GB。

他のアイデアは大歓迎ですが、単にCSVにダンプするのはおそらく現実的な選択肢ではありません。

マテリアライズドビューの作成は、これまでのところ行われていません。それは完全に長すぎます。少なくとも3.0.8では、作成中に監視がほとんどまたはまったくありません。

答えて

1

これは本当に自動的に解決することはできない難しい問題です。しかしあなたのデータが本当に巨大なファイルの中にどのように配分されているか知っていれば、私はあなたにオプションを与えることができます。

テーブルを表すために1つのRDD/DataFrameを実行する代わりに、複数のコールに分割して結合します。

基本的にはあなたが私たちの最大のパーティションがこの

Key1 -> C1, C2, C3, ..., C5000000 

のように設定して、我々は一般的なCは

Min C = 0 
Max C = 5000000 
Average C = 250000 

状に分布さに私たちがいることを推測することができます知っているを考えると、この

をしたいですレンジのプッシュダウンを100K Cの値ごとに行うことで、これらの大きなパーティションをきれいにカットすることができます。

val interval = 100000 
val maxValue = 500000 
sc.union(
(0 until maxValue by interval).map{ lowerBound => 
    sc.cassandraTable("ks", "tab") 
    .where(s"c > $lowerBound AND c < ${lowerBound + interval}") 
    } 
) 

私たちは、より小さなパーティション(おそらく空のものの多く)で終わるが、これは私たちが成功し、これらの巨大なパーティションを切るようにする必要があります。これは、パーティション内の値の分布を把握できる場合にのみ実行できます。

注::同じことは、私は、確信を持って、*各パーティション内の最大範囲を知って*で、これはオプションかもしれ組合-INGのデータフレーム

+0

で可能です。また、DF.groupBy( '​​partitionKey).count.describeというスパークを行うこともできます。このディストリビューションで配布されるはずです。パーティションキーは実際には最初のクラスタリングキーのプレフィックスです。このアプローチはさらに簡単になります。 私はこれをspark-cassandra-connectorの欠陥の何かに見ています。それに取り組むための標準的な方法を見つけることは面白いでしょう。 – Loki

+0

問題は、Cの分布を知らなくても、カットオフポイントを確立する場所を知る方法がありません。 Cassandraにおおよその統計情報があるまでは、自動的に行うことはあまりありません。しかし、ポイントはあなたが全体を読む前にスライスとユニオンを行うことです。 – RussS

関連する問題