2016-08-08 4 views
0

2つの外部バイオインフォマティクスプログラムに対して非同期呼び出しを行う以下のperlコードがあります。最初に、blastJobを実行し、その結果を取得してexonerateJobを実行します。私はこのコードをprevious questionから変更して、自分のコードをマルチスレッドの手法に移行させました。Perlマルチスレッド - ワーカースレッドが動作しなくなる

問題は、数時間の実行後にのみ発生するため、イライラしています。私は一晩中プログラムを実行し、午前中にexonerateJobsはもはや実行されていないが、新しいblastJobsはまだクランキングしています。報告されたエラーや何もありません。もう1つの情報は、ログにexonerateJobsの動作が停止していることが示されている入力クエリを取り消してテストしたことです。たとえ以前に問題を引き起こしていたようなクエリであっても、プログラムを少しずつ実行するだけでプログラムは正常に完了します。私はマルチスレッドのルールにあまり慣れていないので、自分のアプローチに問題があるのか​​、それとも呼び出される外部プログラムに問題があるのか​​を知りたいと思います。ここでは、コードのビットがあります:あなたのソース情報なし

#Asynchronous calls to blast and exonerate 
{ 
    my $blast_request_queue = Thread::Queue->new(); 
    my $exonerate_request_queue = Thread::Queue->new(); 

    my @blast_threads; 
    for (1..NUM_BLAST_WORKERS) { 
     push @blast_threads, async { 
     while (my $q = $blast_request_queue->dequeue()) { 
      my @results = blastJob($q, $blastopts_ref); 
      foreach (@results) { 
       my @args = ($q, $_); 
       $exonerate_request_queue->enqueue(\@args); 
      } 
     } 

     $exonerate_request_queue->end(); # I've tried with and without this line, the result seems to be the same 
     }; 
    } 

    my @exonerate_threads; 
    for (1..NUM_EXONERATE_WORKERS) { 
     push @exonerate_threads, async { 
     while (my $args_ref = $exonerate_request_queue->dequeue()) { 
      my ($queryFile, $targetName) = @$args_ref; #De-reference args 
      my $regex = qr/\Q$targetName\E/; 
      #Check for target file 
      my ($file_match) = grep { $_ =~ $regex } keys %targets; 
      if ($file_match) { 
       my $targetFile = $options{'t'} . $file_match; 
       my $result = exonerateJob($queryFile, $targetFile, $exonopts_ref); 
       #Print result to file after job is finished 
       my ($Qname, $Qpath, $Qsuffix) = fileparse($queryFile); 
       my $outFN = $Qname . ".exonerate_out"; 
       open(OUTFH, ">>$outFN") or print STDERR "Can't open $outFN: $!"; 
       print OUTFH $result; 
      } else { 
       print STDERR "Target file not found: $targetName. Can't run exonerate"; 
      } 
     } 
     }; 
    } 

    foreach (@queries) { 
     #Concatenate query path with name 
     my $queryFile = $options{'q'} . $_; 
     $blast_request_queue->enqueue($queryFile); 
    } 
    #my $queryFile = $options{'q'} . $queries[3]; 
    #$blast_request_queue->enqueue($queryFile); 

    $blast_request_queue->end();  
    $_->join() for @blast_threads; 
    $exonerate_request_queue->end(); 
    $_->join() for @exonerate_threads; 
} 

#I'm using IPC::Run to launch the programs. 
#There is some error handling which I believe should catch any probs 
sub blastJob { 
    my ($query, $blastopts_ref) = @_; 
    #De-reference blast options 
    my @blastCmd = @$blastopts_ref; 
    my ($blastOut, $err); #for blast output 
    #Add query information after first blast option 
    splice(@blastCmd, 1, 0, ("-query", $query)); 
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query); 
    print "Running $blastCmd[0]: query $Qname...\n"; 
    run \@blastCmd, \undef, \$blastOut, \$err; 
    if ($err) { 
     print "Error in BLAST query $Qname. $err\n"; 
    } 
    my @results = split("\n", $blastOut); 
    return uniq(@results); 
} 

sub exonerateJob { 
    my ($query, $target, $exonopts_ref) = @_; 
    #De-reference exonerate options 
    my @exonCmd = @$exonopts_ref; 
    my ($exonOut, $err); #for exonerate output 
    #Add program, query, and target information to exonerate options 
    unshift (@exonCmd, ("exonerate", "-q", $query, "-t", $target)); 
    my ($Qname, $Qpath, $Qsuffix) = fileparse($query); 
    my ($Tname, $Tpath, $Tsuffix) = fileparse($target); 
    eval { 
     print "Running exonerate: query $Qname, target $Tname...\n"; 
     run \@exonCmd, \undef, \$exonOut, \$err, timeout(240); 
     if ($err) { 
      print STDERR "Error in exonerate query $Qname, target $Tname. $err\n"; 
     } 
    }; 
    if ([email protected] =~ /timeout/) { 
     print STDERR "Error: timeout in exonerate query $Qname, target $Tname\n"; 
    } 

    return $exonOut; 
} 
+1

ヒント: 'warn(" x \ n ")'は 'print(STDERR" x \ n ")'よりも短く、設定されていると '$ SIG {__ WARN __}'も呼び出されます。これはより良いアプローチです。 – ikegami

+0

それは意味があります。ヒントをありがとう! – Tsaari

+1

デキューループの後にメッセージを表示して、デキューがfalseを返すか、その他の理由でスレッドが終了したかどうかを確認できます。 – ikegami

答えて

1

、私はそれをテストすることはできません - しかし、私のお金は初めてであなたのようになります。

$exonerate_request_queue->end(); 

その非同期ブロックで。

私はそれはあなたが

$blast_request_queue->end(); 

を閉じるとすぐにすると、スレッドは、すぐ後に出て「出力」キューを閉じて、そうすることになると、そこにかなり可能だと思うので - あなたを意味します待ち行列が閉じられているため、保留中のものはすべて失われます。

+1

'$ exonerate_request_queue-> end();'は実際にはあまりにも早く呼び出されていますが、 '$ blast_request_queue-> end();が呼ばれるとすぐに必ずしも呼び出されるわけではありません。彼がこの問題に苦しまなかったOPベースのコード。それは、私はこれがOPが求めている問題であるとは確信していません。 – ikegami

+0

私は、その行も削除しようとしていることに注意してください。これを反映するためにQを編集します。結果は同じで、イライラしていた。 – Tsaari

1

ワーカーコードの周囲にeval BLOCKを追加するだけでよい場合は、dieを避けるために多くの問題が発生しました。

変更

my $result = job1($job); 
$job2_request_queue->enqueue($result); 

my $result = eval { job1($job) }; 
if ([email protected]) { 
    warn("Job failed: [email protected]"); 
} else { 
    $job2_request_queue->enqueue($result); 
} 

にこれは、はるかに信頼性があります。たとえば、runは、あなたの子供を殺す例外をスローすることができます。


Sobriqueに述べたようにまた、$exonerate_request_queue->end();の最上位インスタンスが追加されてはなりません。これにより、追加の作業がキューに追加されることが防止されます(キューに現在あるすべての作業が実行されると、除外するワーカーに終了を通知する)。これは、すべての爆破作業員が退出した後にのみ行われるべきですが、この追加によって、最初の爆破作業員が出るとすぐにこれが行われます。

+0

池上さんありがとうございました。私はこの方法を試し、その行を削除します。わかりやすくするために、サブルーチン内のエラーを処理する方法を変更し、STDERRに出力するのではなく、それらを終了させる方法を教えてください。 – Tsaari

+2

あなたは 'open'エラーで印刷する方法を意味しますが、とにかくハンドルを使用しようとしていますか?うん、それは変更する必要があります。 – ikegami

+0

うわー、ええ、それはかなり大変です。私は間違いなく、ほとんどが無知から「死ぬ」ことを避けました。私はそれが今一緒に来る方法を見ると思う。 – Tsaari

関連する問題