2015-09-15 13 views
11

私は大きなリストの上にパラレルマップを作成したい。コードは次のように多少なりますElixir StreamのTask.async

big_list 
|> Stream.map(&Task.async(Module, :do_something, [&1])) 
|> Stream.map(&Task.await(&1)) 
|> Enum.filter filter_fun 

しかし、私はストリームの実装をチェックしていたし、私の知る限り理解しStream.mapが機能を兼ね備え、シーケンスは次のようであることを意味するストリームの要素に組み合わせた機能を適用します。

  1. 最初の要素を取り、第2 elelemntを取る
  2. を終了するのを待って、非同期タスク
  3. を作成...

この場合、パラレルではありません。私は正しいのですか、何か不足していますか?

私が正しいとすれば、このコードはどうですか?

Stream.map Task.async ... 
|> Enum.map Task.await ... 

これは並行して実行されますか?

+2

がこれを読んで - http://www.theerlangelist.com/2015/07/beyond-taskasync.html – emaillenin

答えて

9

第2のものもあなたが望むことをしません。

defmodule Test do 
    def test do 
    [1,2,3] 
    |> Stream.map(&Task.async(Test, :job, [&1])) 
    |> Enum.map(&Task.await(&1)) 
    end 

    def job(number) do 
    :timer.sleep 1000 
    IO.inspect(number) 
    end 
end 

Test.test 

数字が表示され、次に1秒待機し、別の番号などが表示されます。ここで重要な点は、できるだけ早くタスクを作成したいということです。したがって、 遅延型のStream.mapは使用しないでください。代わりに、その時点で熱心Enum.mapを使用します。

|> Enum.map(&Task.async(Test, :job, [&1])) 
|> Enum.map(&Task.await(&1)) 

一方待っているときに限り、あなたはあなたのfilterのように、後でいくつかの熱心な操作を行うと、Stream.mapを使用することができます。そうすれば、あなたは結果に何らかの処理をしているかどうか分かります。

4

Elixir 1.4は、列挙可能な各項目で同時に指定の関数を実行するストリームを返す新しいTask.async_stream/5関数を提供します。

:max_concurrency:timeoutオプションパラメータを使用して、最大ワーカー数とタイムアウトを指定するオプションもあります。


これはあなたの例では、同時に実行ようになります:

big_list 
|> Task.async_stream(Module, :do_something, [&1]) 
|> Enum.filter(filter_fun) 
0

あなたがParallel Streamを試すことができます。

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end) 
stream |> Enum.into([]) 
[2,4,6,8,10,12,14,16,18,20] 

UPD またはそれ以上使用Flow