2015-12-01 3 views
5

Symfony2/PHPに、計算量の多いバックエンドプロセスがあり、マルチスレッドで実行したいと思っています。PHPのCLIで並列スレッドで大きなループを実行する

私は何千ものオブジェクトを反復処理するので、オブジェクトごとに1つのスレッドを開始すべきではないと思います。私はパラレルでどれくらいのスレッドを必要とするかを定義する$ cores変数を持っていきたいと思っています。ループを繰り返して、多くのスレッドを実行させておいてください。したがって、スレッドが終了するたびに、すべてのオブジェクトが完了するまで、次のオブジェクトを持つ新しいスレッドを開始する必要があります。

pthreadsのドキュメントを調べていくつかのGoogle検索を行っても、このような状況で役に立つ例は見つけられません。私が見つけたすべての例は、一度実行される固定数のスレッドを持ち、何も何千ものオブジェクトを反復しません。

誰かが私を正しい方向に向けることができますか?私はスレッドを設定し、それに参加するなどの基本を理解していますが、待ち状態のループでそれを行う方法は理解していません。

+1

phpは本当にマルチスレッド用に設計されていないため、pthreadsのドキュメントにはそれに関する顕著な警告があります。より簡単なオプションは、すべてのタスクをキューにプッシュし、x個のプロセス(exec)を起動してキューにサービスを提供することです。別のオプションは、タスクを(taskcount/x)のグループに分割し、再びxプロセスを開始し、各プロセスにグループを渡すことです。 – Steve

+0

オブジェクトの所与の量のスレッドがあるかどうかわからない場合は、あなたが多くのオブジェクトを持っていて、あなたのプログラムがより速くなるのではなく、より遅くなるならば、 IMHOあなたが見つけた例でスレッドが修正された理由 – Freelancer

+1

私はこれをあなたに提案できますか? https://github.com/facile-it/paraunit私はmantainerです、私はSymonfy Processを使いました。あなたのユースケースではないかもしれませんが、多分それはあなたに頭のスタートを与えることができます...またはあなたはhttps://github.com/liuggio/fastestを見てみることができます – Jean

答えて

3

質問に対する回答は、PoolWorkerという抽象的なものです。

基本的な考え方は、それがすべてWorkers渡ってごThreadedオブジェクト(ラウンドロビン)を配布し、利用できる次のWorker上にスタックPoolにあなた::submitThreadedオブジェクトということです。

従うが、超シンプルなコードがPHP7のためです(pthreadsのv3の):

<?php 
$jobs = []; 
while (count($jobs) < 2000) { 
    $jobs[] = mt_rand(0, 1999); 
} 

$pool = new Pool(8); 

foreach ($jobs as $job) { 
    $pool->submit(new class($job) extends Threaded { 
     public function __construct(int $job) { 
      $this->job = $job; 
     } 
     public function run() { 
      var_dump($this->job); 
     } 
    }); 
} 

$pool->shutdown(); 
?> 

の仕事は明らかに、無意味です。現実の世界では$jobs配列が増え続けると思いますので、foreachdo {} whileに交換して、新しい仕事のために::submitを呼び出してください。

現実世界では、同じループ内でゴミを収集したいと思うでしょう(デフォルト動作についてはパラメータなしでPool::collectを呼び出してください)。

注目すべきことは、本当にPHPは、マルチスレッド環境で動作するように意図されていなかった場合であれば、これのどれも可能ないだろう...それは、間違いされます。

が質問に答えですが、それはそれで問題に最善の解決策ことはありません。

あなたは、symfonyコードを実行する8つのスレッドが8つのプロセスより少ないメモリを占めると仮定しているとコメントで述べました。これは当てはまりません。PHPは何も共有されていません。 8つのSymfonyスレッドが8つのSymfonyプロセスと同じくらい多くのメモリを占めることが期待できます。プロセス上でスレッドを使用する利点は、スレッド間で通信し、同期し、(見かけ上)共有できることです。

あなたができるからといって、あなたがするべきではありません。手近な作業のための最善の解決策は、おそらく、何が必要なのかを意図した既製のパッケージまたはソフトウェアを使用することです。

堅牢なソリューションを実装するのに十分なものを勉強するのは時間がかかりますが、その最初のソリューションを展開したくないということです...

私のアドバイスを無視して、それをやりなおす場合は、pthreads用のgithubリポジトリに多くのexamplesがあります。

1

ジョーは良いアプローチをしていますが、私は現在使用している別のソリューションを見つけました。基本的には、私は2つのコマンド、1つのコントロールと1つのワーカーコマンドを持っています。制御コマンドは、バックグラウンド・プロセスを開始し、その結果をチェックします:ます$ this->パラレルは私が私の8コアのマシン上の6に設定した変数です

protected function process($worker, $entity, $timeout=60) { 
    $min = $this->em->createQuery('SELECT MIN(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult(); 
    $max = $this->em->createQuery('SELECT MAX(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult(); 

    $batch_size = ceil((($max-$min)+1)/$this->parallel); 
    $pool = array(); 
    for ($i=$min; $i<=$max; $i+=$batch_size) { 
     $builder = new ProcessBuilder(); 
     $builder->setPrefix($this->getApplication()->getKernel()->getRootDir().'/console'); 
     $builder->setArguments(array(
      '--env='.$this->getApplication()->getKernel()->getEnvironment(), 
      'maf:worker:'.$worker, 
      $i, $i+$batch_size-1 
      )); 
     $builder->setTimeout($timeout); 

     $process = $builder->getProcess(); 
     $process->start(); 
     $pool[] = $process; 
    } 
    $this->output->writeln($worker.": started ".count($pool)." jobs"); 
    $running = 99; 
    while ($running > 0) { 
     $running = 0; 
     foreach ($pool as $p) { 
      if ($p->isRunning()) { 
       $running++; 
      } 
     } 
     usleep(250); 
    } 

    foreach ($pool as $p) { 
     if (!$p->isSuccessful()) { 
      $this->output->writeln('fail: '.$p->getExitCode().'/'.$p->getCommandLine()); 
      $this->output->writeln($p->getOutput()); 
     } 
    } 

} 

、それが起動するプロセスの数を意味します。このメソッドでは、特定のエンティティ(それによって分割されます)を反復処理する必要があることに注意してください。これは、使用例では常にtrueです。

これは完璧ではありませんが、スレッドの代わりに完全に新しいプロセスを開始します。私はこれをより良いソリューションと考えています。

workerコマンドは、最小ID番号と最大ID番号を取り、その2つの間のセットの実際の作業を行います。

このアプローチは、データセットが合理的によく分散されている限り機能します。 1-1000の範囲にデータがないが、1000と2000の間のすべてのIDが使用されている場合、最初の3つのプロセスは何もしません。

関連する問題