3

私はスレッドからエラーをキャッチして、すべてのスレッドが完了したらそのエラーを再スローするにはどうすればよいですか?

gem 'concurrent-ruby' 

...私はスレッドを管理するためのこの宝石を持っているRailsの5を使用しています私は私のいずれかのスレッドがエラーをスローした場合、それはちょうど飲み込んだと私はそれを知ることはありませんされていることに気づきます。私は、プールからのスレッドのいずれかがエラーをスローした場合、すべてのスレッドが完了したとき、私は私のプログラムを停止し、例外を投げることができるか、疑問に思ったコンソール

pool = Concurrent::FixedThreadPool.new(1) 
    # => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>> 
nums.each do |num| 
    pool.post do 
    if num == 1 
     asdfasdf 
    end 
    end 
end 
    # => [1, 2, 3] 
pool.shutdown    # => true 
pool.wait_for_termination # => true 

でこれを試してみました。いずれのスレッドもエラーをスローしない場合は、何が起きていても問題はありません。

上記のように、私は意図的にエラーが発生する条件が発生することに気づくでしょうが、スレッドプールが例外の出力を飲み込んでいると推測するので、私は決してそれについては知りません。

+0

setting'Thread場合、私は好奇心が強いだろう。abort_on_exception = true'で違いがあります –

+0

上記のコードを実行する前にその文を実行しましたが、何も起こっていません。 –

答えて

1

あなたの質問にお答えするには、ライブラリexplicitly silences exceptionsのように実際の方法はなく、設定はありません。

可能な回避策は、手動で例外をキャプチャするために、次のようになります。

error = nil 
pool = Concurrent::FixedThreadPool.new(1) 
numbers.each do |number| 
    pool.post do 
    begin 
     some_dangerous_action(number) 
    rescue Exception => e 
     error = e 
     raise # still let the gem do its thing 
    end 
    end 
end 

pool.shutdown 
pool.wait_for_termination 

raise error if error 
2

組み込みの例外処理が必要な場合は、スレッドプールを直接使用するのではなく、より高いレベルの抽象化を使用する必要があります。これはconcurrent-rubycommentを参照してください。

ほとんどのアプリケーションでスレッドプールを直接使用しないでください。スレッドプールは、内部使用のための低レベルの抽象化です。このライブラリ(Promise、など)の高レベルの抽象化は、すべてジョブをグローバルスレッドプールにポストし、すべて例外処理を提供します。ユースケースに最も適した抽象を選択して使用するだけです。

グローバルスレッドプールを使用するのではなく、独自のスレッドプールを構成する必要がある場合は、依然として高レベルの抽象化を使用できます。これらはすべて:executorオプションをサポートしています。このオプションを使用すると、カスタムスレッドプールを挿入できます。その後、高水準抽象化によって提供される例外処理を使用できます。

Promiseアブストラクションを使用して、あなたの例のバリエーションを示します。

require 'concurrent' 
pool = Concurrent::FixedThreadPool.new(1) 
promises = (1..10).map do |num| 
    Concurrent::Promise.execute(executor: pool) do 
    if num == 1 
     asdfasdf 
    else 
     num 
    end 
    end 
end 
promises.map(&:value!) 

# NameError: undefined local variable or method `asdfasdf' for main:Object 
#  from (irb):57:in `block (2 levels) in irb_binding' 
#  [...] 

するには(ない、すぐに最初の例外時に)のみ後のすべてのスレッドが完了した例外を再び上げるあなたはpromises.map(&:value!)を置き換えることができ、:スレッドプールが1を発生させ、これは、すぐに例外を再発生させますConcurrent::Promise.zip(*promises).value!である。それを再上昇させることなく収集結果に例外を格納する

、あなたがpromises.map { |p| p.value || p.reason }ような何かを行うことができます。

# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

最後に、唯一の1つのスレッドで固定されたスレッド・プールが順次上のすべてのタスクを実行することに注意してください単一のスレッド。それらをすべて並列に実行するには(スレッド10個のプール)、スレッドプール初期化子をpool = Concurrent::FixedThreadPool.new(10)に変更します。

+0

私はConcurrent :: Promiseクラスを読み上げる必要がありますが、これは10個の繰り返しをすべて並列に実行していますか? –

+0

@Nataliaスレッドが1つしかない固定スレッドプールでは、何も並行して実行されません。しかし、初期化子を 'Concurrent :: FixedThreadPool.new(10)'に変更すると、それらは並行して実行されます。 'Concurrent :: Promise.execute'ブロック(' num == 1 'の真上)の一番上に 'sleep 1'を追加して自分自身で確認/テストしてください。 – wjordan

+0

ありがとうございます。 「promises.map(&:wait).map(&:value!)」行は、プール内のすべてのスレッドが実行されるまで実行の実行をブロックしますか?私はこれを試してみるので、 "(ActiveRecord :: ConnectionTimeoutError)が5.000秒以内に接続を取得できなかった"というエラーが出始めているので、トラブルシューティングの方法を理解しようとしているからです。 –

関連する問題