2013-04-29 23 views
5

最近、私はIPC用に共有メモリを使用しています。私が実装しようとしてきたことの1つは、1つのプロセスを生成し、1つのプロセスを消費する単純なリングバッファです。各プロセスには、その位置を追跡するための独自のシーケンス番号があります。これらのシーケンス番号は、原子値演算を使用して更新され、正しい値が他のプロセスに見えるようにします。プロデューサは、リングバッファが一杯になるとブロックします。セマフォまたはミューテックスが使用されないという点で、コードはロックフリーです。賢明な私はむしろ控えめなVM上で毎秒およそ2000万メッセージを取得しています共有メモリ内の単一プロデューサ/コンシューマリングバッファ

パフォーマンス - それでかなり満足:)

私は私のコードがどのように「正しい」についての興味は何

。誰もが固有の問題/競合状態を見つけられますか?ここに私のコードです。ご意見ありがとうございます。

#include <stdlib.h> 
#include <stdio.h> 
#include <fcntl.h> 
#include <sys/mman.h> 
#include <sys/stat.h> 
#include <time.h> 
#include <unistd.h> 
#include <string.h> 

#define SHM_ID "/mmap-test" 
#define BUFFER_SIZE 4096 
#define SLEEP_NANOS 1000 // 1 micro 

struct Message 
{ 
    long _id; 
    char _data[128]; 
}; 

struct RingBuffer 
{ 
    size_t _rseq; 
    char _pad1[64]; 

    size_t _wseq; 
    char _pad2[64]; 

    Message _buffer[BUFFER_SIZE]; 
}; 

void 
producerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600); 
    ftruncate(fd, size+1); 

    // create shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
    close(fd); 

    // initialize our sequence numbers in the ring buffer 
    rb->_wseq = rb->_rseq = 0; 
    int i = 0; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // as long as the consumer isn't running behind keep producing 
     while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE) 
     { 
      // write the next entry and atomically update the write sequence number 
      Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE]; 
      msg->_id = i++; 
      __sync_fetch_and_add(&rb->_wseq, 1); 
     } 

     // give consumer some time to catch up 
     nanosleep(&tss, 0); 
    } 
} 

void 
consumerLoop() 
{ 
    int size = sizeof(RingBuffer); 
    int fd = shm_open(SHM_ID, O_RDWR, 0600); 
    if(fd == -1) { 
     perror("argh!!!"); return; 
    } 

    // lookup producers shared memory area 
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 

    // initialize our sequence numbers in the ring buffer 
    size_t seq = 0; 
    size_t pid = -1; 

    timespec tss; 
    tss.tv_sec = 0; 
    tss.tv_nsec = SLEEP_NANOS; 

    while(1) 
    { 
     // while there is data to consume 
     while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE) 
     { 
      // get the next message and validate the id 
      // id should only ever increase by 1 
      // quit immediately if not 
      Message msg = rb->_buffer[seq%BUFFER_SIZE]; 
      if(msg._id != pid+1) { 
       printf("error: %d %d\n", msg._id, pid); return; 
      } 
      pid = msg._id; 
      ++seq; 
     } 

     // atomically update the read sequence in the ring buffer 
     // making it visible to the producer 
     __sync_lock_test_and_set(&rb->_rseq, seq); 

     // wait for more data 
     nanosleep(&tss, 0); 
    } 
} 

int 
main(int argc, char** argv) 
{ 
    if(argc != 2) { 
     printf("please supply args (producer/consumer)\n"); return -1; 
    } else if(strcmp(argv[1], "consumer") == 0) { 
     consumerLoop(); 
    } else if(strcmp(argv[1], "producer") == 0) { 
     producerLoop(); 
    } else { 
     printf("invalid arg: %s\n", argv[1]); return -1; 
    } 
} 

答えて

1

一見して正しいと思われます。私はあなたがパフォーマンスに満足していることを理解していますが、__sync_fetch_and_addよりも軽いものを使用するのが楽しい実験かもしれません。 AFAIKそれは高価な完全な記憶の障壁です。単一のプロデューサと単一のコンシューマが存在するため、リリースとそれに対応する取得操作により、パフォーマンスが向上します。 FacebookのFollyライブラリには、新しいC++ 11アトミックを使用する単一のプロデューサの単一のコンシューマキューがあります。https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h