
信号量
信号量是Dijkstra大牛发明的概念,用来协调并发程序对共享资源的访问,信号量包含一个整形变量c和两个操作,PV操作,分别对应wait和signal,wait操作对c-1,表示占用资源,如果c<0,那么就阻塞,signal对c+1,表示释放资源。信号量在并发编程中很有用,比如我现在想要多线程并发处理一个任务,那么怎么控制同时能够运行的线程数呢,这里就可以用信号量来实现,如果要使用n个线程并发,就把信号量的值设置为n。
管程
管程是编程语言提供的一种抽象的数据结构,用于多线程互斥访问共享资源,管程和信号量不同的是,进入了临界区后,管程可以不等待代码执行完就放弃控制权,这是属性是锁,信号量等等同步机制没有的。我们不禁要想,怎么才能实现这样的机制呢,对临界区代码控制之后如果不满足自己的条件,仍然可以放弃控制权,把资源释放出来?自然就想到了条件变量,我们先对一个临界区上锁,然后通过条件变量判断,如果获取失败,我们可以通过signal释放资源,管程的编程范式如下:
/** 这是一个等待的线程 **/
lock(mutex);
// 如果条件不满足就要等待
// 这里为什么使用while而不是if????
while (!p) {
// wait操作将当前线程阻塞在等待队列中,释放对管程的互斥访问
condition_varaible.wait(mutex);
}
unlock(mutex);
/** 唤醒线程 **/
lock(mutex)
// 让p为true
// 通知阻塞线程继续执行
mutex.signal();
unlock(mutex);
上面的代码可能大家看到了问题所在,第一个等待的线程已经将互斥量lock了,唤醒线程怎么能再lock呢?的确是,互斥量同时只能被一个线程占用,但是wait操作的时候,就已经对mutex进行的释放,可以将wait函数协程下面的伪代码:
condition::wait(mutex& mut) {
unlock(mutex);
// 挂起当前线程
yield_current_thread();
lock(mutex);
}
刚才上面提到了一个问题,第一个等待线程的代码中,为什么使用while而不是if呢,这里就涉及了管程的两种模式,一种使用while的叫做hansen模式,一种使用if称之为hoare模式,这两种模式,具体来说就是调用signal操作的时候,是继续在当前线程执行,还是切换到被唤醒的线程执行。这也很容易理解,signal之后线程1从刚才wait阻塞的地方继续执行,如果是while就会再对条件进行一次判断。
生产者消费者问题
生产者消费者问题是非常经典的同步模型,在编程中也有很广泛的应用。生产者消费者问题描述如下:一个或者多个生产者往消费队列里面插入数据,单个消费者从消息队列里面取出数据处理,并且对消息队列的操作必须是互斥的,分析可知需要解决三个同步问题:
- 对消息队列的操作必须是互斥的,需要加锁(当然也可以使用无锁队列,原则上也是一种访问控制)
- 消息队列中没有数据时,消费者需要等待生产者产生数据,这就是条件同步。
- 消息队列满时,生产者需要等到消费者消费数据,这也是使用条件同步
下面给出信号量实现
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#define BUFF_SIZE 3
#define PRODUCE_THREAD_SIZE 5
int g_buff[BUFF_SIZE];
int g_write_index = 0;
int g_read_index = 0;
sem_t lock;
sem_t consume_sem, produce_sem;
void* produce(void *ptr){
int idx = *(int*)ptr;
printf("in produce %d %d %dn",idx, g_write_index, g_read_index);
while(1){
sem_wait(&produce_sem); # 限制了生产者并发的数目
sem_wait(&lock); # 对临界区的访问要加锁
g_buff[g_write_index] = idx;
g_write_index = (g_write_index + 1) % BUFF_SIZE;
sem_post(&lock);
sem_post(&consume_sem);
}
return NULL;
}
void* consume(void *ptr){
while(1){
sem_wait(&consume_sem);
sem_wait(&lock);
int data = g_buff[g_read_index];
g_buff[g_read_index] = -1;
g_read_index = (g_read_index + 1) % BUFF_SIZE;
printf("consume %d %d %dn", data, g_read_index, g_write_index);
sem_post(&lock);
sem_post(&produce_sem);
}
return NULL;
}
int main(int argc, char * argv[]){
pthread_t con;
pthread_t pros[PRODUCE_THREAD_SIZE];
sem_init(&lock, 0, 1);
sem_init(&consume_sem,0, 0);
sem_init(&produce_sem,0, BUFF_SIZE);
pthread_create(&con, NULL, consume, NULL);
int thread_args[PRODUCE_THREAD_SIZE];
for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
thread_args[i] = i + 1;
pthread_create(&pros[i], NULL, produce, (thread_args + i));
}
pthread_join(con,0);
for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
pthread_join(pros[i],0);
sem_destroy(&lock);
sem_destroy(&consume_sem);
sem_destroy(&produce_sem);
return 0;
}
管程实现
生产者消费者问题用管程(互斥锁+条件变量)来实现也很简单,pthread已经帮我们实现好了,就是pthread_mutex和pthread_cond函数,代码如下:
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#define BUFF_SIZE 3
#define PRODUCE_THREAD_SIZE 5
int g_buff[BUFF_SIZE];
int g_write_index = 0;
int g_read_index = 0;
pthread_mutex_t lock;
pthread_cond_t consume_cond, produce_cond;
void* produce(void *ptr){
int idx = *(int*)ptr;
printf("in produce %d %d %dn",idx, g_write_index, g_read_index);
while(1){
pthread_mutex_lock(&lock);
while((g_write_index + 1) % BUFF_SIZE == g_read_index)
pthread_cond_wait(&produce_cond, &lock);
g_buff[g_write_index] = idx;
g_write_index = (g_write_index + 1) % BUFF_SIZE;
pthread_cond_signal(&consume_cond);
pthread_mutex_unlock(&lock);
}
return NULL;
}
void* consume(void *ptr){
while(1){
pthread_mutex_lock(&lock);
while(g_read_index == g_write_index)
pthread_cond_wait(&consume_cond, &lock);
int data = g_buff[g_read_index];
g_buff[g_read_index] = -1;
g_read_index = (g_read_index + 1) % BUFF_SIZE;
printf("consume %dn", data);
pthread_cond_signal(&produce_cond);
pthread_mutex_unlock(&lock);
}
return NULL;
}
int main(int argc, char * argv[]){
pthread_t con;
pthread_t pros[PRODUCE_THREAD_SIZE];
srand((unsigned)time(NULL));
pthread_mutex_init(&lock, 0);
pthread_cond_init(&consume_cond,0);
pthread_cond_init(&produce_cond,0);
pthread_create(&con, NULL, consume, NULL);
int thread_args[PRODUCE_THREAD_SIZE];
for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
thread_args[i] = i + 1;
pthread_create(&pros[i], NULL, produce, (thread_args + i));
}
pthread_join(con,0);
for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
pthread_join(pros[i],0);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&consume_cond);
pthread_cond_destroy(&produce_cond);
return 0;
}




近期评论