2012-03-08 11 views
3

PHPアプリケーションでは、Gearmanジョブが複数のワーカーに渡されることがあるという問題があります。私はコードを1つのファイルに再現するために減らすことができます。今は、これがGearmanのバグか、peclライブラリのバグか、私のコードのバグかどうかはわかりません。ここでGearmanジョブが複数のワーカー(PHP)に渡されます

は、エラーを再現するためのコードです:

#!/usr/bin/php 
<?php 

// Try 'standard', 'exception' or 'exception-sleep'. 
$sWorker = 'exception'; 



// Detect run mode "client" or "worker". 
if (!isset($argv[1])) 
    $sMode = 'client'; 
else 
    $sMode = 'worker-' . $sWorker; 

$sLogFilePath = __DIR__ . '/log.txt'; 

switch ($sMode) { 

    case 'client': 

     // Remove all queued test jobs and quit if there are test workers running. 
     prepare(); 

     // Init the greaman client. 
     $Client= new GearmanClient; 
     $Client->addServer(); 

     // Empty the log file. 
     file_put_contents($sLogFilePath, ''); 

     // Start some worker processes. 
     $aPids = array();  
     for ($i = 0; $i < 100; $i++) 
      $aPids[] = exec('php ' . __FILE__ . ' worker > /dev/null 2>&1 & echo $!'); 

     // Start some jobs. Also try doHigh(), doBackground() and 
     // doBackgroundHigh(); 
     for ($i = 0; $i < 50; $i++) 
      $Client->doNormal('test', $i); 

     // Wait a second (when running jobs in background). 
     // sleep(1); 

     // Prepare the log file entries. 
     $aJobs = array(); 
     $aLines = file($sLogFilePath); 
     foreach ($aLines as $sLine) { 
      list($sTime, $sPid, $sHandle, $sWorkload) = $aAttributes = explode("\t", $sLine); 
      $sWorkload = trim($sWorkload); 
      if (!isset($aJobs[$sWorkload])) 
       $aJobs[$sWorkload] = array(); 
      $aJobs[$sWorkload][] = $aAttributes; 
     } 

     // Remove all jobs that have been passed to only one worker as expected. 
     foreach ($aJobs as $sWorkload => $aJob) { 
      if (count($aJob) === 1) 
       unset($aJobs[$sWorkload]); 
     } 

     echo "\n\n"; 

     if (empty($aJobs)) 
      echo "No job has been passed to more than one worker."; 
     else { 
      echo "Those jobs has been passed more than one times to a worker:\n"; 
      foreach ($aJobs as $sWorload => $aJob) { 

       echo "\nJob #" . $sWorload . ":\n"; 
       foreach ($aJob as $aAttributes) 
        echo " $aAttributes[2] (Worker PID: $aAttributes[1])\n"; 
      } 
     } 

     echo "\n\n"; 

     // Kill all started workers. 
     foreach ($aPids as $sPid) 
      exec('kill ' . $sPid . ' > /dev/null 2>&1'); 

    break; 

    case 'worker-standard': 
     $Worker = new GearmanWorker; 
     $Worker->addServer(); 
     $Worker->addFunction('test', 'logJob'); 
        $bShutdown = false; 
     while ($Worker->work()) 
      if ($bShutdown) 
       continue; 
     break; 

    case 'worker-exception': 
     try { 
      $Worker = new GearmanWorker; 
      $Worker->addServer(); 
      $Worker->addFunction('test', 'logJob'); 
      $bShutdown = false; 
      while ($Worker->work()) 
       if ($bShutdown) 
        throw new \Exception; 

     } catch (\Exception $E) { 
     } 
    break; 

    case 'worker-exception-sleep': 
     try { 
      $Worker = new GearmanWorker; 
      $Worker->addServer(); 
      $Worker->addFunction('test', 'logJob'); 
      $bShutdown = false; 
      while ($Worker->work()) 
      { 
       if ($bShutdown) { 
        sleep(1); 
        throw new \Exception; 
       } 
      } 
     } catch (\Exception $E) { 
     } 
    break; 
} 

function logJob(\GearmanJob $Job) 
{ 
    global $bShutdown, $sLogFilePath; 
    $sLine = microtime(true) . "\t" . getmypid() . "\t" . $Job->handle() . "\t" . $Job->workload() . "\n"; 
    file_put_contents($sLogFilePath, $sLine, FILE_APPEND); 
    $bShutdown = true; 
} 


function prepare() 
{ 
    $rGearman = fsockopen('127.0.0.1', '4730', $iErrno, $sErrstr, 3); 
    $aBuffer = array(); 
    fputs ($rGearman, 'STATUS' . PHP_EOL); 
    stream_set_timeout($rGearman, 1); 
    while (!feof($rGearman)) 
     if ('.' . PHP_EOL !== $sLine = fgets($rGearman, 128)) 
      $aBuffer[] = $sLine; 
     else 
      break; 
    fclose($rGearman); 

    $bJobsInQueue = false; 
    $bWorkersRunning = false; 
    foreach ($aBuffer as $sFunctionLine) { 
     list($sFunctionName, $iQueuedJobs, $iRunningJobs, $iWorkers) = explode("\t", $sFunctionLine); 
     if ('test' === $sFunctionName) { 
      if (0 != $iQueuedJobs) 
       $bJobsInQueue = true; 
      if (0 != $iWorkers) 
       $bWorkersRunning = true;; 
     } 
    } 

    // Exit if there are workers running. 
    if ($bWorkersRunning) 
     die("There are some Gearman workers running that have registered a 'test' function. Please stop these workers and run again.\n\n"); 

    // If there are test jobs in the queue start a worker that eat up the jobs. 
    if ($bJobsInQueue) { 
     $sPid = exec('gearman -n -w -f test > /dev/null 2>&1 & echo $!'); 
     sleep(1); 
     exec ("kill $sPid > /dev/null 2>&1"); 
     // Repeat this method to make sure all jobs are removed. 
     prepare(); 
    } 
} 

あなたは、コマンドライン上でこのコードを実行すると、それは出力すべき"No job has been passed to more than one worker."が、それはなかれに渡されたいくつかのジョブのリストを出力しinsted複数の労働者。 $sWorker = 'standard';または'exception-sleep'に設定すると、エラーは表示されません。

コードを実行して、コードにバグがある場合のエラーを再現できるかどうかを教えてください。

+0

コードを実行すると、予想される出力が得られます。どのGearmanのバージョンを使用していますか? – Aurimas

+1

テストしていただきありがとうございます。 LinuxでPECL libバージョン1.0.1と1.0.2を使って0.28と0.27でテストしました。どのバージョンとOSを使用していますか? – stofl

+1

私は0.29で動作していますが、もう一度試してみましたが、エラーを再現することもできました(少なくとも最初のテストではたまにしか発生しませんが、少なくとも3〜4回はエラーが発生しませんでした)。 – Aurimas

答えて

3

Gearman 0.24、PECL lib 1.0.2とまったく同じ問題がありました。毎回あなたのスクリプトでエラーを再現できました。

Gearmanの以前のバージョン(私は思うが0.14)は正常に動作していた。

Gearmanを0.33にアップグレードすると問題が解決しました。

関連する問題