2012-02-13 8 views
7

スレ:Perlのキューと、私は次のことを達成しようとしている

  1. は、非常に大きなファイルからのデータを約 10ギガバイトを言うと、キューにそれらを押して読み込みスレッドを持っています。 buildQueueスレッドが同時にキューにデータをプッシュしている間

  2. を(私はどちらかの非常に大規模な取得 にキューを希望していない) 約5ワーカースレッドデキューおよびプロセスデータを持っています。

私は試行しましたが、私のbuildQueueスレッドの連続ループのために他のスレッドに到達できません。

私のアプローチは完全に間違っている可能性があります。助けてくれてありがとう、それは非常に感謝しています。

ここbuildQueueためのコードです:

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open DICT_FILE, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<DICT_FILE>) { 
      if ($queue->pending() < 100) { 
       my $query = <DICT_FILE>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

そして、このスレッドが終了しませんので、実行されます後、このスレッドは、他に何も実行されません取得したときに、私は期待しましたように。

my $builder = new Thread(&buildQueue); 

ビルダースレッドは長時間実行されるため、ワーカースレッドを作成することはありません。

ここで全体のコードです:

#!/usr/bin/perl -w 
use strict; 
use Thread; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
my @threads; 

sub buildQueue { 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 
    open dict_file, $dict_path or die("Sorry, could not open file!"); 
    while (1) { 
     if (<dict_file>) { 
      if ($queue->pending() < 100) { 
       my $query = <dict_file>; 
       chomp($query); 
       $queue->enqueue($query); 
       my $count = $queue->pending(); 
       print "Queue Size: $count Query: $query\n"; 
      } 
     } 
    } 
} 

sub processor { 
    my $query; 
    while (1) { 
     if ($query = $queue->dequeue) { 
      print "$query\n"; 
     } 
    } 
} 

my $builder = new Thread(&buildQueue); 
push @threads, new Thread(&processor) for 1..5; 
+0

いくつかの質問:あなたのキュービルダスレッドは完了しないと言いますが、何もしませんか?キューのサイズが100より小さくなったり、0より大きくなったりしますか?また、[私はあなたがあなたのスレッドを正しく作っているかどうかわからない](http://perldoc.perl.org/perlthrtut.html)。それは 'my $ builder = threads-> create(\&buildQueue);'? –

+0

キュービルダはうまく構築されますが、ワーカースレッドは作成されていないため、キューから何も削除できないため、連続ループのためにビルドキューが実行されている間にキューが100で停止します。 – Sinista

+0

Hmmm、特にスレッドを作成する場所で、コンテキストを確立するためのコードをもっと見る必要があります。ワーカースレッドを作成する前に、キュービルダを 'join'または' detach'しているわけではありません。 –

答えて

10

は、あなたのスレッドが(joinor detachのいずれかを介して)終了したいときは、マークする必要があります。 lastステートメントが無限ループであるという事実から、それらを打破することも問題です。

編集:私も非常に重要な部分を忘れてしまった! Each worker thread will block, waiting for another item to process off of the queue until they get an undef in the queue。したがって、なぜキュービルダーが完了した後に、スレッドごとに具体的にundefをエンキューするのですか?

試してみてください。

#!/usr/bin/perl -w 
use strict; 
use threads; 
use Thread::Queue; 


my $queue = new Thread::Queue(); 
our @threads; #Do you really need our instead of my? 

sub buildQueue 
{ 
    print "Enter a file name: "; 
    my $dict_path = <STDIN>; 
    chomp($dict_path); 

    #Three-argument open, please! 
    open my $dict_file, "<",$dict_path or die("Sorry, could not open file!"); 
    while(my $query=<$dict_file>) 
    { 
     chomp($query); 
     while(1) 
     { #Wait to see if our queue has < 100 items... 
      if ($queue->pending() < 100) 
      { 
       $queue->enqueue($query); 
       print "Queue Size: " . $queue->pending . "\n"; 
       last; #This breaks out of the infinite loop 
      } 
     } 
    } 
    close($dict_file); 
    foreach(1..5) 
    { 
     $queue->enqueue(undef); 
    } 
} 

sub processor 
{ 
    my $query; 
    while ($query = $queue->dequeue) 
    { 
     print "Thread " . threads->tid . " got $query\n"; 
    } 
} 

my $builder=threads->create(\&buildQueue); 
push @threads,threads->create(\&process) for 1..5; 

#Waiting for our threads to finish. 
$builder->join; 
foreach(@threads) 
{ 
    $_->join; 
} 
+1

これは、廃止されたスレッドモジュールで問題が発生したようですが、代わりにスレッドモジュールに切り替えました。ありがとうございましたJack多くの人が正しい方向に私を指差してくれました。 – Sinista

1

この場合はParallel::ForkManagerモジュールで行うことができますように聞こえます。

+0

可能であれば、ForkManagerソリューションが好きでしょうか。 – Sinista

0

異なるアプローチ:あなたはまた、(それは大きなファイルですので、保存したファイルが求める読みながら、あなたはまた、並列読み出しから利益を得ることができる)MCE 1.2+user_tasksを使用して読み取るため2 マルチ労働者tasks、1つのタスクを作成することができます処理のための1つのタスクなどが含まれます。

以下のコードは、バッファキューを管理するためにまだThread::Queueを使用しています。

サブタイトルはキューサイズコントロールを持ち、スレッドを使用しているのでマネージャのプロセス '$ R_QUEUE'に直接データをプッシュするので、親のメモリスペースにアクセスできます。代わりにフォークを使用する場合は、コールバック機能を使用してキューにアクセスできます。しかし、ここで私は単純に待ち行列にプッシュすることを選択しました。

サブprocessQueueサブは、何も保留されなくなるまで待ち行列にあるものを単にデキューします。

各タスクのtask_end subは、各タスクの終了時にマネージャプロセスによって1回だけ実行されるため、これを使用してワーカープロセスの停止を通知します。

あなたはチャンクの大きさを決めることができたりして、あなたのデータを読まことさえどのように明らかに、自由の多くは、あなたが労働者にチャンクデータをする方法にあります。

#!/usr/bin/env perl 
use strict; 
use warnings; 
use threads; 
use threads::shared; 
use Thread::Queue; 
use MCE; 

my $R_QUEUE = Thread::Queue->new; 
my $queue_workers = 8; 
my $process_workers = 8; 
my $chunk_size = 1; 

print "Enter a file name: "; 
my $input_file = <STDIN>; 
chomp($input_file); 

sub buildQueue { 
    my ($self, $chunk_ref, $chunk_id) = @_; 
    if ($R_QUEUE->pending() < 100) { 
     $R_QUEUE->enqueue($chunk_ref); 
     $self->sendto('stdout', "Queue Size: " . $R_QUEUE->pending ."\n"); 
    } 
} 

sub processQueue { 
    my $self = shift; 
    my $wid = $self->wid; 
    while (my $buff = $R_QUEUE->dequeue) { 
     $self->sendto('stdout', "Thread " . $wid . " got $$buff"); 
    } 
} 

my $mce = MCE->new(
    input_data => $input_file, # this could be a filepath or a file handle or even a scalar to treat like a file, check the documentation for more details. 
    chunk_size => $chunk_size, 
    use_slurpio => 1, 

    user_tasks => [ 
     { # queueing task 
      max_workers => $queue_workers, 
      user_func => \&buildQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory. 
      task_end => sub { $R_QUEUE->enqueue((undef) x $process_workers) } # signal stop to our process workers when they hit the end of the queue. Thanks > Jack Maney! 
     }, 
     { # process task 
      max_workers => $process_workers, 
      user_func => \&processQueue, 
      use_threads => 1, # we'll use threads to have access to the parent's variables in shared memory 
      task_end => sub { print "Finished processing!\n"; } 
     } 
    ] 
); 

$mce->run(); 

exit; 
3

Perl用のMCEモジュールは大きなファイルを大好きです。 MCEを使うと、一度に多くの行をチャンクしたり、大きなチャンクをスカラー文字列としてスラップしたり、一度に1行ずつ読むことができます。一度に多くの回線をチャンクすると、IPCのオーバーヘッドが減少します。

MCE 1.504がリリースされました。 MCE :: Queueには、スレッドを含む子プロセスのサポートが用意されています。さらに、1.5リリースには、MCEインスタンスのインスタンス化にも注意を払う5つのモデル(MCE :: Flow、MCE :: Grep、MCE :: Loop、MCE :: Map、およびMCE :: Stream)が付属しています。 max_workersとchunk_sizeのチューニング。これらのオプションを無効にすることができます。

以下、デモンストレーションにMCE :: Loopを使用しています。

use MCE::Loop; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
    my ($mce, $chunk_ref, $chunk_id) = @_; 

    foreach my $line (@$chunk_ref) { 
     chomp $line; 
     ## add your code here to process $line 
    } 

} $dict_path; 

従業員数やチャンクサイズを指定する場合は、2つの方法があります。

use MCE::Loop max_workers => 5, chunk_size => 300000; 

それとも...

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 300000 
}; 

チャンクが大きなファイルのために好ましいが、1は、一度に1つの行をチャンクとの時間を比較することができます。ブロック内の最初の行(コメントアウト)を省略することができます。 inner forループが必要ないことに注目してください。 $ chunk_refは依然として1行を含む配列参照です。入力スカラー$ _には、chunk_sizeが1のときの行が含まれます。そうでなければ、$ chunk_refを指します。

use MCE::Loop; 

MCE::Loop::init { 
    max_workers => 5, 
    chunk_size => 1 
}; 

print "Enter a file name: "; 
my $dict_path = <STDIN>; 
chomp($dict_path); 

mce_loop_f { 
# my ($mce, $chunk_ref, $chunk_id) = @_; 

    my $line = $_; 
    ## add your code here to process $line or $_ 

} $dict_path; 

このデモンストレーションは、ファイルを並行して処理したいと思っている人に役立つことを願っています。

:) mario

関連する問題