当前位置:首页 » 《资源分享》 » 正文

【Linux】从多线程同步到生产者消费者模型:多线程编程实践

1 人参与  2024年10月22日 10:40  分类 : 《资源分享》  评论

点击全文阅读


目录

1.线程的同步

1.1.为什么需要线程的同步?

2.2.条件变量的接口函数

2.生产消费模型

2.1 什么是生产消费模型

2.2.生产者消费者模型优点

2.3.为何要使用生产者消费者模型

3.基于BlockingQueue的生产者消费者模型

3.1为什么要将if判断变成while?

3.2.pthread_cond_wait函数调用的作用:

代码:

4.POSIX信号量

4.1.POISX信号量是什么?

4.2.POISX信号量常见接口

4.3.POSIX信号量的核心PV操作

4.3.1、P操作(等待信号量)

4.3.2、V操作(释放信号量)

5.环形队列

5.1.生产消费模型搭建的原理

5.2.环形队列的具体实现

5.3.代码:

5.4.多生产和多消费的并发性体现在:

1.线程的同步

1.1.为什么需要线程的同步?

上面我们讲解了线程的互斥问题,但此时我们又发现了一个问题!

如果某一个线程抢票能力过于强大,把所有的票一个人都抢走了,比如上面的线程4,一个人就抢到了8088张票,而线程2和线程3一张票都没有抢到,这就造成了线程2和线程3的饥饿问题!

在现实世界里,这肯定是不行的,秉持着公平公正的原则,我们应该让这4个线程抢到的票都差不多,才有实际意义。

所以互斥能解决抢票抢到负数的问题,但是不能解决饥饿问题,饥饿问题就需要线程同步去解决!

通过条件变量我们可以实现线程的同步!

2.2.条件变量的接口函数

int pthread_cond_init(pthread_cond_t *restrict cond , const pthread_condattr_t *restrictattr);:初始化接口
int pthread_cond_destroy(pthread_cond_t *cond):销毁接口
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);:在条件不满足时阻塞等待
int pthread_cond_broadcast(pthread_cond_t *cond);:条件满足,唤醒所有线程,开始竞争。
int pthread_cond_signal(pthread_cond_t *cond);:条件满足,唤醒一个线程。

条件变量需要一个线程队列和相应的通知机制,才能保证线程同步!

2.生产消费模型

2.1 什么是生产消费模型

总结一句话就是“321”原则:

一个交易场所(特定数据结构形式存在的一段内存空间)两种角色(生产角色,消费角色):生产线程,消费线程三种关系:生产与生产(互斥关系) , 消费与消费(互斥关系),生产与消费。

1个交易场指的就是共享资源(临界资源),有多个厂商(生产者)和多个用户(消费者),所以这就是我们常说的多线程的同步和互斥问题。

超市是什么?临时保存数据的“内存空间”——某种数据结构对象。

商品是什么?就是数据!

2.2.生产者消费者模型优点

解耦支持并发支持忙闲不均

2.3.为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

3.基于BlockingQueue的生产者消费者模型

 

3.1为什么要将if判断变成while?

如果生产者只生产了一份,但是叫醒了5个消费者,当一个消费者竞争锁结束取走仅有的一份商品,那接下来的4个消费者就会看到空的队列,如果是if,因为之前已经判断过,所以会直接执行下面取空的队列,因此会直接报错,但是如果是while的话,仍需要判断队列是否已经满了,因为当等待的线程被唤醒的时候,继续从当前的位置进行执行代码!

3.2.pthread_cond_wait函数调用的作用:

a. 让调用线程等待

b. 自动释放曾经持有的_mutex锁

c. 当条件满足,线程唤醒,pthread_cond_wait要求线程必须重新竞争_mutex锁,竞争成功,方可返回!!!

代码:

#ifndef __BLOCK_QUEUE_HPP__#define __BLOCK_QUEUE_HPP__#include <iostream>#include <string>#include <queue>#include <pthread.h>template <typename T>class BlockQueue{private:    bool IsFull()    {        return _block_queue.size() == _cap;    }    bool IsEmpty()    {        return _block_queue.empty();    }public:    BlockQueue(int cap) : _cap(cap)    {        _productor_wait_num = 0;        _consumer_wait_num = 0;        pthread_mutex_init(&_mutex, nullptr);        pthread_cond_init(&_product_cond, nullptr);        pthread_cond_init(&_consum_cond, nullptr);    }    void Enqueue(T &in) // 生产者用的接口    {        pthread_mutex_lock(&_mutex);        while(IsFull()) // 保证代码的健壮性        {            // 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!            // 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁 c. 当条件满足,线程唤醒,pthread_cond_wait要求线性            // 必须重新竞争_mutex锁,竞争成功,方可返回!!!            // 之前:安全            _productor_wait_num++;            pthread_cond_wait(&_product_cond, &_mutex);  //  只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!            _productor_wait_num--;            // 之后:安全        }        // 进行生产        // _block_queue.push(std::move(in));        // std::cout << in << std::endl;        _block_queue.push(in);        // 通知消费者来消费        if(_consumer_wait_num > 0)            pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast        pthread_mutex_unlock(&_mutex);    }    void Pop(T *out) // 消费者用的接口 --- 5个消费者    {        pthread_mutex_lock(&_mutex);        while(IsEmpty()) // 保证代码的健壮性        {            // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!            // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁            _consumer_wait_num++;            pthread_cond_wait(&_consum_cond, &_mutex);  // 伪唤醒            _consumer_wait_num--;        }        // 进行消费        *out = _block_queue.front();        _block_queue.pop();        // 通知生产者来生产        if(_productor_wait_num > 0)            pthread_cond_signal(&_product_cond);        pthread_mutex_unlock(&_mutex);        // pthread_cond_signal(&_product_cond);    }    ~BlockQueue()    {        pthread_mutex_destroy(&_mutex);        pthread_cond_destroy(&_product_cond);        pthread_cond_destroy(&_consum_cond);    }private:    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!    int _cap;                     // 总上限    pthread_mutex_t _mutex;       // 保护_block_queue的锁    pthread_cond_t _product_cond; // 专门给生产者提供的条件变量    pthread_cond_t _consum_cond;  // 专门给消费者提供的条件变量    int _productor_wait_num;    int _consumer_wait_num;};#endif

我们之前学习了基于条件变量和阻塞队列实现(空间可以动态分配)的生产消费者模型,今天我们来用POSIX信号量基于固定大小的环形队列重写这个程序。

4.POSIX信号量

4.1.POISX信号量是什么?

信号量本质是一个计数器,可以在初始化时对设置资源数量,进程 / 线程 可以获取信号量来对资源进行操作和结束操作可以释放信号量!

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

4.2.POISX信号量常见接口

信号量初始化:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数分别为:
sem_t *sem:传入信号量的地址
pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
value:信号量的初始值(计数器的初始值)

信号量销毁:

#include <semaphore.h>
int sem_destroy(sem_t *sem)

4.3.POSIX信号量的核心PV操作

POSIX信号量的PV操作是信号量机制中的核心,它们分别代表了对信号量的等待(P操作)和释放(V操作)。

4.3.1、P操作(等待信号量)

P操作,也称为“申请资源”或“等待信号量”操作,用于尝试减少信号量的值。当线程或进程需要访问某个临界资源时,它会执行P操作来申请信号量。

函数原型

int sem_wait(sem_t *sem);

其中,sem是指向要等待的信号量的指针。

操作过程

如果信号量的当前值大于0,那么P操作会将信号量的值减1,并立即返回,表示申请资源成功。如果信号量的当前值为0,那么执行P操作的线程或进程将被阻塞,直到信号量的值变为大于0(即有其他线程或进程释放了信号量)。此时,被阻塞的线程或进程会重新尝试P操作,如果成功,则信号量的值再次减1。

返回值

成功时,返回0。失败时,返回-1,并设置errno来指示错误类型。

4.3.2、V操作(释放信号量)

V操作,也称为“释放资源”或“发布信号量”操作,用于增加信号量的值。当线程或进程完成对临界资源的访问后,它会执行V操作来释放信号量。

函数原型

int sem_post(sem_t *sem);

其中,sem是指向要释放的信号量的指针。

操作过程

V操作会将信号量的值加1。如果有线程或进程因为信号量的值为0而被阻塞在P操作上,那么V操作会唤醒其中一个被阻塞的线程或进程,使其能够继续执行P操作并访问临界资源。

返回值

总是返回0,表示成功。V操作永远不会阻塞。

注意PV操作是原子的,这意味着它们在执行过程中不会被其他线程或进程的打断。这保证了信号量机制的正确性和可靠性。

5.环形队列

5.1.生产消费模型搭建的原理

环形队列底层也是普通数组,

生产者和消费者指向同一位置有两种情况:

队列为空(让生产者先跑)队列为满(让消费者先跑)

环形队列当队列不为空或者满的时候,真正实现了多线程同步。当然生产者不能把消费者套一个圈,消费者不能超过生产者。这些都可以通过POSIX信号量的特性实现~

5.2.环形队列的具体实现

首先需要区分生产者和消费者,生产者只关注空间,消费者只关注资源。生产者和消费者都需要进行PV操作,生产者对应的将任务加入队列,消费者对应的取出队列里的任务。

Consumer线程不断从环形队列中取出Task对象,执行其操作,并打印消费结果。Productor线程则持续生成新的Task对象并将其放入队列中,同时打印出生产信息。

并且还需要两把锁,分别给生产者和消费者,保证多线程并发的线程安全。

5.3.代码:

#pragma once#include <iostream>#include <string>#include <vector>#include <semaphore.h>#include <pthread.h>template<typename T>class RingQueue{private:    void P(sem_t &sem)    {        sem_wait(&sem);    }    void V(sem_t &sem)    {        sem_post(&sem);    }    void Lock(pthread_mutex_t &mutex)    {        pthread_mutex_lock(&mutex);    }    void Unlock(pthread_mutex_t &mutex)    {        pthread_mutex_unlock(&mutex);    }public:    RingQueue(int cap): _ring_queue(cap), _cap(cap),  _productor_step(0), _consumer_step(0)    {        sem_init(&_room_sem, 0, _cap);        sem_init(&_data_sem, 0, 0);        pthread_mutex_init(&_productor_mutex, nullptr);        pthread_mutex_init(&_consumer_mutex, nullptr);    }    void Enqueue(const T &in)    {        // 生产行为        P(_room_sem);        Lock(_productor_mutex);        // 一定有空间!!!        _ring_queue[_productor_step++] = in; // 生产        _productor_step %= _cap;        Unlock(_productor_mutex);        V(_data_sem);    }    void Pop(T *out)    {        // 消费行为        P(_data_sem);        Lock(_consumer_mutex);        *out = _ring_queue[_consumer_step++];        _consumer_step %= _cap;        Unlock(_consumer_mutex);        V(_room_sem);    }    ~RingQueue()    {        sem_destroy(&_room_sem);        sem_destroy(&_data_sem);        pthread_mutex_destroy(&_productor_mutex);        pthread_mutex_destroy(&_consumer_mutex);    }private:    // 1. 环形队列    std::vector<T> _ring_queue;    int _cap; // 环形队列的容量上限    // 2. 生产和消费的下标    int _productor_step;    int _consumer_step;    // 3. 定义信号量    sem_t _room_sem; // 生产者关心    sem_t _data_sem; // 消费者关心    // 4. 定义锁,维护多生产多消费之间的互斥关系    pthread_mutex_t _productor_mutex;    pthread_mutex_t _consumer_mutex;};

5.4.多生产和多消费的并发性体现在:

消费者在处理任务的时候可以并发,

所以多生产和多消费的意义不在于向队列中生产,再从队列中拿走。而在于生产前我们可以多线程并发获取原始任务,生产后,被我们的消费者拿走任务后,可以多线程并发式的去执行各自的任务。这才是多生产多消费的意义

多生产,多消费的模型主要在于,多个生产者去竞争一个名额然后进行加锁,多个消费者竞争一个名额然后进行加锁,所以最终还是会变成单生产,单消费!


点击全文阅读


本文链接:http://zhangshiyu.com/post/175340.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1