状況
新しい小さなファイルが定期的に入ってきます。私は最近の300ファイルの計算をする必要があります。だから、基本的に前進している窓がある。ウィンドウのサイズは300で、ウィンドウ上で計算が必要です。最新の300個のファイルを処理するsparkプログラムを設計するには?
しかし、非常に重要なことは、これがスパークストリームコンピューティングではないことです。スパークストリームでは、ウィンドウの単位/範囲は時間です。単位/範囲はファイルの数です。
ソリューション1
私は辞書を維持する、辞書の大きさは、それぞれの新しいファイルが入ってくる300、私は火花データフレームにそれを回すと、辞書に入れてあります。次に、dictの長さが300を超える場合、dictの中の最も古いファイルがでポップされていることを確認します。 その後、私はdict内のすべてのデータフレームをより大きなものにマージし、計算を行います。
上記のプロセスはループで実行されます。新しいファイルが来るたびに、私たちはループを辿ります。
ソリューション1
ためのソリューション1
for file in file_list:
data_frame = get_data_frame(file)
my_dict[ timestamp ] = data_frame
for timestamp in my_dict.keys():
if timestamp older than 24 hours:
# not only unpersist, but also delete to make sure the memory is released
my_dict[timestamp].unpersist
del my_dict[ timestamp ]
# pop one data frame from the dict
big_data_frame = my_dict.popitem()
for timestamp in my_dict.keys():
df = my_dict.get(timestamp)
big_data_frame = big_data_frame.unionAll(df)
# Then we run SQL on the big_data_frame to get report
問題のための擬似コードは、必ずメモリまたはGCオーバーヘッドの制限のうち
質問
あなたは何を見るかをを打ちますソリューション1には不適切ですか?
もっと良い解決策はありますか?
これはsparkを使用するのに適した状況ですか?
あなたの計算ウィンドウは300であると言っていますか?しかし、ソリューション1では、最も古いファイルをポップアップすると299個の古いファイルが残っていますか?私の理解を明確にしていただけますか? –
@LokeshKumarPこんにちは、私は質問を修正しました。私はデータを飛び出す前にその辞書をチェックします。 dictの全長が300に達しない場合、私は何もポップしません。まだ明確でない場合教えてください –
説明をお寄せいただきありがとうございます。各ファイルのサイズは何ですか?また、ストリーミングジョブではないと言われました。 –