一般的に、ストリームを作成した後、ストリームを開始してストリーミングを開始します。直接音。File.streamの分散参照を作成するにはどうすればよいですか?
同様:
File.stream!(path)
|> Stream.run
しかし、私はそれを実行する必要があるまで、どのように私はストリームを実行している延期することができますか?私が見ることができる唯一の方法は、ストリームを分散リファレンスに登録し、後でストリームを実行することです。
一般的に、ストリームを作成した後、ストリームを開始してストリーミングを開始します。直接音。File.streamの分散参照を作成するにはどうすればよいですか?
同様:
File.stream!(path)
|> Stream.run
しかし、私はそれを実行する必要があるまで、どのように私はストリームを実行している延期することができますか?私が見ることができる唯一の方法は、ストリームを分散リファレンスに登録し、後でストリームを実行することです。
は、ストリームを格納し、GenServerを使用して、後でそれを実行するための一つの方法です:
defmodule StreamStore do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, [])
end
def init(_) do
{:ok, %{map: %{}, next: 0}}
end
def put(server, stream) do
GenServer.call(server, {:store, stream})
end
def run(server, key) do
GenServer.call(server, {:run, key})
end
def handle_call({:store, stream}, _from, %{map: map, next: next}) do
state = %{map: Map.put(map, next, stream), next: next + 1}
{:reply, next, state}
end
def handle_call({:run, key}, _from, %{map: map} = state) do
# If the key exists, pop and run it, otherwise return `{:error, :badkey}`.
case Map.pop(map, key) do
{nil, _} ->
{:reply, {:error, :badkey}, state}
{stream, map} ->
Task.start_link(Stream, :run, [stream])
{:reply, :ok, %{state | map: map}}
end
end
end
{:ok, ss} = StreamStore.start_link
# I'm storing a `Stream.map` over a list here, but you can store file streams as well.
first = StreamStore.put(ss, [1, 2, 3] |> Stream.map(&IO.puts/1))
second = StreamStore.put(ss, [4, 5, 6] |> Stream.map(&IO.puts/1))
third = StreamStore.put(ss, [7, 8, 9] |> Stream.map(&IO.puts/1))
Process.sleep(100)
IO.inspect StreamStore.run(ss, second)
Process.sleep(100)
IO.inspect StreamStore.run(ss, third)
Process.sleep(100)
IO.inspect StreamStore.run(ss, first)
Process.sleep(100)
# The stream has been removed from the store, so this will print `{:error, :badkey}`
IO.inspect StreamStore.run(ss, first)
出力:
:ok
4
5
6
:ok
7
8
9
:ok
1
2
3
{:error, :badkey}
StreamStore.put/2
後でその特定のストリームを実行するためにStreamStore.run/2
に渡すことができるキーを返します。私はTask.start_link
を使用してプロセスを生成し、そこでストリームを実行しています。おそらくそれを使う前にすべてのものを調整したいと思うでしょうが、これは良いスタートです。
、それは私のための良いスタートです、私は読書を続けます – simo
File.Stream!
は、あなたが望むとおりに保存して何度でも実行できる値を返します。ここでは
EDIT は私がしようとしているものです。ストリームはElixirで関数の構成として実装されています。このようなもの。いつか、後で
stream_maps = Map.put(%{}, client_tag, File.Stream!(path))
、
stream_maps |> Map.get(client_tag) |> Stream.run
あなたは、元のFile.Streamからストリーム関数のいずれかの組成物にこれを拡張することができます!ここで
ストリームの最初のアイテム(または任意のアイテム)を任意の期間遅延させたいと思っていますか? – Dogbert
いいえ、私は必要なときにのみ、その参照を介して上記の図のようなものをトリガーすることができます。 – simo
key-> streamのマップを格納するGenServerの作成、キーを持つストリームを格納し、キーを受け取って新しいプロセスでそのストリームを実行し、それを状態から削除する ''呼び出し '? – Dogbert