2015-10-05 9 views
11

postgresデータベースを使用して、複数のライターと複数のリーダーで信頼できるキューを実装しようとしています。キュー・リーダーが表をスキャンして、進行中のトランザクションが読み取った後にコミットするときに行が欠落しないようにする方法。PostgreSQL - 信頼できるキューを実装する

「チェックポイント」時間を使用してバッチ内の行を選択する読者がいます。各バッチは前のバッチの最後のタイムスタンプの後に行を取得し、行がありません。 (理由:タイムスタンプの値は、INSERTが発生した時刻(00.00.00)に基づいています。負荷が高い場合、トランザクションに時間がかかると、10秒後に(00.00.10)挿入され、その10秒間に読み込んで、その行のINSERT時刻がrow1よりも遅い(00.00.05)行を見つけた場合は、(row1)この問題の詳細な説明は、このブログに書かれているものと同様です。

コンテキストの

関連前の質問:Postgres LISTEN/NOTIFY - low latency, realtime?

アップデート:私は、複数の読者に、単一の読者を持つからの質問を更新しました。読者が読むことは重要です。

+1

http://stackoverflow.com/q/6507475/330315とhttp://stackoverflow.com/q/22765054/330315役立つかもしれない:実装を確認し、言っ

。これを調べることもできます:http://pgxn.org/dist/pg_message_queue/ –

+0

すべての行が*順番に処理されることは重要ですか?では、順序はどのように*正確に定義されていますか?または行の不足を避けたいですか?それでは、ここに示すような解決策が必要です:[Postgres Update、limit 1](http://dba.stackexchange.com/a/69497/3684) –

+0

これは本当にpostgresqlで行う必要がありますか?これは非常に簡単に赤字で完了できる要件の一種です – e4c5

答えて

3

複数のリーダーを考慮すると、各リーダーがすでに受信したレコードを制御する必要があります。

また、注文は読者にもレコードを送信する条件であると言われています。したがって、あるトランザクションが以前のトランザクションよりも前にコミットした場合、読者に送信されるレコードの順序を維持するために、「停止」し、コミットしたときにレコードを再度送信する必要があります。

-- lets create our queue table 
drop table if exists queue_records cascade; 
create table if not exists queue_records 
(
    cod serial primary key, 
    date_posted timestamp default timeofday()::timestamp, 
    message text 
); 


-- lets create a table to save "checkpoints" per reader_id 
drop table if exists queue_reader_checkpoint cascade; 
create table if not exists queue_reader_checkpoint 
(
    reader_id text primary key, 
    last_checkpoint numeric 
); 



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text) 
RETURNS SETOF queue_records AS 
$BODY$ 
DECLARE 
    vLAST_CHECKPOINT numeric; 
    vCHECKPOINT_EXISTS integer; 
    vRECORD   queue_records%rowtype; 
BEGIN 

    -- let's get the last record sent to the reader 
    SELECT last_checkpoint 
    INTO vLAST_CHECKPOINT 
    FROM queue_reader_checkpoint 
    WHERE reader_id = pREADER_ID; 

    -- if vLAST_CHECKPOINT is null (this is the very first time of reader_id), 
    -- sets it to the last cod from queue. It means that reader will get records from now on. 
    if (vLAST_CHECKPOINT is null) then 
     -- sets the flag indicating the reader does not have any checkpoint recorded 
     vCHECKPOINT_EXISTS = 0; 
     -- gets the very last commited record 
     SELECT MAX(cod) 
     INTO vLAST_CHECKPOINT 
     FROM queue_records; 
    else 
     -- sets the flag indicating the reader already have a checkpoint recorded 
     vCHECKPOINT_EXISTS = 1; 
    end if; 

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
      SELECT * 
      FROM queue_records 
      WHERE COD > vLAST_CHECKPOINT 
      ORDER BY COD 
    LOOP 

     -- if next record IS EQUALS to (vLAST_CHECKPOINT+1), the record is in the expected order 
     if (vRECORD.COD = (vLAST_CHECKPOINT+1)) then 

      -- let's save the last record read 
      vLAST_CHECKPOINT = vRECORD.COD; 

      -- and return it 
      RETURN NEXT vRECORD; 

     -- but, if it is not, then is out of order 
     else 
      -- the reason is some transaction did not commit yet, but there's another further transaction that alread did. 
      -- so we must stop sending records to the reader. And probably next time he calls, the transaction will have committed already; 
      exit; 
     end if; 
    END LOOP; 


    -- now we have to persist the last record read to be retrieved on next call 
    if (vCHECKPOINT_EXISTS = 0) then 
     INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT); 
    else   
     UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID; 
    end if; 
end; 
$BODY$ LANGUAGE plpgsql VOLATILE; 
+0

私はいくつかの "チェックポイント"が必要です。あなたは私が "cod"を使うことを提案していますか?また、注文は完全に台無しになる。 – Chandra

+0

追加のレコードを読み取るためには、順序が必要ですか? – Christian

+0

私は@ Chandraのユースケースでは、複数の読者がすべて同じテーブルを読んでいると思いますが、おそらく異なる時と異なる速度で読んでいます。どのようにcodが彼のユースケースを助けるのかはわかりません。 –

関連する問題