2016-11-28 7 views
0

私は、プロデューサー - コンシューマーのパラダイムを模倣したプログラムを実装する際に問題に取り組んでいます。私が使用しているコードは、1つのプロデューサと1つのコンシューマしか持たない場合でも動作しますが、別のプロデューサと別のコンシューマを追加すると動作しません。Cプロデューサー/コンシューマー(PThreadsを使用)

私はこれにしばらく時間を費やしましたが、なぜエラーが発生しているのかわかりません。Synchronization Error: Producer x Just overwrote x from Slot x。私はさまざまなテストを通じてこの問題に追いついてきました。問題は、別のプロデューサがクリティカルセクションにあることに気づいたときに、プロデューサがブロックされていないという事実にあります。

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 

void *producer (void *) ; 
void *consumer(void *) ; 
sem_t empty, full, mutex ; 

int buffer[10] /*note extra long space!*/ ; 
int ID[10] ; 
int in = 0 ; int out = 0 ; 
int BUFFER_SIZE = 10 ; 
int nextProduced = 0 ; 

main() { 
    int i ; 
    pthread_t TID[10] ; 

    sem_init(&empty, 0, 10) ; 
    sem_init(&full, 0, 0) ; 
    sem_init(&mutex, 0, 1) ; 

    for(i = 0; i < 10; i++) { 
     ID[i] = i ; 
     buffer[i] = -1 ; 
    } 

    //for(i = 0; i < 5000; i += 2) { 
     pthread_create(&TID[0], NULL, producer, (void *) &ID[0]) ; 
     printf("Producer ID = %d created!\n", 0) ; 
     pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]) ; 
     printf("Consumer ID = %d created!\n", 1) ; 

     pthread_create(&TID[2], NULL, producer, (void *) &ID[2]) ; 
     printf("Producer ID = %d created!\n", 2) ; 
     pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]) ; 
     printf("Consumer ID = %d created!\n", 3) ; 
    //} 

    for(i = 0; i < 10 ; i++) { 
     pthread_join(TID[i], NULL) ; 
    } 
} 

void *producer(void *Boo) { 
    int *ptr; 
    int ID; 
    ptr = (int *) Boo; 
    ID = *ptr; 
    while (1) { 
     nextProduced++; //Producing Integers 
     /* Check to see if Overwriting unread slot */ 
     sem_wait(&empty); 
     sem_wait(&mutex); 

     if (buffer[in] != -1) { 
      printf("Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(0); 
     } 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     sem_post(&mutex); 
     sem_post(&full); 
    } 
} 

void *consumer (void *Boo) { 
    static int nextConsumed = 0 ; 
    int *ptr ; 
    int ID ; 
    ptr = (int *) Boo ; 
    ID = *ptr ; 
    while (1) { 
     sem_wait(&full); 
     sem_wait(&mutex); 

     nextConsumed = buffer[out]; 
     /*Check to make sure we did not read from an empty slot*/ 
     if (nextConsumed == -1) { 
      printf("Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out) ; 
      exit(0) ; 
     } 
     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out) ; 
     buffer[out] = -1 ; 
     out = (out + 1) % BUFFER_SIZE; 

     sem_post(&mutex); 
     sem_post(&empty); 
    } 
} 

出力リレー:

Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
Synchronization Error: Producer 2 Just overwrote 2 from Slot 1 
Consumer 1 Just consumed item 2 from slot 1 
Consumer ID = 3 created! 
incremented in! 
Consumer 3 Just consumed item 2 from slot 1 
Synch Error: Consumer 1 Just Read from empty slot 2 
Producer 0. Put 4 in slot 2 

あなたが見ることができるように、プロデューサー0はプロデューサー0 inをインクリメントすることができます前に、しかし、スロット1に2を入れて管理し、スロット1にデータを書き込むためのプロデューサー2つの試みinがインクリメントされていないためです。

なんらかの理由で私のsem_waits()が機能していないようです。誰か助けてくれますか?

答えて

0

自分のシステムで実行するようにコードを修正しました。正常に動作しているようです。

特定の変更:私はOSXを使っているので、sem_init()からsem_open()sem_unlink()に変更する必要がありました。その変更と一般的なコードに対応するために、私は型を入れて^ Cがコンシューマーとプロデューサーに仕上がり、pthread_join()とそれに続くクリーンアップコードを実行できるように、インターハンドハンドダーを追加しました。プロセス数を独立したバッファ数に結びつけました(あなたのIDとバッファの初期化コードを参照してください)。ミューテックス内でnextProduced++を移動しました。様々なランダムなスタイルの微調整:

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 
#include <signal.h> 

#define MAX_THREADS 5 
#define BUFFER_SIZE 10 

sem_t *empty, *full, *mutex; 

int buffer[BUFFER_SIZE]; 
int in = 0, out = 0; 

static volatile int keepRunning = 1; 

void intHandler(int dummy) { 
    keepRunning = 0; 
} 

void *producer(void * id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextProduced = 0; 

    while (keepRunning) { 

     (void) sem_wait(empty); 
     (void) sem_wait(mutex); 

     /* Check to see if Overwriting unread slot */ 
     if (buffer[in] != -1) { 
      fprintf(stderr, "Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(1); 
     } 

     nextProduced++; // Producing Integers 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(full); 
    } 

    return NULL; 
} 

void *consumer (void *id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextConsumed = 0; 

    while (keepRunning) { 

     (void) sem_wait(full); 
     (void) sem_wait(mutex); 

     nextConsumed = buffer[out]; 

     /* Check to make sure we did not read from an empty slot */ 
     if (nextConsumed == -1) { 
      fprintf(stderr, "Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out); 
      exit(1); 
     } 

     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out); 
     buffer[out] = -1; 
     out = (out + 1) % BUFFER_SIZE; 
     printf("incremented out!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(empty); 
    } 

    return NULL; 
} 

int main() { 
    int ID[MAX_THREADS]; 
    pthread_t TID[MAX_THREADS]; 

    empty = sem_open("/empty", O_CREAT, 0644, 10); 
    full = sem_open("/full", O_CREAT, 0644, 0); 
    mutex = sem_open("/mutex", O_CREAT, 0644, 1); 

    signal(SIGINT, intHandler); 

    for (int i = 0; i < MAX_THREADS; i++) { 
     ID[i] = i; 
    } 

    for (int i = 0; i < BUFFER_SIZE; i++) { 
     buffer[i] = -1; 
    } 

    pthread_create(&TID[0], NULL, producer, (void *) &ID[0]); 
    printf("Producer ID = %d created!\n", 0); 
    pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]); 
    printf("Consumer ID = %d created!\n", 1); 

    pthread_create(&TID[2], NULL, producer, (void *) &ID[2]); 
    printf("Producer ID = %d created!\n", 2); 
    pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]); 
    printf("Consumer ID = %d created!\n", 3); 

    for (int i = 0; i < 4; i++) { 
     pthread_join(TID[i], NULL); 
    } 

    (void) sem_unlink("/empty"); 
    (void) sem_unlink("/full"); 
    (void) sem_unlink("/mutex"); 

    return 0; 
} 

OUTPUT

> ./a.out 
Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
incremented in! 
Producer 2. Put 3 in slot 2 
incremented in! 
Consumer ID = 3 created! 
Producer 0. Put 4 in slot 3 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
incremented out! 
Consumer 3 Just consumed item 2 from slot 1 
incremented out! 
Producer 2. Put 5 in slot 4 
incremented in! 
Producer 0. Put 6 in slot 5 
incremented in! 
Consumer 1 Just consumed item 3 from slot 2 
incremented out! 
Consumer 3 Just consumed item 4 from slot 3 
incremented out! 
Producer 2. Put 7 in slot 6 
incremented in! 
Producer 0. Put 8 in slot 7 
incremented in! 
Consumer 1 Just consumed item 5 from slot 4 
incremented out! 
Consumer 3 Just consumed item 6 from slot 5 
... 
関連する問題