信号量和管程

信号量

信号量是Dijkstra大牛发明的概念,用来协调并发程序对共享资源的访问,信号量包含一个整形变量c和两个操作,PV操作,分别对应wait和signal,wait操作对c-1,表示占用资源,如果c<0,那么就阻塞,signal对c+1,表示释放资源。信号量在并发编程中很有用,比如我现在想要多线程并发处理一个任务,那么怎么控制同时能够运行的线程数呢,这里就可以用信号量来实现,如果要使用n个线程并发,就把信号量的值设置为n。

管程

管程是编程语言提供的一种抽象的数据结构,用于多线程互斥访问共享资源,管程和信号量不同的是,进入了临界区后,管程可以不等待代码执行完就放弃控制权,这是属性是锁,信号量等等同步机制没有的。我们不禁要想,怎么才能实现这样的机制呢,对临界区代码控制之后如果不满足自己的条件,仍然可以放弃控制权,把资源释放出来?自然就想到了条件变量,我们先对一个临界区上锁,然后通过条件变量判断,如果获取失败,我们可以通过signal释放资源,管程的编程范式如下:

/** 这是一个等待的线程 **/
lock(mutex);
// 如果条件不满足就要等待
// 这里为什么使用while而不是if????
while (!p) {
	// wait操作将当前线程阻塞在等待队列中,释放对管程的互斥访问
	condition_varaible.wait(mutex);
}
unlock(mutex);

/** 唤醒线程 **/
lock(mutex)
// 让p为true
// 通知阻塞线程继续执行
mutex.signal();
unlock(mutex);

上面的代码可能大家看到了问题所在,第一个等待的线程已经将互斥量lock了,唤醒线程怎么能再lock呢?的确是,互斥量同时只能被一个线程占用,但是wait操作的时候,就已经对mutex进行的释放,可以将wait函数协程下面的伪代码:

condition::wait(mutex& mut) {
	unlock(mutex);
	// 挂起当前线程
	yield_current_thread();
	lock(mutex);
}

刚才上面提到了一个问题,第一个等待线程的代码中,为什么使用while而不是if呢,这里就涉及了管程的两种模式,一种使用while的叫做hansen模式,一种使用if称之为hoare模式,这两种模式,具体来说就是调用signal操作的时候,是继续在当前线程执行,还是切换到被唤醒的线程执行。这也很容易理解,signal之后线程1从刚才wait阻塞的地方继续执行,如果是while就会再对条件进行一次判断。

生产者消费者问题

生产者消费者问题是非常经典的同步模型,在编程中也有很广泛的应用。生产者消费者问题描述如下:一个或者多个生产者往消费队列里面插入数据,单个消费者从消息队列里面取出数据处理,并且对消息队列的操作必须是互斥的,分析可知需要解决三个同步问题:

  1. 对消息队列的操作必须是互斥的,需要加锁(当然也可以使用无锁队列,原则上也是一种访问控制)
  2. 消息队列中没有数据时,消费者需要等待生产者产生数据,这就是条件同步。
  3. 消息队列满时,生产者需要等到消费者消费数据,这也是使用条件同步

下面给出信号量实现

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>

#define BUFF_SIZE  3
#define PRODUCE_THREAD_SIZE 5
int g_buff[BUFF_SIZE];
int g_write_index = 0;
int g_read_index = 0;

sem_t lock;
sem_t consume_sem, produce_sem;


void* produce(void *ptr){
    int idx = *(int*)ptr;
    printf("in produce %d %d %dn",idx, g_write_index, g_read_index);
    while(1){
        sem_wait(&produce_sem); # 限制了生产者并发的数目

        sem_wait(&lock);     # 对临界区的访问要加锁
        g_buff[g_write_index] = idx;
        g_write_index = (g_write_index + 1) % BUFF_SIZE;
        sem_post(&lock);

        sem_post(&consume_sem);
    }
    return NULL;
}

void* consume(void *ptr){
    while(1){
        sem_wait(&consume_sem);
        sem_wait(&lock);
        int data = g_buff[g_read_index];
        g_buff[g_read_index] = -1;
        g_read_index = (g_read_index + 1) % BUFF_SIZE;
        printf("consume %d %d %dn", data, g_read_index, g_write_index);
        sem_post(&lock);
        sem_post(&produce_sem);
    }
    return NULL;
}

int main(int argc, char * argv[]){
    pthread_t con;
    pthread_t pros[PRODUCE_THREAD_SIZE];
    sem_init(&lock, 0, 1);
    sem_init(&consume_sem,0, 0);
    sem_init(&produce_sem,0, BUFF_SIZE);

    pthread_create(&con, NULL, consume, NULL);
    int thread_args[PRODUCE_THREAD_SIZE];
    for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
        thread_args[i] = i + 1;
        pthread_create(&pros[i], NULL, produce, (thread_args + i));
    }

    pthread_join(con,0);
    for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
        pthread_join(pros[i],0);

    sem_destroy(&lock);
    sem_destroy(&consume_sem);
    sem_destroy(&produce_sem);

    return 0;
}

管程实现

生产者消费者问题用管程(互斥锁+条件变量)来实现也很简单,pthread已经帮我们实现好了,就是pthread_mutex和pthread_cond函数,代码如下:

#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>


#define BUFF_SIZE  3
#define PRODUCE_THREAD_SIZE 5
int g_buff[BUFF_SIZE];
int g_write_index = 0;
int g_read_index = 0;

pthread_mutex_t lock;
pthread_cond_t consume_cond, produce_cond;


void* produce(void *ptr){
    int idx = *(int*)ptr;
    printf("in produce %d %d %dn",idx, g_write_index, g_read_index);
    while(1){
        pthread_mutex_lock(&lock);
        while((g_write_index + 1) % BUFF_SIZE == g_read_index)
            pthread_cond_wait(&produce_cond, &lock);

        g_buff[g_write_index] = idx;
        g_write_index = (g_write_index + 1) % BUFF_SIZE;

        pthread_cond_signal(&consume_cond);
        pthread_mutex_unlock(&lock);

    }
    return NULL;
}

void* consume(void *ptr){
    while(1){
        pthread_mutex_lock(&lock);
        while(g_read_index == g_write_index)
             pthread_cond_wait(&consume_cond, &lock);

        int data = g_buff[g_read_index];
        g_buff[g_read_index] = -1;
        g_read_index = (g_read_index + 1) % BUFF_SIZE;
        printf("consume %dn", data);

        pthread_cond_signal(&produce_cond);
        pthread_mutex_unlock(&lock);
    }
    return NULL;
}

int main(int argc, char * argv[]){
    pthread_t con;
    pthread_t pros[PRODUCE_THREAD_SIZE];

    srand((unsigned)time(NULL));
    pthread_mutex_init(&lock, 0);
    pthread_cond_init(&consume_cond,0);
    pthread_cond_init(&produce_cond,0);

    pthread_create(&con, NULL, consume, NULL);
    int thread_args[PRODUCE_THREAD_SIZE];
    for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){
        thread_args[i] = i + 1;
        pthread_create(&pros[i], NULL, produce, (thread_args + i));
    }

    pthread_join(con,0);
    for(int i = 0; i < PRODUCE_THREAD_SIZE; i++)
        pthread_join(pros[i],0);

    pthread_mutex_destroy(&lock);
    pthread_cond_destroy(&consume_cond);
    pthread_cond_destroy(&produce_cond);

    return 0;
}