2017-08-26 8 views
2

私はPHPのマルチスレッドにPThreadを使用しています。私はWindows上の私のXAMPPサーバーにそれをインストールして正常に実行しました。 私は、このコードにそれを実行するとthem.HereこのPHPのPthreadについて

require('mailscript.php'); 
class My extends Thread{ 

    function __construct() { 
     $this->mailscript = new mailscript(); 
    } 

    function run(){ 
     $this->mailscript->runMailScript(5000); 
    } 
} 

for($i=0;$i<20;$i++){ 
    $pool[] = new My(); 
} 

foreach($pool as $worker){ 
    $worker->start(); 
} 
foreach($pool as $worker){ 
    $worker->join(); 
} 

ための私のコードで私は、データベース内の100Kレコードを持っていると私は、データベースとプロセスから5kのレコードを呼び出しますparallel.Everyスレッドで20個のスレッドを実行したいですスレッドごとに約600件のレコードが実行されます.PThreadsのスレッド数には制限があります。何が問題なのですか?

答えて

1

hie、ここでは、nescessaryの場合に使用するコレクションメソッドでpthreadを処理する方法を示します。これがあなたを助けることを願っています。

/*pthreads batches */ 
$batches = array(); 

$nbpool = 20; // cpu 10 cores 

/* job 1 */ 
$list = [/* data1 */]; 
$batches[] = array_chunk($list, 5000); 

/* job 2 */ 
$list2 = [/* data2 */]; 
$batches[] = array_chunk($list2, 10000); 

/*final collected results*/ 
$resultFinal = []; 

/* loop across batches */ 
foreach ($batches as $key => $chunks) { 

    /* for intermediate collection */ 
    $data[$key] = []; 

    /* how many workers */ 
    $workCount = count($chunks); 

    /* set pool job up to max cpu capabilities */ 
    $pool = new Pool($nbpool, Worker::class); 

    /* pool cycling submit */ 
    foreach (range(1, $workCount) as $i) { 
     $chunck = $chunks[$i - 1]; 
     $pool->submit(new processWork($chunck)); 
    } 

    /* on collection cycling */ 
    $collector = function (\Collectable $work) use (&$data) { 

     /* is worker complete ? */ 
     $isGarbage = $work->isGarbage(); 

     /* worker complete */ 
     if ($isGarbage) { 
      $data[$key] = $work->result; 
     } 
     return $isGarbage; 
    }; 
    do { 
     /* collection on pool stack */ 
     $count = $pool->collect($collector); 
     $isComplete = count($data) === $workCount; 
    } while (!$isComplete); 

    /* push stack results */ 
    array_push($resultFinal, $data); 

    /* close pool */ 
    $pool->shutdown(); 
} 

class processWork extends \Threaded implements \Collectable { 

    private $isGarbage; 
    private $process; 
    public $result; 

    public function __construct($process) { 
     $this->process = $process; 
    } 

    public function run() { 
     $workerDone = array(); 
     foreach ($this->process as $k => $el) { 
      /* whatever stuff with $this->process */ 
     } 
     $this->result = $workerDone; 
     $this->isGarbage = true; // yeah, it s done 
    } 

    public function isGarbage(): bool { 
     return $this->isGarbage; 
    } 
}