HDFSに格納されている40 MBから160 MBまでのサイズの約3000のパーケットファイル(拡張子は.gz.parquet)があります。 HDFSブロックサイズは128 MBです。すべてのファイルの合計サイズは約360 GBです。すべてのファイルが同じディレクトリにあります。つまり、データは分割されていません。Spark SQL Thrift Serverでキャッシュされたデータのパーティション数を指定する
16ノードでSpark SQL Thrift Serverを起動し、実行プログラムごとに160 GBのメモリを指定します。各ノードには、256 GBのメモリと32のコアがあります。
IそしてIが「キャッシュテーブルacdata」を使用してデータをキャッシュ
「を寄せ木位置 『/ユーザ/ AC /データ』として記憶されている外部表acdata(...)を作成」を使用して外部表を作成します。これにより、期待どおりに約4000個のパーティションが作成されます。データは、クラスタ全体で約1200GBのメモリを占有し、データをキャッシュするために1280GB(16 * 160GB/2)を使用できるので、すべてがメモリに収まる必要があります。パーティションの範囲は530 MBから2 MBです。問題は、パーティションがノード全体に均等に分散していないことです。一部のノードには50GBのキャッシュデータがあり、他のノードには80GBのキャッシュデータがあります。
JDBCクライアントから「選択」クエリを実行します。キャッシュされたデータの少ないノードがローカル・データの処理を完了すると、他のノードにキャッシュされたデータの処理が開始され、このデータがネットワークを介して送信されます。これにより、select文の実行時間が長くなります。
データは毎日このディレクトリに追加され、データのサイズは毎日異なるため、HDFSに保存されたデータを再パーティション化することは選択できません。私は同じディレクトリにデータを少しずつ追加するのではなく、毎日すべてのデータを再分割する必要があります。
理想的には、すべてのノードでデータを均等に分散してすべてのタスクに同じ時間がかかるようにしたいと考えています。
もしそれができないのであれば、ノードはローカルにキャッシュされたデータだけを処理することをお勧めします。私は "spark.locality.wait"の値を増やすことでパフォーマンスを少し向上させることができますが、それはすべてのタスクに影響します。
優れたソリューションが、それは均等に分散されている列に依存しています。残念ながら、私は自分のデータにそのような列を持っていません。これの1つの副作用は、最初の再パーティションでは200個のパーティション(デフォルト値のspark.sql.shuffle.partitions)しか使用されないということでした。これにより、シャッフルパーティションが2 GBを超える場合、つまりデータの合計サイズが400 GBを超える場合、「Size is Integer.MAX_VALUE」というエラーが発生します。その問題を回避するには、 "cache table acdata"文の前に "set spark.sql.shuffle.partitions = 6000"を実行します。データをキャッシュした後、パーティションを200に戻します。 – ACH