2016-04-03 17 views
4

多くのエンティティを持つ巨大なプロジェクトでは、私はsave()の共通メソッドを書いていました。Zend framework 2/Doctrine 2 /バルク操作とイベントトリガー

このメソッドは、抽象サービスに格納され、エンティティの状態を保存するためにすべてのプロジェクトで使用されます。 (私の場合は

public function save($entity) 
{ 
    $transactionStarted = $this->beginTransaction(); 

    try 
    { 
     $action = $entity->getId() ? self::UPDATE : self::CREATION; 

     $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

     $this->getEntityManager()->persist($entity); 
     $this->getEntityManager()->flush(); 

     $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

     if ($transactionStarted) 
     { 
      $this->commitTransaction(); 
     } 
    } catch (\Exception $e) 
    { 
     if ($transactionStarted) 
     { 
      $this->rollbackTransaction(); 
     } 

     throw new Exception('Unable to save entity', $e); 
    } 

    return true; 
} 

public function beginTransaction() 
{ 
    if (!$this->getEntityManager()->getConnection()->isTransactionActive()) 
    { 
     $this->getEntityManager()->getConnection()->beginTransaction(); 

     return true; 
    } 

    return false; 
} 

public function commitTransaction() 
{ 
    $this->getEntityManager()->getConnection()->commit(); 

    return $this; 
} 

public function rollbackTransaction() 
{ 
    $this->getEntityManager()->getConnection()->rollBack(); 

    return $this; 
} 

、メンバーが挿入されたとき(新しいMemberエンティティ)(AbstractServiceを拡張)Memberサービスを呼び出すときに、電子メールが送信されます。

AbstractService ::セーブ()は次のようになります例えば、save.postイベントを通して)。 saveメソッドを呼び出す別のサービスに関連する別のアクションも実行できます。 "子" MemberService :: save()メソッド

MemberService 

public function save(Member $member) 
{ 
    // some stuff, e.g set a property 
    $member->setFirstName('John'); 

    return parent::save($member); 
} 

簡単な保存処理のために素晴らしいことだ、トリガイベント

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']); 

public function onMembersCreation(EventInterface $event) 
{ 
    // send an email 

    // anything else ... update another entity ... (call AnotherService::save() too) 
} 

の例の

例。

しかし今、私はたくさんのメンバーを作成し、更新して大量にインポートしたいと思います。それを達成するために、一括インポートに関連するDoctrineのドキュメントを読んでいます。 Doc here

「バルク保存」と「シングルセービング」を処理するためにコードを正しく更新するにはどうすればよいですか?トランザクションのセキュリティとイベントを維持しますか?

+0

「たくさんのメンバー」とは何ですか? 1k? 1M?あなたの答えは、あなたが助けるべき戦略を定義するでしょう – JesusTheHun

+0

こんにちはJesusTheHun、私の問題で最初に興味を持っていただきありがとうございます:) "多くのメンバー"は4kから10kです – ceadreak

+0

これはワンショットのインポートですか?頻繁に走る?根底にある問題は、パフォーマンスが重要かどうかです。 – JesusTheHun

答えて

1

基本的には、Doctrine \ Common \ Collections \ Collectionインターフェイスを実装して、おそらくArrayCollectionを拡張し、ドキュメントの指示に従うメソッドsaveを作成することをお勧めします。そのような

<?php 

class MyDirtyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public function __construct(AbstractService $abstractService) 
    { 
     $this->service = $abstractService; 
    } 

    public function save() 
    { 
     foreach ($this as $entity) { 
      $this->service->save($entity); 
     } 
    } 
} 

class MyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public $bulkSize = 500; 

    protected $eventManager; 
    protected $entityManager; 

    public function __construct(EntityManager $entityManager, EventManager $eventManager) 
    { 
     $this->entityManager = $entityManager; 
     $this->eventManager = $eventManager; 
    } 

    public function getEventManager() 
    { 
     return $this->eventManager; 
    } 

    public function getEntityManager() 
    { 
     return $this->entityManager; 
    } 

    public function setBulkSize(int $bulkSize) 
    { 
     $this->bulkSize = $bulkSize; 
    } 

    public function save() 
    { 
     $transactionStarted = $this->getEntityManager()->getConnection()->beginTransaction(); 

     try { 
      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 
      } 

      $i = 0; 
      foreach ($this as $entity) { 
       $i++; 

       $this->getEntityManager()->persist($entity); 

       if (($i % $this->bulkSize) === 0) { 
        $this->getEntityManager()->flush(); 
        $this->getEntityManager()->clear(); 
       } 
      } 

      $this->getEntityManager()->flush(); 
      $this->getEntityManager()->clear(); 

      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 
      } 

      if ($transactionStarted) { 
       $this->getEntityManager()->getConnection()->commitTransaction(); 
      } 

     } catch (Exception $e) { 
      $this->getEntityManager()->rollbackTransaction(); 
     } 
    } 
} 

何か;)あなたはあなたのコレクションを水和してデータをフェッチするとき、あなたはあなたのエンティティに対処し、最終的に呼び出す $collection->save();

EDIT:インサート・クラスを追加し、以下のケースを使用します。

ここでのパフォーマンスは低くなりますが、コミットによってコミットするよりも優れています。しかし、hgihのパフォーマンスを探している場合は、ORMの代わりにDoctrine DBALを使用することを検討する必要があります。ここで私はあなたと一緒に一括挿入のための私のDBALクラスを共有:

<?php 

namespace JTH\Doctrine\DBAL; 

use Doctrine\DBAL\Query\QueryBuilder; 
use Exception; 
use InvalidArgumentException; 
use Traversable; 
use UnderflowException; 

class Insert extends QueryBuilder 
{ 
    const CALLBACK_FAILURE_SKIP = 0; 
    const CALLBACK_FAILURE_BREAK = 1; 

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK; 

    public static $defaultBulkSize = 500; 

    public $ignore = false; 
    public $onDuplicate = null; 

    public function values(array $values) 
    { 
     $this->resetQueryPart('values'); 
     $this->addValues($values); 
    } 

    public function addValues(array $values) 
    { 
     $this->add('values', $values, true); 
    } 

    public function setCallbackFailureStrategy($strategy) 
    { 
     if ($strategy == static::CALLBACK_FAILURE_BREAK) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK; 
     } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP; 
     } else { 
      $class = self::class; 
      throw new InvalidArgumentException(
       "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK" 
      ); 
     } 
    } 

    public function getCallbackFailureStrategy() 
    { 
     return $this->callbackFailureStrategy; 
    } 

    public function execute() 
    { 
     return $this->getConnection()->executeUpdate(
      $this->getSQLForInsert(), 
      $this->getParameters(), 
      $this->getParameterTypes() 
     ); 
    } 

    /** 
    * Converts this instance into an INSERT string in SQL. 
    * @return string 
    * @throws \Exception 
    */ 
    private function getSQLForInsert() 
    { 
     $count = sizeof($this->getQueryPart('values')); 

     if ($count == 0) { 
      throw new UnderflowException("No values ready for INSERT"); 
     } 

     $values = current($this->getQueryPart('values')); 
     $ignore = $this->ignore ? 'IGNORE' : '' ; 
     $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] . 
      ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES '; 

     foreach ($this->getQueryPart('values') as $values) { 
      $sql .= '(' ; 

      foreach ($values as $value) { 
       if (is_array($value)) { 
        if ($value['raw']) { 
         $sql .= $value['value'] . ','; 
        } else { 
         $sql .= $this->expr()->literal($value['value'], $value['type']) . ','; 
        } 
       } else { 
        $sql .= $this->expr()->literal($value) . ','; 
       } 
      } 

      $sql = substr($sql, 0, -1); 
      $sql .= '),'; 
     } 

     $sql = substr($sql, 0, -1); 

     if (!is_null($this->onDuplicate)) { 
      $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' '; 
     } 

     return $sql; 
    } 

    /** 
    * @param $loopable array | Traversable An array or object to loop over 
    * @param $callable Callable A callable that will be called before actually insert the row. 
    * two parameters will be passed : 
    * - the key of the current row 
    * - the row values (Array) 
    * An array of rows to insert must be returned 
    * @param $bulkSize int How many rows will be inserted at once 
    * @param bool $transactionnal 
    * @throws \Doctrine\DBAL\ConnectionException 
    * @throws \Exception 
    */ 
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactionnal = true) 
    { 
     if (!is_array($loopable) and !($loopable instanceof Traversable)) { 
      throw new InvalidArgumentException("\$loppable must be either an array or a traversable object"); 
     } 

     $bulkSize = $bulkSize ?? static::$defaultBulkSize; 

     $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid MonoLog memory overload 

     if ($transactionnal) { 
      $this->getConnection()->beginTransaction(); 
     } 

     $this->resetQueryPart('values'); 

     foreach ($loopable as $key => $values) { 
      try { 
       $callbackedValues = $callable($key, $values); 

       if (sizeof($callbackedValues) > 0) { 
        foreach ($callbackedValues as $callbackedValuesRow) { 
         $this->addValues($callbackedValuesRow); 
        } 
       } 
      } catch (Exception $e) { 
       /* 
       * If a callback exception must break the transaction, then throw the exception to the call stack 
       * Else, skip the row insertion 
       */ 
       if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) { 
        throw $e; 
       } else { 
        continue; 
       } 
      } 

      $count = count($this->getQueryPart('values')); 

      if ($count >= $bulkSize) { 
       $this->execute(); 
       $this->resetQueryPart('values'); 
      } 
     } 

     $count = count($this->getQueryPart('values')); 

     if ($count > 0) { 
      $this->execute(); 
     } 

     $this->resetQueryPart('values'); 

     if ($transactionnal) { 
      $this->getConnection()->commit(); 
     } 
    } 

    /** 
    * @return boolean 
    */ 
    public function isIgnore() 
    { 
     return $this->ignore; 
    } 

    /** 
    * @param boolean $ignore 
    */ 
    public function setIgnore(bool $ignore) 
    { 
     $this->ignore = $ignore; 
    } 

    /** 
    * @return null|string 
    */ 
    public function getOnDuplicate() : string 
    { 
     return $this->onDuplicate; 
    } 

    /** 
    * @param null $onDuplicate 
    */ 
    public function setOnDuplicate($onDuplicate) 
    { 
     $this->onDuplicate = $onDuplicate; 
     $this->ignore = false; 
    } 


} 

ユースケース:

try { 
     $i = new Insert($this->getDoctrine()->getConnection('myDB')); 
     $i->insert('myTable'); 
     $i->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()'); 
     $i->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK); 
     $i->bulk($myArrayOfRows, function ($key, $row) { 

      // Some pre-insert processing 

      $rowset[] = $row; 

      return $rowset; 

     }, 500, true); 

     $this->addFlash('success', 'Yay !'); 

    } catch (DBALException $e) { 
     $this->addFlash('error', 'Damn, error : ' . $e->getMessage()); 
    } 
+0

Mhhh私はそれについてはわかりません、配列ではなくコレクションで、トランザクションには問題がありました。問題が同じかどうかをチェックするためにコレクションで再度テストします – ceadreak

+0

問題は何ですかその取引は?私の更新を確認してください – JesusTheHun

+0

@ JesusTheHun私はあなたの解決策をこのp.mテストします。 – ceadreak

0

は最後に、私はmerge教義の方法を使用し、素晴らしい動作しているようです。いくつかのイベントのために呼ばれるので、

/** 
    * @param ArrayCollection $entities 
    * 
    * @return bool 
    * @throws Exception 
    */ 
    public function saveBulk(ArrayCollection $entities) 
    { 
     $batchSize = 100; 
     $i   = 0; 

     foreach ($entities as $entity) 
     { 
      $transactionStarted = $this->beginTransaction(); 

      try 
      { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 

       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

       $entity = $this->getEntityManager()->merge($entity); 

       $this->getEntityManager()->persist($entity); 
       $this->getEntityManager()->flush(); 

       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

       if (($i % $batchSize) === 0) 
       { 
        $this->getEntityManager()->clear(); 
       } 

       if ($transactionStarted) 
       { 
        $this->commitTransaction(); 
       } 
      } catch (\Exception $e) 
      { 
       if ($transactionStarted) 
       { 
        $this->rollbackTransaction(); 
       } 

       throw new Exception(Exception::UNEXPECTED_ERROR, 'Unable to save entity', $e); 
      } 
     } 

     $this->getEntityManager()->clear(); 

     return true; 
    } 

doctrine2のドキュメントに反して、私はバッチごとにclear()なくflush() + clear()を呼び出す:

は私のような Memberエンティティの高い番号を保存するために別の AbstractService::saveBulk()方法を書きましたエンティティにデータベース識別子があるかどうかを知る必要があります。

@JesusTheHun私の多くのお役に立てるあなたのコメントに感謝します。

+0

一度に一度ではなく、すべてのエンティティのトランザクションを開始します。私はあなたがMyCollectionクラスであなたに与えたメソッドを使ってみませんか? (http://stackoverflow.com/questions/36390063/zend-framework-2-doctrine-2-bulk-operations-and-events-triggering/36602128#36602128) すべてのエンティティをフラッシュすると意味がありませんが、すべての$ batchSizeをフラッシュする必要があります。バッチサイズvarは、構造体または静的プロパティで初期化されたパラメータ、クラス定数またはオブジェクトプロパティでなければなりません。また、モジュロ条件を使用し、121個のエンティティをexempleにすると、最後の21個のエンティティをコミットしません。 – JesusTheHun

+0

私はあなたのメソッドを試しましたが、マージせずに、このエラーを持っています。 '関係を通して新しいエンティティが見つかりました。'beginTransaction()'メソッドは、トランザクションがすでに起動しているかどうかを確認します。すべてのエンティティに対してトランザクションを開始しません。 – ceadreak

+0

マージを使用して永続化する方法を再度使用しようとしました。 500エンティティの場合、私の方法よりもメソッドが4秒少なくなります)。回答は+1 – ceadreak

関連する問題