2013-04-26 11 views
5

scalaz iterateeパッケージを使用して、一定のスペースで大きなzipファイルを処理しようとしています。 zipファイルの各ファイルで実行する必要がある、長時間実行されるプロセスがあります。これらのプロセスは並行して実行できます。Scalaz 7大きなzipファイルを処理するためのIteratee(OutOfMemoryError)

FileオブジェクトにそれぞれZipEntryを膨らませたEnumeratorTを作成しました。

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO] 

私は、各ファイルに実行時間の長いプロセスを実行しますIterateeTを添付したい:シグネチャは次のようになります。私はそれを実行しようとすると

type IOE[A] = IoExceptionOr[A] 

def action(f:File):IO[List[Promise[IOE[File]]]] = (
    consume[Promise[IOE[File]], IO, List] %= 
    map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] = 
    Promise { Thread.sleep(5000); iof } 

:私は基本的にのようなもので終わる

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get 

私はjava.lang.OutOfMemoryError: Java heap spaceメッセージが表示されます。それはこれらのすべてのメモリに大量のリストを構築しようとしているので、私には意味があります。IOPromiseオブジェクトです。

いくつかの質問:

  • 誰もがこれを回避する方法上の任意のアイデアを持っていますか?私は実際にその副作用のためにlongRunningProcessを気にするだけなので、問題に間違って近づいているように感じます。
  • ここではEnumeratorのアプローチが間違ったアプローチですか?

私はかなり多くのアイディアを持っているので、何か助けになります。

ありがとうございます!ここで

更新#1

は、スタックトレースです:

[error] java.lang.OutOfMemoryError: Java heap space 
[error]   at scalaz.Free.flatMap(Free.scala:46) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 

私は現在、私はそれだと思うようにすべてが機能していることを確認するためにnadavwrのアドバイスを受けています。私は更新を報告します。以下の回答の両方からのアイデアを使用して

更新#2

、私はまともな解決策を見つけました。 huynhjlが示唆したように(そしてnadavwrのヒープダンプ解析の提案を使用して確認した)、consumeはメモリが不足しているため、メモリが不足していました。私はconsumefoldMに変更し、ファイルへの参照の代わりにPromise[IOE[Unit]]を返すように長期実行プロセスを更新しました。そうすれば、私は最後にすべてのIoExceptionsのコレクションを持っています。

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
    foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %= 
    map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] = 
    Promise { Thread.sleep(5000); iof.map(println) } 

このソリューションは、非同期アップロード中に各エントリを膨張させます。最終的には、エラーを含むPromiseのオブジェクトの膨大なリストがあります。私はまだこれがIterateeの正しい使用であると完全には確信していませんが、私は今私たちのシステムの他の部分で使うことができるいくつかの再利用可能な構成可能な部分を持っています。

ありがとうございました!

+0

長いプロセスは何をしますか? zipコンテンツから何かを計算していますか? – huynhjl

+0

zipファイルの各ファイルはイメージです。長いプロセスは、そのファイルをRackspace CloudFilesにアップロードします。これを理解したら、イメージのサイズを変更してアップロードするプロセスを追加する必要があります。 –

+0

Iterateesは、作業負荷を並列化したいので、このジョブの間違った抽象化のように感じます。俳優たちはもっとうまくいくと思う。 – huynhjl

答えて

4

consumeを使用しないでください。私の他の最近の答えを見てください:How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldMが良い選択かもしれません。

ファイルを別のもの(成功戻りコードのようなもの)にマップして、JVMが膨らんだzipエントリをガベージコレクトできるかどうかを確認してください。

+0

ありがとうございます。結局、 'foldM'を使うことがキーのように見えました。 –

0

迅速に通読し、何とか代わりに「メモリ不足エラー」の私の心の中で立ち往生「スタックオーバーフロー」を持っていた後、私は答えを出し始めた...それでも

:-) URLを指定する必要があり、再帰に依存する機能的な計算はスタックオーバーフローの影響を受けやすいので、私はあらゆる身体が遭遇した場合にその答えを残し、より関連性の高い答えを出そうと約束します。

スタックオーバフローがあった場合は、再帰とスタックの間でスタックから計算を引き出す構造である 'trampoline'が必要になります。

@ eed3si9nの優れた一連の記事の一部であるLearning Scalaz Day 18のセクション「フリーモナドを使ったスタックレススカラ」を参照してください。

も参照してください。@mpilquistによるthis gistも参照してください。これは、トランポリン処理したイテレートを示しています。ファイルデフレについて?彼らはあなたがそれらがあることを期待した回数だけ実行されているか?あなたのlongRunningProcessどのように高価(メモリの面で

+1

長期的で機能的なプロセスについて話しているときは、stackoverflow.comは残念な名前です。 –

1

?(簡単なカウンターが参考になる)

スタックトレースします

メモリが大量に消費されていることを確認したい場合は、-XX:+HeapDumpOnOutOfMemoryError JVM引数を使用してVisualVM、Eclipseで分析することができますMAT、または他のヒープアナライザ

フォローwup

あなたが約束を列挙していることは私にとっては奇妙なようです。列挙子とiterateeの両方から独立した計算を開始するのは直観的ではありません。 iterateeベースのソリューションは、約束の代わりに '不活性'要素を返す列挙子によって優れたサービスを提供することができます。残念ながら、これは個々のファイルのシリアル処理を行いますが、それはya-non-blockingストリーム処理のためのiterateesです。

アクターベースのソリューションはより良いIMHOに適合しますが、アクターとイテレート(特に後者)は、達成しようとしているもの(少なくともあなたが共有している部分)の過剰なようです。

Scala 2.10のscala.concurrentパッケージの明白な未来/約束を考えてください。また、Scalaの並列コレクションも見てください。これらが不十分であることを証明する前に、コードに追加の概念を導入することはありません。あなたの並列性を制限するための固定サイズのExecutionContextを定義してみてください。

+0

すばらしいアドバイス。私はそれが前提であるように、すべてが実行されていることを確認するためにステップバイステップで行くつもりです。上記の質問をスタックトレースで更新しました。私は次にヒープダンプを試みるつもりです。ありがとう! –

+0

あなたのフォローアップについて:私はこのプロセスにIterateeを使用することについてのあなたの懸念に同意します。私が投稿したものから、それは間違いなく過度のようです。しかし、ファイル(またはファイル)をダウンロードしたり、コンテンツをストリーミングしたり、各エントリを処理したり、結果を使って何かをやったりするパターンは、アプリのどこでも使用されています。私は、Iterateeが私にこれらのより大きなプロセスを構築するのに使うことができる素敵で再利用可能なコードを与えたような気がします。あなたの時間と助けをありがとう、ありがとう! –