2017-11-26 15 views
2

私は電子メールキューであるシンプルなテーブルを持っています。Innodbトランザクションまたはテーブルがロックされていますか?

CREATE TABLE `emails_queue_batch` (
    `eq_log_id` int(11) NOT NULL DEFAULT '0', 
    `eq_to` varchar(120) CHARACTER SET utf8 DEFAULT NULL, 
    `eq_bcc` varchar(80) CHARACTER SET utf8 DEFAULT '', 
    `eq_from` varchar(80) CHARACTER SET utf8 DEFAULT NULL, 
    `eq_title` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL, 
    `eq_headers` varchar(80) CHARACTER SET utf8 DEFAULT NULL, 
    `eq_content` longtext CHARACTER SET utf8, 
    `eq_sid` int(11) DEFAULT '0', 
    `eq_type` int(11) DEFAULT '0' COMMENT 'email type', 
    `eq_esp` int(11) DEFAULT '0', 
    PRIMARY KEY (`eq_log_id`), 
    KEY `email` (`eq_to`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci 

複数のスレッドが一度に50行ずつ繰り返し読み込み、行を削除します。私が使用したのと同じ行の二重読み取りを避けるため

$db->query(" LOCK TABLE $table WRITE "); 

    $query= "SELECT * FROM $table LIMIT ".CHUNK_SIZE. " " ; 

    $emails2send=$db->get_results ($query); 

    if (!empty ($emails2send)){ 

     // DELETE EMAIL 
     $eq_log_ids = array(); 
     foreach ($emails2send as $email) $eq_log_ids[]= $email->eq_log_id ; 

     $query= "DELETE FROM $table WHERE eq_log_id IN (".implode(',', $eq_log_ids).") "; 
     $db->query ($query); 

     $db->query (" UNLOCK TABLES "); // unlock the table so other sessions can read next rows 
     ........ code processing the read rows here ............. 
    } else { // if !empty emails2send 
     // $emails2send is empty 
     $db->query (" UNLOCK TABLES; ");  
     $stop_running=true; // stop running 
    } 

別のスレッド(複数可)は、同時にテーブルに書き込みます。 私は、この構成が読み取りと書き込みの両方のロックされたテーブルでデッドロックしていることを理解していません。

私の質問は: これは、私が各行を一度(一度だけ)読んでいることを確認するための正しい解決方法です。

これはトランザクションとしてより適切に処理されますか?その場合はどのような種類ですか?私は取引経験がありません。

+1

正しい解決策は、実際のメッセージキュー(例:RabbitMQまたはActiveMQ)を使用することです。デタクト・キューとしてトランザクション・データベースを使用すると、データベース・キューから読み取るスレッドを1つだけに制限しない限り、常にロック競合とデッドロックが発生します。 –

+0

ありがとう、私はそれに取り組んでいます。 メッセージキューの1つの問題は、何かがうまくいかない場合、たとえば電子メールバッチをキャンセルする必要がある場合、キューから選択的に削除できないことです。 – Nir

+0

私の過去の仕事の1つでは、データベースにタスクを保存しましたが、 "ディスパッチャ"プロセスは実行準備ができたらタスクを引き出し、MQに投稿しました。ディスパッチャがシングルスレッドである限り、あなたが尋ねている問題は回避されます。その後、MQは多くのワーカースレッドによって読み取られます。それはバランスを取ることになります。DB内のタスクを削除したり、MQに投稿されるまで修正することができます。 –

答えて

1

プランAは:

これは2秒、たとえば、未満でN行を処理することができ前提としています。あなたはN = 50です - これは大きすぎるかもしれません。

BEGIN; 
SELECT ... LIMIT 50 FOR UPDATE; 
... process ... 
... gather a list of ids to delete ... 
DELETE ... WHERE id IN (...) 
COMMIT; 

あなたが掴むほど、速くなりますが、デッドロックが発生する可能性も高くなります。デッドロックが発生した場合、トランザクションを開始するだけです。また、 "50"を調整するために、デッドロックが発生する頻度を記録します。

プランB:

項目の処理は、トランザクションのために、「長すぎる」を取るときに便利です。私はおそらく2秒は "長すぎる"と言う。

Grab a row to process: 
with autocommit=ON ... 
UPDATE ... SET who_is_processing = $me, 
       when_grabbed = NOW() 
       id = LAST_INSERT_ID(id), 
      WHERE when_grabbed IS NULL 
      AND any-other-criteria 
      LIMIT 1; 
$id = SELECT LAST_INSERT_ID(); 

... process $id ... (This may or may not involve transactions) 

Release the row (or, in your case, delete it): 
again, autocommit=ON suffices... 
DELETE ... WHERE id = $id; 

「Never」はInnoDBでテーブルロックを使用します。

1

トランザクションを使用するほうが、特にデッドロックしている場合は、おそらくより良いでしょう。

まず、バッチサイズを50から1に減らして、改善するかどうかを確認してください。彼らはそうかもしれない。それは簡単です。そして、それはあなたがトランザクションを使用する場合、あなたがしたいことです。

第2に、このような一連のクエリを試してみてください。

START TRANSACTION; 
    SELECT @id := table.eq_log_id, table.* FROM table LIMIT 1 FOR UPDATE; 
    /* handle the item here */ 
    DELETE FROM table WHERE eq_log_id = @id; 
    COMMIT; 

eq_log_idがユニーク(またはプライマリ)キーである場合にのみ機能します。 SELECT操作で行が返されなくなるまで、PHPプログラムのループでこれを実行します。その後、しばらく眠り、もう一度やり直してください。

ヌルデフォルト値を使用して、processedというTIMESTAMPをテーブルに追加することもできます。行を削除する代わりに、タイムスタンプを更新することができます。これにより、トラブルシューティングの方法が得られます。

START TRANSACTION; 
SELECT @id:=eq_log_id, * FROM table WHERE processed IS NULL LIMIT 1 FOR UPDATE; 
/* handle the item here */ 
UPDATE table SET processed=NOW() WHERE eq_log_id = @id; 
COMMIT; 

あなたはこの

DELETE FROM table WHERE processed < CURDATE() - INTERVAL 1 DAY; 

のように、すべての古いレコードを一掃するために夜間のバッチを実行することができ、生産に送信されたメッセージの時間履歴を見て、本当に役立つことができたので、私はこのことを示唆しています。

関連する問題