私はマルチスレッドに精通しており、JavaとObjective-Cで多くのマルチスレッドプログラムを開発しました。 )参加なしでマネージャ/ワーカーpthreadを同期するにはどうすればいいですか?
メインで(::私は上記のコードに次のように達成したいと思います
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 2
struct thread_data {
int start;
int end;
int *arr;
};
void print(int *ints, int n);
void *processArray(void *args);
int main(int argc, const char * argv[])
{
int numOfInts = 10;
int *ints = malloc(numOfInts * sizeof(int));
for (int i = 0; i < numOfInts; i++) {
ints[i] = i;
}
print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pthread_t threads[NUM_OF_THREADS];
struct thread_data thread_data[NUM_OF_THREADS];
// these vars are used to calculate the index ranges for each thread
int remainingWork = numOfInts, amountOfWork;
int startRange, endRange = -1;
for (int i = 0; i < NUM_OF_THREADS; i++) {
amountOfWork = remainingWork/(NUM_OF_THREADS - i);
startRange = endRange + 1;
endRange = startRange + amountOfWork - 1;
thread_data[i].arr = ints;
thread_data[i].start = startRange;
thread_data[i].end = endRange;
pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
remainingWork -= amountOfWork;
}
// 1. Signal to the threads to start working
// 2. Wait for them to finish
print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
free(ints);
return 0;
}
void *processArray(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
// 1. Wait for a signal to start from the main thread
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// 2. Signal to the main thread that you're done
pthread_exit(NULL);
}
void print(int *ints, int n)
{
printf("[");
for (int i = 0; i < n; i++) {
printf("%d", ints[i]);
if (i+1 != n)
printf(", ");
}
printf("]\n");
}
:しかし、私はメインスレッドから参加を使用せずにpthreadsを使用してCに次のように達成することができませんでし
を- スレッドが動作を開始するように信号を送ります。
- バックグラウンドスレッドが終了するまで待ちます。 processArrayで
():私はしませんあなたが
を行っているメインスレッドにメインスレッド
- 待ちthe real applicationでメインスレッドが一度スレッドを作成し、その後バックグラウンドスレッドに何度も動作するように通知するため、メインスレッドで結合を使用したい場合は、すべてのバックグラウンド〜 eadsは作業を終了しました。私は、CおよびPOSIX APIに新たなんだ
void *processArray(void *args) { struct thread_data *data = (struct thread_data *)args; while (1) { // 1. Wait for a signal to start from the main thread int *arr = data->arr; int start = data->start; int end = data->end; // Process for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // 2. Signal to the main thread that you're done } pthread_exit(NULL); }
注意をので、私は何かを明らかに欠けている場合は恐れ入ります:
processArray
機能では、私は次のように無限ループを配置します。しかし、私は実際にミューテックスやセマフォの配列から始めて、多くのことを試してみました。私は条件変数が助けになると思いますが、どのように使用できるのか理解できませんでした。お時間をいただきありがとうございます。
問題は解決:
はそんなに皆さんありがとうございました!私は最終的に、これをあなたのヒントに従って参加を使わずに、安全に作業できるようにしました。ソリューションは多少醜いですが、それは仕事を完了させ、パフォーマンスの向上は価値があります(下記参照)。興味のある人にとって、これはメインスレッドがバックグラウンドスレッドに連続的に仕事を与え続けている私が働いている実際のアプリケーションのシミュレーションです:
pthread_cancel
がために十分である場合#include <stdio.h> #include <stdlib.h> #include <pthread.h> #define NUM_OF_THREADS 5 struct thread_data { int id; int start; int end; int *arr; }; pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; int currentlyIdle; pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; int workReady; pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; int currentlyWorking; pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; int canFinish; void print(int *ints, int n); void *processArray(void *args); int validateResult(int *ints, int num, int start); int main(int argc, const char * argv[]) { int numOfInts = 10; int *ints = malloc(numOfInts * sizeof(int)); for (int i = 0; i < numOfInts; i++) { ints[i] = i; } // print(ints, numOfInts); pthread_t threads[NUM_OF_THREADS]; struct thread_data thread_data[NUM_OF_THREADS]; workReady = 0; canFinish = 0; currentlyIdle = 0; currentlyWorking = 0; // these vars are used to calculate the index ranges for each thread int remainingWork = numOfInts, amountOfWork; int startRange, endRange = -1; // Create the threads and give each one its data struct. for (int i = 0; i < NUM_OF_THREADS; i++) { amountOfWork = remainingWork/(NUM_OF_THREADS - i); startRange = endRange + 1; endRange = startRange + amountOfWork - 1; thread_data[i].id = i; thread_data[i].arr = ints; thread_data[i].start = startRange; thread_data[i].end = endRange; pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]); remainingWork -= amountOfWork; } int loops = 1111111; int expectedStartingValue = ints[0] + loops; // used to validate the results // The elements in ints[] should be incremented by 1 in each loop while (loops-- != 0) { // Make sure all of them are ready pthread_mutex_lock(¤tlyIdleMutex); while (currentlyIdle != NUM_OF_THREADS) { pthread_cond_wait(¤tlyIdleCond, ¤tlyIdleMutex); } pthread_mutex_unlock(¤tlyIdleMutex); // All threads are now blocked; it's safe to not lock the mutex. // Prevent them from finishing before authorized. canFinish = 0; // Reset the number of currentlyWorking threads currentlyWorking = NUM_OF_THREADS; // Signal to the threads to start pthread_mutex_lock(&workReadyMutex); workReady = 1; pthread_cond_broadcast(&workReadyCond); pthread_mutex_unlock(&workReadyMutex); // Wait for them to finish pthread_mutex_lock(¤tlyWorkingMutex); while (currentlyWorking != 0) { pthread_cond_wait(¤tlyWorkingCond, ¤tlyWorkingMutex); } pthread_mutex_unlock(¤tlyWorkingMutex); // The threads are now waiting for permission to finish // Prevent them from starting again workReady = 0; currentlyIdle = 0; // Allow them to finish pthread_mutex_lock(&canFinishMutex); canFinish = 1; pthread_cond_broadcast(&canFinishCond); pthread_mutex_unlock(&canFinishMutex); } // print(ints, numOfInts); if (validateResult(ints, numOfInts, expectedStartingValue)) { printf("Result correct.\n"); } else { printf("Result invalid.\n"); } // clean up for (int i = 0; i < NUM_OF_THREADS; i++) { pthread_cancel(threads[i]); } free(ints); return 0; } void *processArray(void *args) { struct thread_data *data = (struct thread_data *)args; int *arr = data->arr; int start = data->start; int end = data->end; while (1) { // Set yourself as idle and signal to the main thread, when all threads are idle main will start pthread_mutex_lock(¤tlyIdleMutex); currentlyIdle++; pthread_cond_signal(¤tlyIdleCond); pthread_mutex_unlock(¤tlyIdleMutex); // wait for work from main pthread_mutex_lock(&workReadyMutex); while (!workReady) { pthread_cond_wait(&workReadyCond , &workReadyMutex); } pthread_mutex_unlock(&workReadyMutex); // Do the work for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // mark yourself as finished and signal to main pthread_mutex_lock(¤tlyWorkingMutex); currentlyWorking--; pthread_cond_signal(¤tlyWorkingCond); pthread_mutex_unlock(¤tlyWorkingMutex); // Wait for permission to finish pthread_mutex_lock(&canFinishMutex); while (!canFinish) { pthread_cond_wait(&canFinishCond , &canFinishMutex); } pthread_mutex_unlock(&canFinishMutex); } pthread_exit(NULL); } int validateResult(int *ints, int n, int start) { int tmp = start; for (int i = 0; i < n; i++, tmp++) { if (ints[i] != tmp) { return 0; } } return 1; } void print(int *ints, int n) { printf("["); for (int i = 0; i < n; i++) { printf("%d", ints[i]); if (i+1 != n) printf(", "); } printf("]\n"); }
は私もよく分かりません掃除!障壁に関しては、@Jeremyで言及されているようないくつかのOSに限定されていなければ、大きな助けになっていました。
ベンチマーク:
私は、これらの多くの条件が実際にアルゴリズムを遅くしていないことを確認したかったので、私は2つのソリューションを比較するためのセットアップこのベンチマークました:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <sys/time.h> #include <sys/resource.h> #define NUM_OF_THREADS 5 struct thread_data { int start; int end; int *arr; }; pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER; int currentlyIdle; pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER; int workReady; pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER; pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER; int currentlyWorking; pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER; int canFinish; void *processArrayMutex(void *args); void *processArrayJoin(void *args); double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops); double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops); int main(int argc, const char * argv[]) { int numOfInts = 10; int *join_ints = malloc(numOfInts * sizeof(int)); int *mutex_ints = malloc(numOfInts * sizeof(int)); for (int i = 0; i < numOfInts; i++) { join_ints[i] = i; mutex_ints[i] = i; } pthread_t join_threads[NUM_OF_THREADS]; pthread_t mutex_threads[NUM_OF_THREADS]; struct thread_data join_thread_data[NUM_OF_THREADS]; struct thread_data mutex_thread_data[NUM_OF_THREADS]; workReady = 0; canFinish = 0; currentlyIdle = 0; currentlyWorking = 0; int remainingWork = numOfInts, amountOfWork; int startRange, endRange = -1; for (int i = 0; i < NUM_OF_THREADS; i++) { amountOfWork = remainingWork/(NUM_OF_THREADS - i); startRange = endRange + 1; endRange = startRange + amountOfWork - 1; join_thread_data[i].arr = join_ints; join_thread_data[i].start = startRange; join_thread_data[i].end = endRange; mutex_thread_data[i].arr = mutex_ints; mutex_thread_data[i].start = startRange; mutex_thread_data[i].end = endRange; pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]); remainingWork -= amountOfWork; } int numOfBenchmarkTests = 100; int numberOfLoopsPerTest= 1000; double join_sum = 0.0, mutex_sum = 0.0; for (int i = 0; i < numOfBenchmarkTests; i++) { double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest); double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest); join_sum += joinTime; mutex_sum+= mutexTime; } double join_avg = join_sum/numOfBenchmarkTests; double mutex_avg= mutex_sum/numOfBenchmarkTests; printf("Join average : %f\n", join_avg); printf("Mutex average: %f\n", mutex_avg); double diff = join_avg - mutex_avg; if (diff > 0.0) printf("Mutex is %.0f%% faster.\n", 100 * diff/join_avg); else if (diff < 0.0) printf("Join is %.0f%% faster.\n", 100 * diff/mutex_avg); else printf("Both have the same performance."); free(join_ints); free(mutex_ints); return 0; } // From https://stackoverflow.com/a/2349941/408286 double get_time() { struct timeval t; struct timezone tzp; gettimeofday(&t, &tzp); return t.tv_sec + t.tv_usec*1e-6; } double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops) { double start = get_time(); int loops = num_loops; while (loops-- != 0) { // Make sure all of them are ready pthread_mutex_lock(¤tlyIdleMutex); while (currentlyIdle != NUM_OF_THREADS) { pthread_cond_wait(¤tlyIdleCond, ¤tlyIdleMutex); } pthread_mutex_unlock(¤tlyIdleMutex); // All threads are now blocked; it's safe to not lock the mutex. // Prevent them from finishing before authorized. canFinish = 0; // Reset the number of currentlyWorking threads currentlyWorking = NUM_OF_THREADS; // Signal to the threads to start pthread_mutex_lock(&workReadyMutex); workReady = 1; pthread_cond_broadcast(&workReadyCond); pthread_mutex_unlock(&workReadyMutex); // Wait for them to finish pthread_mutex_lock(¤tlyWorkingMutex); while (currentlyWorking != 0) { pthread_cond_wait(¤tlyWorkingCond, ¤tlyWorkingMutex); } pthread_mutex_unlock(¤tlyWorkingMutex); // The threads are now waiting for permission to finish // Prevent them from starting again workReady = 0; currentlyIdle = 0; // Allow them to finish pthread_mutex_lock(&canFinishMutex); canFinish = 1; pthread_cond_broadcast(&canFinishCond); pthread_mutex_unlock(&canFinishMutex); } return get_time() - start; } double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops) { double start = get_time(); int loops = num_loops; while (loops-- != 0) { // create them for (int i = 0; i < NUM_OF_THREADS; i++) { pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]); } // wait for (int i = 0; i < NUM_OF_THREADS; i++) { pthread_join(threads[i], NULL); } } return get_time() - start; } void *processArrayMutex(void *args) { struct thread_data *data = (struct thread_data *)args; int *arr = data->arr; int start = data->start; int end = data->end; while (1) { // Set yourself as idle and signal to the main thread, when all threads are idle main will start pthread_mutex_lock(¤tlyIdleMutex); currentlyIdle++; pthread_cond_signal(¤tlyIdleCond); pthread_mutex_unlock(¤tlyIdleMutex); // wait for work from main pthread_mutex_lock(&workReadyMutex); while (!workReady) { pthread_cond_wait(&workReadyCond , &workReadyMutex); } pthread_mutex_unlock(&workReadyMutex); // Do the work for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } // mark yourself as finished and signal to main pthread_mutex_lock(¤tlyWorkingMutex); currentlyWorking--; pthread_cond_signal(¤tlyWorkingCond); pthread_mutex_unlock(¤tlyWorkingMutex); // Wait for permission to finish pthread_mutex_lock(&canFinishMutex); while (!canFinish) { pthread_cond_wait(&canFinishCond , &canFinishMutex); } pthread_mutex_unlock(&canFinishMutex); } pthread_exit(NULL); } void *processArrayJoin(void *args) { struct thread_data *data = (struct thread_data *)args; int *arr = data->arr; int start = data->start; int end = data->end; // Do the work for (int i = start; i <= end; i++) { arr[i] = arr[i] + 1; } pthread_exit(NULL); }
を出力は:
Join average : 0.153074 Mutex average: 0.071588 Mutex is 53% faster.
もう一度ありがとうございます。私は本当にあなたの助けに感謝します!
申し訳ありませんが、あなたの質問を慎重に読んだことはありません。コメント番号2にリエントラントバリアを実装する必要があります。http://pcbec3.ihep.su/~miagkov/code/barrier.c – nhahtdh
@nhahtdh心配しないでください。私はコードを見てみましょう。ありがとう! – Motasim
Michael Burrはこれを行うための既存の機能を提案しました。 (私はPOSIXがこれを定義していないと思っていましたが、それはすべてそこにあります)。 – nhahtdh