2017-02-13 17 views
0

一般的に、ストリームを作成した後、ストリームを開始してストリーミングを開始します。直接音。File.streamの分散参照を作成するにはどうすればよいですか?

同様:

File.stream!(path) 
|> Stream.run 

しかし、私はそれを実行する必要があるまで、どのように私はストリームを実行している延期することができますか?私が見ることができる唯一の方法は、ストリームを分散リファレンスに登録し、後でストリームを実行することです。

これも可能ですか?どうやって? enter image description here

+0

ストリームの最初のアイテム(または任意のアイテム)を任意の期間遅延させたいと思っていますか? – Dogbert

+0

いいえ、私は必要なときにのみ、その参照を介して上記の図のようなものをトリガーすることができます。 – simo

+0

key-> streamのマップを格納するGenServerの作成、キーを持つストリームを格納し、キーを受け取って新しいプロセスでそのストリームを実行し、それを状態から削除する ''呼び出し '? – Dogbert

答えて

0

は、ストリームを格納し、GenServerを使用して、後でそれを実行するための一つの方法です:

defmodule StreamStore do 
    use GenServer 

    def start_link do 
    GenServer.start_link(__MODULE__, []) 
    end 

    def init(_) do 
    {:ok, %{map: %{}, next: 0}} 
    end 

    def put(server, stream) do 
    GenServer.call(server, {:store, stream}) 
    end 

    def run(server, key) do 
    GenServer.call(server, {:run, key}) 
    end 

    def handle_call({:store, stream}, _from, %{map: map, next: next}) do 
    state = %{map: Map.put(map, next, stream), next: next + 1} 
    {:reply, next, state} 
    end 

    def handle_call({:run, key}, _from, %{map: map} = state) do 
    # If the key exists, pop and run it, otherwise return `{:error, :badkey}`. 
    case Map.pop(map, key) do 
     {nil, _} -> 
     {:reply, {:error, :badkey}, state} 
     {stream, map} -> 
     Task.start_link(Stream, :run, [stream]) 
     {:reply, :ok, %{state | map: map}} 
    end 
    end 
end 

{:ok, ss} = StreamStore.start_link 

# I'm storing a `Stream.map` over a list here, but you can store file streams as well. 
first = StreamStore.put(ss, [1, 2, 3] |> Stream.map(&IO.puts/1)) 
second = StreamStore.put(ss, [4, 5, 6] |> Stream.map(&IO.puts/1)) 
third = StreamStore.put(ss, [7, 8, 9] |> Stream.map(&IO.puts/1)) 

Process.sleep(100) 
IO.inspect StreamStore.run(ss, second) 
Process.sleep(100) 
IO.inspect StreamStore.run(ss, third) 
Process.sleep(100) 
IO.inspect StreamStore.run(ss, first) 
Process.sleep(100) 
# The stream has been removed from the store, so this will print `{:error, :badkey}` 
IO.inspect StreamStore.run(ss, first) 

出力:

:ok 
4 
5 
6 
:ok 
7 
8 
9 
:ok 
1 
2 
3 
{:error, :badkey} 

StreamStore.put/2後でその特定のストリームを実行するためにStreamStore.run/2に渡すことができるキーを返します。私はTask.start_linkを使用してプロセスを生成し、そこでストリームを実行しています。おそらくそれを使う前にすべてのものを調整したいと思うでしょうが、これは良いスタートです。

+0

、それは私のための良いスタートです、私は読書を続けます – simo

0

File.Stream!は、あなたが望むとおりに保存して何度でも実行できる値を返します。ここでは

EDIT は私がしようとしているものです。ストリームはElixirで関数の構成として実装されています。このようなもの。いつか、後で

stream_maps = Map.put(%{}, client_tag, File.Stream!(path)) 

stream_maps |> Map.get(client_tag) |> Stream.run 

あなたは、元のFile.Streamからストリーム関数のいずれかの組成物にこれを拡張することができます!ここで

+0

ありがとう、しかし、私は後でそれを使用するstream_mapsを格納する方法が必要 – simo

+0

参照、文字列を使用して?それは私が探しているものです – simo

関連する問題