2016-05-24 21 views
2

実際、2スレッドが開始されたとしましょう。そして5つの要素の配列は、すべての計算が1つの取引で行われます。 2が現れ、仕事の欠如のために死ぬ。または、9999要素を指定すると、彼は最後の30-50項目で目を覚まします。彼は本当にマルチスレッドではないようです。スレッドの最大値

#! usr/bin/perl -w 
use strict; 
use warnings; 
use 5.010; 
use threads; 
use threads::shared; 
use Data::Dumper; 

my $n=0; 

my $max:shared=0; 
my @threads; 
my $shr:shared=0; 
say "Enter the number of threads"; 
my $threads=<stdin>; 
my @pack; 

#Заполняем массив 
say "Enter the number of array elements"; 
my $c=<stdin>; 
for (my $i=0;$i<$c;$i++){ 
    my $r= int rand 999; 
    @pack=(@pack,$r); 
         } 
         say @pack; 

# Создаём нужное количество потоков 
for my $t (1..$threads) { 
    push @threads,threads->create(\&find,$t); 
         } 
# Дожидаемся окончания работы всех потоков 
for my $t (@threads){ $t->join();} 

sub find { 
    # Номер текущего потока 
    my $num= shift; 
    say "+Thread $num started"; 

    while($n<=$#pack){ 
     # Распределяем элементы по потокам 
     lock($shr);#lock ($max); 
     my $stg=1+$shr++; 

     # Если элементы закончились, прерываем работу 
     if ($stg>=$#pack+2) { 
      say "-Thread $num done. \n"; 
      return;} 


     if($max<$pack[$n]){$max=$pack[$n];} 
     say "Thread: $num \tStage: $stg \tMax: $max"; 
     $n++; 
     sleep 1+int rand (3);  
      } 
     } 
+1

http://ru.stackoverflow.comでアカウントを開くことをお勧めします。このサイトは英語の質問のみのためです。 – Adriaan

+3

スレッドの作業コード全体が 'lock($ shr)'が得られたブロックにあり、複数のスレッドが何らかの作業をするのを防ぎます。 – ikegami

+0

ohh、神..ありがとう!) – QWD666

答えて

5

スレッドの全体の作業コードがlock($shr)がすべての作業を行ってから、複数のスレッドを防止し、取得されたブロックです。


大きな配列があり、ワーカースレッドがその大きな配列の一部を処理するように思えます。

スレッドを同期させる必要があるのは、作業を取り込むときと、結果を更新するときだけです。

アクセスしているときに配列をロックする必要がありますが、作業中はロックを保持できないため、ワークユニットにスレッドローカル変数にコピーする必要があります。

以下は、配列のすべての要素の合計を求めます。

#!/usr/bin/perl 
use strict; 
use warnings; 
use feature qw(say); 

use threads; 
use threads::shared; 

use List::Util qw(sum); 

# This is where the real work is done. 
# Notice how it there's no thread-related code in it 
# and how it doesn't use any shared variables. 
sub work { 
    sleep(1+rand(3)); 
    return sum(@_); 
} 

{ 
    @ARGV == 3 
     or die("usage\n"); 

    my ($data_size, $num_threads, $chunk_size) = @ARGV; 

    my $next :shared = 0; 
    my @data :shared; 
    my $sum :shared; 

    for (1..$data_size) { 
     push @data, int(rand(9999)); 
    } 

    say "Without threads: ", sum @data; 

    for my $t (1..$num_threads) { 
     async { 
      say sprintf("[%s]: Worker started.", threads->tid); 
      LOOP: while (1) { 
       # Grab work. 
       my @local_data; 
       { 
        lock $next; 
        last LOOP if $next >= @data; 

        my $new_next = $next + $chunk_size; 
        $new_next = @data if $new_next > @data; 

        say sprintf("[%s] Processing %s..%s", threads->tid, $next, $new_next-1); 
        @local_data = @data[$next .. $new_next-1]; 
        $next = $new_next; 
       } 

       my $local_sum = work(@local_data);  

       # Update results. 
       { lock $sum; $sum += $local_sum; } 
      } 
      say sprintf("[%s] done.", threads->tid) 
     }; 
    } 

    $_->join for threads->list; 

    say "With threads: $sum"; 
} 

出力:

$ ./a 125 3 20 
Without threads: 577556 
[1]: Worker started. 
[1] Processing 0..19 
[2]: Worker started. 
[2] Processing 20..39 
[3]: Worker started. 
[3] Processing 40..59 
[2] Processing 60..79 
[1] Processing 80..99 
[3] Processing 100..119 
[1] Processing 120..124 
[2] done. 
[1] done. 
[3] done. 
With threads: 577556 
0
ます。また、スレッドに ジョブまたは 作業項目を提供するThread::Queueを使用することができます

:そのようにそれらのコピーへの参照をチャンクに@dataを分割し、押しキュー。これらのスレッドは、dequeueこれらの参照を参照解除し、それらを合計し、(部分)結果をグローバル$sumに追加します。 ワークアイテムをキューにプッシュした後は、スレッドが多数存在するため、各スレッドがループの "undef"を取得し、処理を停止するように多くの "undefs"をスレッドに挿入する必要があります。

Thread::Queueの新しいバージョンは、方法end()を持っています。あなたのケースでは、メインスレッドから呼び出すことができます(n undefsをエンキューするのではなく)結果はほぼ同じです:スレッド内のdequeueコールは残りのアイテムを返し、次に各スレッドに対してundefを返します。私はより明示的に(そしてThreads::Queueの実際のバージョンに依存しないので)undefをエンキューすることを好みます。

コードは次のようになります。

#!/usr/bin/env perl 

use strict; 
use warnings; 

use threads; 
use threads::shared; 
use Thread::Queue; 
use List::Util; 

my $sum : shared = 0; 
my $jobs = Thread::Queue->new(); 

sub add_in_thread 
{ 
    printf("[%s] started\n", threads->tid()); 
    while (my $workitem = $jobs->dequeue()) { 
     printf("[%s] Processing %d items\n", threads->tid(), scalar @{$workitem}); 
     my $local_sum = List::Util::sum(@{$workitem}); 
     { lock $sum; $sum += $local_sum; } 
     sleep(1 + rand(3)); 
    } 
    printf("[%s] done\n", threads->tid()); 
} 

sub main 
{ 
    my ($data_size, $num_threads, $chunk_size) = @ARGV; 

    my @data = map { int(rand(9999)) } (1 .. $data_size); 

    print "Without threads: ", List::Util::sum(@data), "\n"; 

    # split @data into chunks and put references 
    # of copies of them into the $jobs queue 
    while (my @chunk = splice(@data, 0, $chunk_size)) { 
     $jobs->enqueue([@chunk]); 
    } 

    # add 'undef' for each thread so it knows when to finish: 
    $jobs->enqueue((undef) x $num_threads); 

    # start threads: 
    threads->create('add_in_thread') foreach (1 .. $num_threads); 

    # wait for them to finish:      } 
    $_->join() for threads->list(); 

    print "With threads: $sum\n"; 

    return 0; 
} 

exit(main()); 

出力


Without threads: 623399 
[1] started 
[1] Processing 20 items 
[2] started 
[2] Processing 20 items 
[3] started 
[3] Processing 20 items 
[1] Processing 20 items 
[2] Processing 20 items 
[3] Processing 20 items 
[1] Processing 5 items 
[2] done 
[3] done 
[1] done 
With threads: 623399 

./so17.pl 125 3 20のためにあなたはまた、事前にスレッドを開始することができます。それらは、何かがそのキューに入るまでwhileループ(実際はdequeue()コール)でブロックされます。それがundef(または他の偽物値)の場合、処理を終了して終了します。そうでなければ、そのキュー内のアイテムを処理します。 Thread::Queueはすべてのロックと同期を行います。 2番目のキューを設定し、その結果をスレッドにプッシュさせ、メインスレッドがその2番目の結果キューから結果をフェッチするようにすることもできます。

+1

' - > end'はもっと実際にはもっと明示的です。それはその名の通りにそこにあると言います。エラーの発生が少なく、より多くの状況で役立ちます。 ///また、スレッドを作成してキューを間違った順序で埋めています。これらのスレッドをできるだけ早く稼働させ、できるだけ早く働き始めることができます。 ///最後に、あなたがコピーしたポストの全ポイントは、労働者が何かをやるべきだということでした。*それはトラブルのOPを得ているからです。スレッドループが作業ルーチンから分離されている方が良いです。 – ikegami

+0

彼は実際のコードで配列を更新したいと思っていたので私は配列を保持しましたが、私は実際にThread :: Queueをここで使用します(前のコメントで述べた修正で)。 – ikegami

+0

@ikegami ' - > end()'はT :: Q 3.01(別名Perl 5.18)で導入されました。私はT :: Qをかなり長く使い、undefsをエンキューするのに使われました。また、私は ' - > end()'を間違って解釈し、間違って_one_ 'dequeue()'コールが_all_残りの項目を一度に返すと誤って考えました。ドキュメントを読み直した後、私は ' - > end()'が優先されるべきだということに完全に同意します(利用可能な場合)。 ///作業者をスレッドコードから分離すると、 'List :: Util :: sum(...);'行を 'work()'のような別の関数に移動させることになります。付与された。結局のところ、私の投稿の意図は、T :: Qの実際の動作を示すことでした。それはとてもエレガントです! – PerlDuck