POSIX信号量
- 📖1. 信号量的定义
- 📖2. 二值信号量
- 📖3. 用信号量作为条件变量
- 📖4. 基于环形队列的生产者消费者模型
 
我们知道,需要锁和条件变量来解决各种相关的,有趣的并发问题, Dijkstra及其同事发明了信号量,作为与同步有关的所有工作的唯一原语,可以使用信号量作为锁和条件变量.
📖1. 信号量的定义
信号量是一个有整数值的对象,可以用两个函数来操作它在POSIX标准中,是sem_wait()和sem_post(). 因为信号量的初始值能够决定其行为,所以首先要初始化信号量,才能调用函数与之交互.
#include<semaphore.h>
sem_t s;
sem_init(&s, 0, 1);
我们定义了一个信号量,通过第三个参数,将它的值初始化为1.
sem_init()的第二个参数,设置为0,表示信号量是在同一进程的多个线程共享的.可以参考man手册,了解信号量的其他用法(如何用于跨不同进程的同步访问).
信号量初始化之后,我们可以调用sem_wait()和sem_post()与之交互:
int sem_wait(sem_t* s)
{
    //将信号量的值减1, 如果信号量s的值为负, 则等待
}
int sem_post(sem_t* s)
{
    //将信号量s的值加1, 如果有一个或多个线程在等待, 唤醒一个线程
}
📖2. 二值信号量
现在我们开始使用信号量,我们可以用信号量作为锁,直接把临界区用一对sem_wait()/sem_post()环绕:
int main()
{
    sem_t m;
    sem_init(&m, 0, 1);
    sem_wait(&m);
    //临界区
    sem_post(&m);
    return 0;
}
接下来,我们来分析分析,它是如何实现锁的功能的:


我们可以使用信号量来实现锁,因为锁只有两个状态(持有或没持有),所以这种用法有时也叫作二值信号量.
📖3. 用信号量作为条件变量
信号量也可以用在一个线程暂停执行,等待某一条件成立的场景. 在这种场景下,通常一个线程等待条件成立,另外一个线程修改条件并发信号给等待线程,从而唤醒等待线程,因为等待线程在等待某些条件发生变化,所以我们将信号量作为条件变量.
下面是一个简单例子,假设一个线程创建另一个线程,并且等待它结束:
sem_t s;
void* child(void* args)
{
    cout << "child" << endl;
    sem_post(&s);
    return nullptr;
}
int main()
{
    sem_init(&s, 0, 0);
    cout << "parent: begin" << endl;
    pthread_t t;
    pthread_create(&t, nullptr, child, nullptr);
    sem_wait(&s);
    cout << "parent: end" << endl;
    return 0;
}
由于两个线程执行时的时序问题,所以我们需要分析两种情况:

📖4. 基于环形队列的生产者消费者模型

 
代码实现如下:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>
using namespace std;
const int gCap = 10;
template<class T>
class RingQueue
{
public:
    RingQueue(int cap = gCap) : ringQueue_(cap), pIndex_(0), cIndex_(0)
    {
        //生产
        sem_init(&roomSem_, 0, ringQueue_.size());
        //消费
        sem_init(&dataSem_, 0, 0);
        pthread_mutex_init(&pmutex_, nullptr);
        pthread_mutex_init(&cmutex_, nullptr);
    }
    //生产
    void push(const T& in)
    {
        sem_wait(&roomSem_);
        pthread_mutex_lock(&pmutex_);
        ringQueue_[pIndex_] = in;
        pIndex_++;
        pIndex_ %= ringQueue_.size();
        pthread_mutex_unlock(&pmutex_);
        sem_post(&dataSem_);
    }
    //消费
    T pop()
    {
        sem_wait(&dataSem_);
        pthread_mutex_lock(&cmutex_);
        T temp = ringQueue_[cIndex_];
        cIndex_++;
        cIndex_ %= ringQueue_.size();
        pthread_mutex_unlock(&cmutex_);
        sem_post(&roomSem_);
        return temp;
    }
    ~RingQueue()
    {
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);
        pthread_mutex_destroy(&pmutex_);
        pthread_mutex_destroy(&cmutex_);
    }
private:
    vector<T> ringQueue_; //环形队列
    sem_t roomSem_;
    sem_t dataSem_;
    uint32_t pIndex_;  //当前生产者写入的位置
    uint32_t cIndex_;  //当前消费者读取的位置
    pthread_mutex_t pmutex_;
    pthread_mutex_t cmutex_;
};
测试代码:
#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>
#include <pthread.h>
void* productor(void* args)
{
    RingQueue<int>* rqp = static_cast<RingQueue<int>* >(args);
    while(true)
    {
        int data = rand() % 10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;
        sleep(1);
    }
}
void* consumer(void* args)
{
    RingQueue<int>* rqp = static_cast<RingQueue<int>* >(args);
    while(true)
    {
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() <<"]" << " 消费了一个数据: " << data << endl;
    }
}
int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    RingQueue<int> rq;
    pthread_t c1, c2, c3;
    pthread_t p1, p2, p3;
    pthread_create(&p1, nullptr, productor, &rq);
    pthread_create(&p2, nullptr, productor, &rq);
    pthread_create(&p3, nullptr, productor, &rq);
    pthread_create(&c1, nullptr, consumer, &rq);
    pthread_create(&c2, nullptr, consumer, &rq);
    pthread_create(&c3, nullptr, consumer, &rq);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);
    return 0;
}
测试结果如下:



















