2016-07-07 2 views
0

私は、2つの外部プログラムを実行するperlスクリプトを用意しています。現在、各データセットに対して一度に1つずつ実行し、最初のプログラムを実行し、qxで結果を収集し、その結果を使用して2番目のプログラムを実行します。データは、各データセットごとに1つのファイルである第2プログラムの結果を含む出力ファイルに追加されます。私はうまくいけば、私の現在のアプローチキャプチャし、簡単な再現性の例を作成しました:Perl - パラレルプログラミング - 2つの外部プログラムの実行

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) { 
     chomp $_; 
     #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query 
     my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output"; 
     print "\tRunning program_2 on query $query with input param $_...\n"; 
     system($cmd_2);   } 
} 

最初のプログラムは、一般的に第二のものより速く完了しているために、私はそれが新しい実行し続けることにより、この全体の取引をスピードアップするために、おそらく可能です思いましたprogram_2が前のクエリでも実行されているのと同時に、program_1を介してクエリを実行します。現在、これを完了するまでに数時間の処理が必要となるため、これをスピードアップするのはすばらしいことです。しかし、私はこれについてどうやって行くのか分かりません。 Parallel :: ForkManagerには解決策がありますか?またはPerlでスレッドを使用していますか?

私の実際のコードでは、私はいくつかのエラー処理を行い、program_2のタイムアウトを設定しました - 私はfork、exec、$ SIG {ALRM}を使ってこれを行っていますが、それら。これを行う能力がまだ残っていることが重要です。そうしないと、program_2が失敗した理由が不明確になるか、不十分に報告される可能性があります。エラー処理でコードがどのように見えるかは次のとおりです。私はそれが再現可能な例では必要な方法で動作するとは思わないが、少なくとも私がやろうとしていることをうまく見るだろう。ここにエラー処理があります:

#!/usr/bin/perl 
# 
# stackoverflow_q_7-7-2016.pl 

use warnings; 
use strict; 

my @queries_list = (2, 4, 3, 1); 

foreach my $query (@queries_list) { 
    #Command meant to simulate the first, shorter process, and return a list of results for the next process 
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3"; 
    print "Running program_1 on query $query...\n"; 
    my @results = qx($cmd_1); 

    foreach (@results) { 
     chomp $_; 
     #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query 
     my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output"; 
     print "\tRunning program_2 on query $query with input param $_...\n"; 

     my $childPid; 
     eval { 
      local $SIG{ALRM} = sub { die "Timed out" }; 
      alarm 10; 
      if ($childPid = fork()) { 
       wait(); 
      } else { 
       exec($cmd_2); 
      } 
      alarm 0; 
     }; 
     if ($? != 0) { 
      my $exitCode = $? >> 8; 
      print "Program_2 exited with error code $exitCode. Retry...\n"; 
     } 
     if ([email protected] =~ /Timed out/) { 
      print "\tProgram_2 timed out. Skipping...\n"; 
      kill 2, $childPid; 
      wait; 
     }; 
    } 
} 

すべてのご協力をいただきありがとうございます。

答えて

3

一つの解決策:

use threads; 

use Thread::Queue; # 3.01+ 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my $job1_thread = async { 
     while (my $job = $job1_request_queue->dequeue()) { 
     my $result = job1($job); 
     $job2_request_queue->enqueue($result); 
     } 

     $job2_request_queue->end(); 
    }; 

    my $job2_thread = async { 
     while (my $job = $job2_request_queue->dequeue()) { 
     job2($job); 
     } 
    }; 

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for $job1_thread, $job2_thread; 
} 

あなたも、どちらか/両方のタイプの複数のワーカーを持つことができます。

use threads; 

use Thread::Queue; # 3.01+ 

use constant NUM_JOB1_WORKERS => 1; 
use constant NUM_JOB2_WORKERS => 3; 

sub job1 { ... } 
sub job2 { ... } 

{ 
    my $job1_request_queue = Thread::Queue->new(); 
    my $job2_request_queue = Thread::Queue->new(); 

    my @job1_threads; 
    for (1..NUM_JOB1_WORKERS) { 
     push @job1_threads, async { 
     while (my $job = $job1_request_queue->dequeue()) { 
      my $result = job1($job); 
      $job2_request_queue->enqueue($result); 
     } 
     }; 
    } 

    my @job2_threads; 
    for (1..NUM_JOB2_WORKERS) { 
     push @job2_threads, async { 
     while (my $job = $job2_request_queue->dequeue()) { 
      job2($job); 
     } 
     }; 
    } 

    $job1_request_queue->enqueue($_) for ...; 

    $job1_request_queue->end();  
    $_->join() for @job1_threads; 
    $job2_request_queue->end(); 
    $_->join() for @job2_threads; 
} 

代わりqxの使用IPC::Runタイムアウトを追加します。信号の必要はありません。

+0

こんにちは、援助ありがとうございます。スレッドの終了と結合について説明できますか?複数のワーカー・アプローチを使用しようとすると、「Perlはアクティブ・スレッドで終了しました」というエラーが発生します。ほとんどは実行中で、結合されていません。私は下の答えとして私の最新のコードを掲載することができます。 – Tsaari

+0

これは、従業員にこれまで以上に仕事がないことを伝え、完了するのを待ちます。それ以外の場合、プログラムは早期に終了します。 – ikegami

+0

私のコードの不具合を修正しました。 ( '@ job1_threads'と' @ job2_threads'は入力されていませんでした) – ikegami

関連する問題