MQからエリクシルのコンシューマにイベントのストリームが届きました。 3分間そのIDには、新たなデータがない場合、下流のIDのために集約されたデータを送信エリクシールを使用したデバウンスイベント
そのIDによって- 集約イベントと
- :消費者で は私がする必要があります。
私のケースではデータセットは大きくありません。数百のIDと数千の更新が1日に発生することがあります。
GenServerの魔法を使ってこの問題を解決する方法はありますか?
ありがとうございました!
MQからエリクシルのコンシューマにイベントのストリームが届きました。 3分間そのIDには、新たなデータがない場合、下流のIDのために集約されたデータを送信エリクシールを使用したデバウンスイベント
そのIDによって私のケースではデータセットは大きくありません。数百のIDと数千の更新が1日に発生することがあります。
GenServerの魔法を使ってこの問題を解決する方法はありますか?
ありがとうございました!
私はこのようにそれを行うだろう:新しいイベントが出るたび
:
それはそのIDを持つ最初のイベントだ場合、3分のタイムアウトでProcess.send_after/3
を使用して、タイマーREFを作成します、イベントとタイマーをその状態に格納する。
idが最初のイベントでない場合は、Process.cancel_timer/1
で保存されたタイマーrefをキャンセルし、前の手順で説明したように新しいタイマーを作成し、新しいイベントを古いイベントと連結して保存します。
、タイマーによってトリガーhandle_info
で、下流そのIDのイベントをプッシュした状態からそのエントリを削除します。
defmodule DebouncedEcho do
@timeout 1000
use GenServer
def start_link do
GenServer.start_link __MODULE__, []
end
def init(_) do
{:ok, %{}}
end
def handle_cast({:store, id, event}, state) do
case state[id] do
nil ->
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event], timer: timer})
{:noreply, state}
%{events: events, timer: timer} ->
Process.cancel_timer(timer)
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event | events], timer: timer})
{:noreply, state}
end
end
def handle_info({:timer, id}, state) do
%{events: events} = state[id]
IO.inspect {:flush, id, events}
state = Map.delete(state, id)
{:noreply, state}
end
end
テスト:
{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)
出力:
{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}
優秀解答
ここでは上記の単純な実装です!私はまた、状態でタイムアウトを維持するので、init関数に渡すことができます。これにより、単体テストのタイムアウトをゼロに設定できるため、テストが容易になる可能性があります。 – tkowal
非常に洗練されたソリューション。ありがとう! –