目录
1.回忆
Task.hpp
1. #pragma once
2. 头文件和命名空间
3. 类 CallTask
4. 操作符字符串
5. 回调函数 mymath
阻塞队列 BlockQueue 的实现
BlockQueue
生产者和消费者线程
生产者productor
消费者 consumer
主函数 main
代码整体说明
2. 信号量
2.1 回忆:多线程对锁的凌乱竞争✔️
2.2 信号量函数
1. 初始化信号量
2. 销毁信号量
3. 等待信号量(P操作)
4. 发布信号量(V操作)
CP 模型中的锁与信号量使用总结
3.基于环形队列的生产者消费者模型
1.回忆
生产的数据从哪里来?
用户,网络等
生产者生产的数据也是要花时间获取的!
- 获取数据
- 还有 生产数据到队列
同样的,消费者
- 消费数据
- 还有 加工处理数据
真正的高效:非临界区(获取/加工)之间高并发的同时进行
实现派发任务的代码的代码细节讲解
我们先把上一篇文章的代码拷过来解释一下,思路如下

下面这段代码定义了类 CallTask,并且展示了如何使用这些类来处理计算任务和保存任务。下面会逐行解释代码的各个部分。
Task.hpp
1. #pragma once
 
#pragma once- 这是一个预处理指令,用来防止头文件被多次包含,避免重复定义。
2. 头文件和命名空间
#include <iostream>
#include <functional>
#include <string>
using namespace std;- 包含了标准输入输出库、函数对象库和字符串库。
- 使用 std命名空间,以便在代码中直接使用标准库中的类型和函数。
3. 类 CallTask
 
class CallTask
{
    typedef function<int(int, int, char)> func_t;
public:
    CallTask() {}
    CallTask(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {
    }
    string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }//调用()实现对结果的打印
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};- CallTask类定义了一个计算任务。
- func_t是一个- typedef,定义了一个函数类型,它接受三个参数(两个整数和一个字符)并返回一个整数。
- CallTask的构造函数可以初始化两个整数- _x和- _y,一个字符操作符- _op,以及一个回调函数- _callback。
- operator()重载了- ()操作符,使得对象可以像函数一样被调用。调用时,它会执行- _callback函数(即用户定义的计算逻辑),并返回计算结果的字符串形式。
- toTaskString()返回一个描述任务的字符串,但不包含实际计算结果,仅展示操作符和操作数。
4. 操作符字符串
string oper = "+-*/%";- 定义了一个包含常见数学操作符的字符串。
5. 回调函数 mymath
 
int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            cout << "div zero error" << endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            cout << "mod zero error" << endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
    }
    break;
    default:
        break;
    }
    return result;
}- mymath函数根据- op操作符执行不同的数学运算。
- 如果 op是/或%并且y为 0,会输出错误信息并返回-1以表示出错。
阻塞队列 BlockQueue 的实现
 
进入线程,准备工作做完后,对线程进行判断处理
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
const int maxcapacity = 5;- 这部分代码是 BlockQueue类的定义。#pragma once防止头文件被多次包含。
- maxcapacity定义了队列的最大容量。
BlockQueue
这两部分代码分别展示了一个多线程生产者-消费者模型的实现,其中使用了一个阻塞队列 BlockQueue 来管理生产者和消费者之间的任务流。下面我将详细解释每一部分的代码。
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& capacity = maxcapacity)
        : _capacity(capacity)
    {
        // 构造时初始化
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
        {
            pthread_cond_wait(&_pcond, &_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while (is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_full()
    {
        return _q.size() == _capacity;
    }
    bool is_empty()
    {
        return _q.empty();
    }
private:
    queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;
    pthread_cond_t _ccond;
};- BlockQueue是一个模板类,使用标准库的- queue来存储队列元素。
- _mutex是一个互斥锁,用于保护队列的并发访问。
- _pcond和- _ccond是条件变量,用于协调生产者和消费者的等待与唤醒。
- push方法用于将元素放入队列。如果队列满了,生产者会等待,直到队列有空余位置。
- pop方法用于从队列中取出元素。如果队列为空,消费者会等待,直到队列有元素可取。
- 析构函数负责销毁互斥锁和条件变量。
生产者和消费者线程
#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"- 这部分代码包含了阻塞队列 BlockQueue和任务类CallTask的头文件,并引入了ctime用于时间相关操作和unistd.h用于线程的休眠。
生产者productor
 
void* productor(void* args)
{
    BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);
    while (true)
    {
        // 生产活动
        int x = rand() % 10 + 1;
        int y = rand() % 5;
        char op = oper[rand() % oper.size()];
        Task t(x, y, op, mymath);
        _c_bq->push(t);
        cout << "productor thread, 生产计算任务: " << t.toTaskString() << endl;
        sleep(1); // 生产的慢一些
    }
}- 该函数是生产者线程的执行体。
- 生产者从 args参数中获得阻塞队列_c_bq的指针,然后进入一个无限循环。
- 在循环中,生产者随机生成两个整数 x和y以及一个操作符op,创建一个CallTask对象t,并将其推入阻塞队列。
- 每生成一个任务后,生产者线程输出一个消息,显示任务的描述,并且线程会休眠1秒,以降低生产速度。
消费者 consumer
 
void* consumer(void* args)
{
    BlockQueue<Task>* _c_bq = static_cast<BlockQueue<CallTask>*>(args);
    while (true)
    {
        // 消费活动
        Task t;
        _c_bq->pop(&t);
        cout << "cal thread, 完成计算任务: " << t() << endl;
    }
}- 该函数是消费者线程的执行体。
- 消费者从 args参数中获得阻塞队列_c_bq的指针(是从生产者中传入的),然后进入一个无限循环。
- 在循环中,消费者从阻塞队列中弹出一个 CallTask对象t,并执行该任务(通过t()调用),然后输出计算结果。
主函数 main
 
队列类当中的函数类 类型
int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t p[3], c[2];
    for (int i = 0; i < 3; ++i)
    {
        pthread_create(p + i, nullptr, productor, bq);
    }
    for (int i = 0; i < 2; ++i)
    {
        pthread_create(c + i, nullptr, consumer, bq);
    }
    for (int i = 0; i < 3; ++i)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 2; ++i)
    {
        pthread_join(c[i], nullptr);
    }
    return 0;
}埋种子:srand((unsigned int)time(nullptr));
- main函数中首先设置了随机数种子,然后创建一个- BlockQueue<CallTask>对象,用于存放生产者生成的任务。
- 创建了3个生产者线程和2个消费者线程,所有线程都使用同一个阻塞队列对象 bq。
- 主线程等待所有生产者和消费者线程结束,确保所有任务都被正确处理。

代码整体说明
- 生产者-消费者模型:这是一个经典的多线程同步问题,生产者生成任务并放入队列,消费者从队列中取出任务并执行。
- 阻塞队列:通过互斥锁和条件变量,阻塞队列保证了生产者和消费者在多线程环境下的正确协调,防止竞争条件和资源浪费。生产者在队列满时等待,消费者在队列空时等待,从而保证了任务的有序处理。
💡小注意:
- 在 Visual Studio Code (VSCode) 中进行代码的批量替换操作非常简单,可以通过以下步骤完成:按下 Ctrl + H快捷键
- 如果确认无误,点击“Replace All”按钮来一次性替换所有匹配项。
为什么要先加锁?
因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
为何什么要用 while?解决伪唤醒 ✔️if->while 等待的实现原理
while 一直判断情况,充当休眠等待
- 当前判断生产条件不满足就把自己挂起,但是这有个问题pthread_cond_wait这是一个函数,只要是函数就有调用失败的可能。
- 另外还存在伪唤醒的情况,假设只有一个消费者,十个生产者。只消费了一个但是却唤醒了一批。但是你这里是if判断,都去push肯定是有问题的。
- 因此充当条件判断的语法必须是while,不能用if
单线程模型->多线程模型的思考✔️
多线程对生产和消费队列加了锁的管控,也是只能有一个人的进/出,但是对于非共享区的准备工作,是可以高并发进行的,真正的优势在于:对于非共享的部分可以多线程的提前执行好
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
std::queue<int> buffer;
const int BUFFER_SIZE = 10;
std::mutex mtx;
std::condition_variable cv;
bool done = false; // 生产者是否完成标志
// 生产者
void producer() {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return buffer.size() < BUFFER_SIZE; });
        buffer.push(i);
        std::cout << "Produced: " << i << std::endl;
        lock.unlock();
        cv.notify_one();
    }
    done = true; // 生产者完成
    cv.notify_one(); // 通知消费者
}
// 消费者
void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !buffer.empty() || done; });
        if (done && buffer.empty()) break; // 如果生产者完成且缓冲区为空,则退出
        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumed: " << item << std::endl;
        lock.unlock();
        cv.notify_one();
    }
}
int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}2. 信号量
例如:电影院买票是一种对座位的预定机制
信号量:原子性进行 P(- -)V(++) 操作的计数器
这把计数器的本质是什么?
计数器用来描述资源数目的,把资源是否就绪放在了临界区之外
申请信号量的时候,其实就已经间接的再做判断了
P()---访问资源-- V()
2.1 回忆:多线程对锁的凌乱竞争✔️
解释
当多个线程试图同时访问一个共享资源(如全局变量)时,通常会使用锁(mutex)来确保一次只有一个线程能够访问该资源。然而,如果没有正确的同步机制,线程可能会以一种不可预测的顺序尝试获取锁,导致以下问题:
- 竞态条件:多个线程几乎同时尝试获取同一把锁,但只有一个线程能够成功获取。其他线程需要等待锁释放才能继续执行,这可能导致资源访问的顺序混乱。
- 饥饿:某些线程可能长时间无法获取锁,因为其他线程总是抢先一步获取锁。
- 死锁:两个或多个线程相互等待对方释放锁,导致所有线程都被阻塞。
- 活锁:线程不断尝试获取锁但始终失败,浪费计算资源。
解决方案
为了防止这种“凌乱竞争”,可以采取以下措施:
- 使用原子操作:确保对共享资源的访问是原子的,即不可分割的。
- 使用互斥锁:确保一次只有一个线程可以访问共享资源。
- 使用条件变量:结合互斥锁使用,允许线程在特定条件下等待,直到满足条件后再继续执行。
- 使用读写锁:允许多个线程同时读取共享资源,但一次只能有一个线程写入。
- 使用信号量:控制多个线程对有限资源的访问。
例如对互斥锁进行回忆:
#include <pthread.h>
#include <stdio.h>
int x = 0;
pthread_mutex_t lock;
void* incrementX(void* arg) {
    while (1) {
        pthread_mutex_lock(&lock); // 获取锁
        x++;
        printf("Thread incremented x: %d\n", x);
        pthread_mutex_unlock(&lock); // 释放锁
        usleep(100000); // 等待一段时间
    }
    return NULL;
}
int main() {
    pthread_t thread1, thread2;
    pthread_mutex_init(&lock, NULL);
    pthread_create(&thread1, NULL, incrementX, NULL);
    pthread_create(&thread2, NULL, incrementX, NULL);
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    pthread_mutex_destroy(&lock);
    return 0;
}在这个示例中,每个线程在递增 x 的值之前先获取锁,完成递增后释放锁。这样可以确保任何时候只有一个线程能够修改 x 的值,避免了竞态条件的发生。
总结
- 并发问题:多个线程同时访问同一变量可能导致竞态条件。
- 解决方案:使用互斥锁、原子操作、条件变量等同步机制来保护共享资源。
2.2 信号量函数
信号量是一种用于同步和互斥的机制,在多线程或多进程编程中非常关键。以下是一些常用的信号量操作函数。
1. 初始化信号量
- 原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
-  
  - sem:需要初始化的信号量。
- pshared:传入0表示线程间共享,传入非0表示进程间共享。
- value:信号量的初始值,即计数器的初始值。
 
- 返回值:成功返回0,失败返回-1。
2. 销毁信号量
- 原型:int sem_destroy(sem_t *sem);
- 参数:
-  
  - sem:需要销毁的信号量。
 
- 返回值:成功返回0,失败返回-1。
3. 等待信号量(P操作)
- 原型:int sem_wait(sem_t *sem);
- 参数:
-  
  - sem:需要等待的信号量。
 
- 返回值:成功返回0,信号量的值减一;失败返回-1,信号量的值保持不变。
4. 发布信号量(V操作)
- 原型:int sem_post(sem_t *sem);
- 参数:
-  
  - sem:需要发布的信号量。
 
- 返回值:成功返回0,信号量的值加一;失败返回-1,信号量的值保持不变。
#include <iostream>
#include <thread>
#include <semaphore.h>
// 定义信号量
sem_t sem;
// 生产者线程函数
void producer() {
    for (int i = 0; i < 5; ++i) {
        // 发布信号量(V操作)
        sem_post(&sem);
        std::cout << "Producer incremented semaphore." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
// 消费者线程函数
void consumer() {
    for (int i = 0; i < 5; ++i) {
        // 等待信号量(P操作)
        sem_wait(&sem);
        std::cout << "Consumer decremented semaphore." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
int main() {
    // 初始化信号量
    if (sem_init(&sem, 0, 0) == -1) {
        std::cerr << "Failed to initialize semaphore" << std::endl;
        return 1;
    }
    // 创建生产者和消费者线程
    std::thread t1(producer);
    std::thread t2(consumer);
    // 等待线程结束
    t1.join();
    t2.join();
    // 销毁信号量
    sem_destroy(&sem);
    return 0;
}
CP 模型中的锁与信号量使用总结
多生产多消费的意义
-  生产与消费的本质: 
-  
  - 生产:将私有的任务转移到公共空间中。
- 消费:从公共空间中取出任务进行处理。
 
- 意义:多生产多消费模型不仅仅局限于将任务或数据放入交易场所,而是包含了任务生产前和消费后的处理过程,这两个阶段往往是最耗时的。
信号量的本质与意义
- 信号量:本质上是一把计数器。
- 计数器的意义:
-  
  - 与互斥量相比,信号量可以预设临界资源的情况。
- 在执行 PV(Proberen/Vergrendelen,即测试/锁定)操作过程中,可以在不进入临界区的情况下得知资源情况。
- 这样可以减少临界区内部的判断,提高系统效率。
 
3.基于环形队列的生产者消费者模型
常见的环形,例如基于%实现

空和满的时候,tail 和 head 指向的是同一个位置,无法判断是空还是满
解决:
- 空一个位置(temp=head+1)来判断 temp 及下一个位置是不是==tail
- 交给信号量来处理
生产和消费没有指向同一个格子,就可以运行下去,遵循着三个原则
例如:我围着圆桌进行放苹果,你跟在我后面拿
- 指向同一个位置的时候,只能一个人访问
- 你不能超过我
- 我不能把你套个圈
正常追逐游戏,必须满足这三个条件
我们两个什么情况才会指向同一个位置?空或满
空:我,生产者执行
满:你,消费者执行
添加信号量
- P 关注什么资源呢?还有多少剩余空间 SpaceSem:N
- C 关注什么资源呢?还有多少剩余数据 DataSem:0
生产者执行时:P(SpaceSem) V(DataSem)
消费者执行时:P(DataSem) V(SpaceSem)
下篇文章将继续讲解~















![LCP9回文数[leetcode-9-easy]](https://i-blog.csdnimg.cn/direct/dbdc12a643bd4ffdac51b1a0ff5a05ff.png)



