2011-08-15 13 views
3

私は潜在的に高いレートでポイントを生成するデータソースを持っており、各ポイントで時間のかかる操作を実行したいと思います。私はまた、過剰なデータポイントを落とすことによってシステムが過負荷になったときに、システムが正常に機能しなくなることを望みます。erlang/OTPのレート制限付きイベントハンドラ

私が知る限り、gen_eventを使用するとイベントはスキップされません。概念的には、私がgen_eventをしたいのは、ハンドラを再度実行する前に、最新の保留中のイベント以外のすべてを破棄することです。

標準のOTPでこれを行う方法はありますか?そういうことをしてはならない理由があるのでしょうか?

これまでのところ、私は高価なイベントトリガするgen_serverを使用して、タイムアウトに依存して持っている最高:

-behaviour(gen_server). 
init() -> 
    {ok, Pid} = gen_event:start_link(), 
    {ok, {Pid, none}}. 

handle_call({add, H, A},_From,{Pid,Data}) -> 
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}. 

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}. % set timeout to 0 

handle_info(timeout, {Pid,Data}) -> 
    gen_event:sync_notify(Pid,Data), 
    {noreply, {Pid,Data}}. 

は、このアプローチは正しいですか? (特に監督に関して)

答えて

0

標準OTPでこれを行う方法はありますか?

私はそのようなものを扱うべきではない理由を正当な理由があるのですか?

いいえ、タイムアウトが早すぎると、システム全体のパフォーマンスが向上する可能性があります。 hereについては、こちらをご覧ください。

このアプローチは正しいですか? (特に監督に関して)

あなたは監視コードを提供していません。あなたの最初の質問への追加情報のビットとして


あなたはOTPの外でサードパーティのライブラリを使用できる場合は、それはあなたが何であるかであるプリエンプティブタイムアウトを、追加することができますそこにいくつかあります記述。

私が最初に慣れているのはdispcountで、もう1つはchickです(私はひよこの著者です、私はここでプロジェクトを宣伝しないでしようとします)。

Dispcountは、同時に実行できるジョブの数が限られていて、キューに入れられていない単一のリソースには本当にうまく機能します。あなたはそれについて読むことができますhere警告多くの本当に興味深い情報!)。

Dispcountは私のアプリ内で異なるキューの量を処理するために4000以上のプロセスプールを生成しなければならないため、私のためには機能しませんでした。キューの長さを動的に増減させる方法と、要求をキューに入れ、他のプロセスを拒否する方法が必要でした。なぜなら、4000個以上のプロセスプールを生成する必要はありませんでした。

もし私があなただったら私は割引を最初に試してみましょう(ほとんどのソリューションはひよこを必要としないので)。そして少し動かす必要があるなら、特定の数のリクエストに応答できるプールがひよこを試します。

1

私は監督についてはコメントできませんが、期限切れのアイテムを持つキューとしてこれを実装します。

以下で使用できるものを実装しました。

私はそれをgen_serverにしました。あなたがそれを作成するときに、あなたはそれに古いアイテムの最大年齢を与えます。

インターフェイスは、処理するアイテムを送信し、デキューされていないアイテムをリクエストできることです。すべてのアイテムを受け取る時刻を記録します。処理されるアイテムを受け取るたびに、キュー内のすべてのアイテムをチェックし、最大エージングよりも古いアイテムをデキューして破棄します。

データソースが作業キューにデータ({process_this, Anything})をキャストすると、コンシューマプロセスが呼び出す(遅くなる可能性がある)プロセスが呼び出されます(最大遅延時間を常に考慮する必要がある場合)。 (gimme)を使用してデータを取得します。 REPLで

-module(work_queue). 
-behavior(gen_server). 

-export([init/1, handle_cast/2, handle_call/3]). 

init(DiscardAfter) -> 
    {ok, {DiscardAfter, queue:new()}}. 

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) -> 
    Instant = now(), 
    Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0), 
    Queue2 = queue:in({Instant, Data}, Queue1), 
    {noreply, {DiscardAfter, Queue2}}. 

handle_call(gimme, From, State = {DiscardAfter, Queue0}) -> 
    case queue:is_empty(Queue0) of 
    true -> 
     {reply, no_data, State}; 
    false -> 
     {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0), 
     {reply, {data, Data}, {DiscardAfter, Queue1}} 
    end. 

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) -> 
    ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1. 

too_old(Stamp, Instant, DiscardAfter) -> 
    delta(Stamp, Instant) > DiscardAfter. 

リトルデモ:

c(work_queue). 
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).   
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),  
timer:sleep(11 * 1000),             
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}), 
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.   
+2

"データを待っているワーカープロセスのキュー"は、Erlangのようには聞こえません。 Erlangの利点の1つは、新しいプロセスを生成するのが安価であり、完了した作業でそれらを終了させることです。あなたは過負荷状態を避けるために、gen_server内の稼働中のワーカーの数を数えることができます。あるいは、さらに簡単に:プロセスがまだ周りにいる場合、プロセスを自分自身で撃つことができます。しかし、複数のワーカー・プロセスは、SMPマシンでのみ役に立ちます。 –

+0

優れた点。私は最近脳に「データを待っているプロセス」を持っていましたが、ここでそれらを使う必要は全くありません。残りの答えはどう思いますか? – ellisbben

+0

私はすべてを完全に理解していないかもしれませんが、これは別の問題を解決すると思います。固定のDiscardAfter間隔にコミットしたくありません。たぶん別の言い方をすると、古いデータを処理する時間を無駄にせずに、自分の処理の結果が常に私が知っている最新のデータポイントを反映してほしいということです。しかし、必要な処理量は動的に変化する可能性があり、ソースのデータレートも変化する可能性があります。 – b0fh