2016-04-12 3 views
0

状況

新しい小さなファイルが定期的に入ってきます。私は最近の300ファイルの計算をする必要があります。だから、基本的に前進している窓がある。ウィンドウのサイズは300で、ウィンドウ上で計算が必要です。最新の300個のファイルを処理するsparkプログラムを設計するには?

しかし、非常に重要なことは、これがスパークストリームコンピューティングではないことです。スパークストリームでは、ウィンドウの単位/範囲は時間です。単位/範囲はファイルの数です。

ソリューション1

私は辞書を維持する、辞書の大きさは、それぞれの新しいファイルが入ってくる300、私は火花データフレームにそれを回すと、辞書に入れてあります。次に、dictの長さが300を超える場合、dictの中の最も古いファイルがでポップされていることを確認します。 その後、私はdict内のすべてのデータフレームをより大きなものにマージし、計算を行います。

上記のプロセスはループで実行されます。新しいファイルが来るたびに、私たちはループを辿ります。

ソリューション1

ためのソリューション1

for file in file_list: 
    data_frame = get_data_frame(file) 
    my_dict[ timestamp ] = data_frame 

    for timestamp in my_dict.keys(): 
     if timestamp older than 24 hours: 
      # not only unpersist, but also delete to make sure the memory is released 
      my_dict[timestamp].unpersist 
      del my_dict[ timestamp ] 

    # pop one data frame from the dict 
    big_data_frame = my_dict.popitem() 

    for timestamp in my_dict.keys(): 
     df = my_dict.get(timestamp) 
     big_data_frame = big_data_frame.unionAll(df) 

    # Then we run SQL on the big_data_frame to get report 

問題のための擬似コードは、必ずメモリまたはGCオーバーヘッドの制限のうち

質問

あなたは何を見るかをを打ちますソリューション1には不適切ですか?

もっと良い解決策はありますか?

これはsparkを使用するのに適した状況ですか?

+0

あなたの計算ウィンドウは300であると言っていますか?しかし、ソリューション1では、最も古いファイルをポップアップすると299個の古いファイルが残っていますか?私の理解を明確にしていただけますか? –

+0

@LokeshKumarPこんにちは、私は質問を修正しました。私はデータを飛び出す前にその辞書をチェックします。 dictの全長が300に達しない場合、私は何もポップしません。まだ明確でない場合教えてください –

+0

説明をお寄せいただきありがとうございます。各ファイルのサイズは何ですか?また、ストリーミングジョブではないと言われました。 –

答えて

0

おそらくpopitemを使用したくない場合、Python辞書のキーはソートされていないので、最も早いアイテムをポップしているとは限りません。代わりに、ソートされたタイムスタンプのリストを使用して毎回辞書を再作成します。あなたのファイル名がされていると仮定するだけでは、タイムスタンプ:これはあなたの問題を解決するかどう

my_dict = {file:get_dataframe(file) for file in sorted(file_list)[-300:]} 

は、あなたが質問にあなたのエラーの完全なスタックトレースを貼り付けることができわかりませんか? Sparkのマージ/ジョインで問題が発生している可能性があります(質問には含まれていません)。

+0

ありがとうmaxymoo。私は実際にはdictのキーとしてタイムスタンプを使用しています。したがって、最も古いデータフレームを見つけることは問題ではありません。スタックトレースはあまりにも大きいです。しかし、あなたは正しいです、私は後でそれを掲示するでしょう。ありがとうございます –

+0

私のポイントは、 'pop'を使って最も古いアイテムを削除しようとすると' OrderedDict'を使う必要があるということです。通常のdictは常にキーをソートするわけではないので、 'pop'は最も早い日付になります。ありがとう。 – maxymoo

+0

私は実際にシンプルなポップの方法を実際に使用していません。私は普通の辞書を使います。次に、キーを使用して、対応する項目を削除する必要があるかどうかを判断します。 –

0

私の提案はストリーミングですが、時間に関してではなく、まだウィンドウとスライド間隔が設定されていますが、60秒と言います。

したがって、60秒ごとに、ファイルコンテンツのDStreamが「x」パーティションで取得されます。これらの「x」パーティションは、HDFSまたはファイルシステムにドロップしたファイルを表します。 このようにして、読み取られたファイル/パーティションの数を300以下にしてから300になるまで待つことができます。カウントが300に達すると、処理を開始できます。それが最新のファイルを追跡することが可能ですか、それだけでたまにはそれらを発見することが可能ですなら、私は

sc.textFile(','.join(files)); 

か、特定することが可能かどうような何かをすることをお勧めしたい場合は

0

特定のこれらの300個のファイルを取得するためのパターン、そして

sc.textFile("*pattern*"); 

そして、それはカンマで区切られたパターンを有することも可能ですが、一つのパターン以上に一致するいくつかのファイルは、より多くの、何度も読まされることが起こるかもしれません。

+0

こんにちはありがとうございます。はい、私たちはパターンの方法を使用することができます。しかし、問題はちょうどあなたが言ったようなものです、いくつかのファイルが複数回読み込まれる可能性があります。そのため、私はすでに読み込まれたファイルを追跡するために辞書を使用しています。私はそれらの読み取りコストを避けたい –

+0

その後、コンマで区切られたリストに参加し、毎回読む。コードはそれほどシンプルなので、ディスク読み取りのパフォーマンスについてあまり心配する必要はありません。 SSDがあり、ディスクバッファがあります。そして、処理する時間と読む時間のパーセンテージは何ですか。とにかく、私はそれが懸念されている場合、私は読んで時間を測定するだろうが、おそらくちょうど単純なコードで終わる=) – evgenii

関連する問題