当前位置:首页 » 《随便一记》 » 正文

【C++】异步(并发)实现 线程池 ---附源码+实现步骤(future、async、promise、package_task、任务池原理和框架)

22 人参与  2024年12月30日 14:01  分类 : 《随便一记》  评论

点击全文阅读


请添加图片描述
每日激励:“加油ヾ(◍°∇°◍)ノ゙陌生人”
早关注不迷路,话不多说安全带系好,发车啦(建议电脑观看)。**
思维导图:
在这里插入图片描述

C++11异步操作所需函数

1. std::future:

std::future是C++11标准库中的⼀个模板类,它表⽰⼀个异步操作的结果。当我们在多线程编程中使⽤异步任务时,std::future可以帮助我们在需要的时候获取任务的执⾏结果。std::future的⼀个重要特性是能够阻塞当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。

应⽤场景

异步任务: 当我们需要在后台执⾏⼀些耗时操作时,如⽹络请求或计算密集型任务等,std::future可以⽤来表⽰这些异步任务的结果。通过将任务与主线程分离,我们可以实现任务的并⾏处理,从⽽提⾼程序的执⾏效率并发控制: 在多线程编程中,我们可能需要等待某些任务完成后才能继续执⾏(需要阻塞等待)其他操作。通过使⽤std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执⾏后续操作结果获取:std::future提供了⼀种安全的⽅式来获取异步任务的结果 。我们可以 使⽤std::future::get()函数来获取 任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调⽤get()函数时,我们可以确保已经获取到了所需的结果

2. std::async关联异步任务:

std::async关联异步任务,任务结果关联的std::future对象。默认情况下,std::async是否启动⼀个新线程,或者在等待future时,任务是否同步运⾏都取决于你给的参数。
这个参数为std::launch类型:

std::launch::deferred表明该函数会被延迟调⽤,直到在future上调⽤get()或者wait()才会开始执⾏任务std::launch::async 表明函数会在⾃⼰创建的线程上运⾏std::launch::deferred | std::launch::async 内部通过系统等条件⾃动选择策略

2.1 future + async 实操同步和异步操作:

async.cc文件、头文件:thread、future

Add加法函数 打印:加法(测试查看)

主函数:

将加法操作进行异步操作! async(policy,func,…) 策略+回调函数+args…参数:std::lacunch::deferred策略(同步)、Add回调函数、回调函数的参数args…使用std::this_thread::slepp_for(std::chrono::seconds(1)休眠1s)进行测试使用,查看打印的信息。返回值是future<(Add函数的返回值类型)> 接收成fu对象变量 fu调用get函数,获取结果 deferred:在执行get获取异步结果的时候才会执行异步任务 打印结果
在这里插入图片描述

策略换成std::launch::async

此时就能发现async会直接进行异步操作
(内部创建工作线程,异步的完成任务,而非在get处才进行异步)
查看结果 进行了主线程和回调函数的异步操作:在这里插入图片描述

代码:

#include <iostream>#include <thread>#include <future>#include <unistd.h>int Add(int n1,int n2){    std::cout << "------进行异步操作------" << std::endl;    return n1 + n2;}int main(){    //实现将加法函数使用future进行异步操作    //defferred策略    //在future对象get获取时才进行异步操作    std::future<int> fu = std::async(std::launch::async,Add,11,12);    // std::future<int> fu = std::async(std::launch::deferred,Add,11,12)    std::this_thread::sleep_for(std::chrono::seconds(1));//休眠1s    std::cout << "------1------" << std::endl;    //当使用deferred策略时,异步操作会在get处执行    int sum = fu.get();    std::cout << sum << std::endl;    return 0;}

3. std::promise::get_future和std::future配合:

std::promise提供了⼀种设置值的⽅式,它可以在设置之后通过相关联的std::future对象进⾏读取。换种说法就是之前说过std::future可以读取⼀个异步函数(async)的返回值, 但是要等待就绪, ⽽std::promise就提供⼀种⽅式⼿动让std::future就绪。

可能有点抽象,通过代码慢慢品。

3.1 promise + future的实操:

promise.cc、头文件:thread、future
加法函数:

int Add(n1,n2,promise &prom) 调用sleep_for休眠3s(通过睡眠3s,来展现pro设置数据与fu获取数据的同步关系)prom调用set_value设置结果return n1 + n2;

主函数:

promise对象prom调用get_future获取std::future对象给到fu对象使用std::thread类构造线程调用Add函数、n1、n2、std::ref获取prom引用
  std::thread(Add,11,12,&pro);//使用线程调用Add函数,进行异步操作
使用fu对象调用get获取结果 获取的结果其实就是prom设置的结果,也就是在线程中promise对象设置数据,这样就实现了其他线程通过future获取数据的获取异步任务结果的功能。该promise设置结果和future获取结果会存在一个同步关系,也就是必须要等待到promise设置完结果后,才能get获取到结果,保证一定有结果!! 打印查看sum注意线程终止join
在这里插入图片描述

看完实现步骤,在总结下:

promise的使用不像async一样自身去实现同步和异步策略的然后返回结果。promise他是是将自身传递给异步线程,在异步线程中获取参数,而外部的future对象是提前和promise关联的,关联后当future对象想获取数据时通过get获取,但该get是保证和关联的promise是同步的,所以即使没有数据future的get也会等待promise设置完成数据,最终通过future对象成功的获得异步的数据。

具体细节见代码注释:

#include <iostream>#include <thread>#include <future>int Add(int n1,int n2,std::promise<int>& p){    std::cout << "-----等待promise-----" << std::endl;    //睡眠3s    std::this_thread::sleep_for(std::chrono::seconds(3));//chrono英文:计时    //证明是不是 fu的get在等待 pro的set_value    p.set_value(n1+n2);    return n1 + n2;}int main(){    //demo测试:promise 与 future的结合使用//模板内填写的是:调用函数的返回值类型,int Add(...)    std::promise<int> pro;    //将future对象 fu 与 promise对象 pro 进行关联。    std::future<int> fu = pro.get_future();    std::thread th(Add,11,12,std::ref(pro));//使用线程调用Add函数,进行异步操作    //ref函数获取pro的引用    std::cout << "-----1-----" << std::endl;    int sum = fu.get();//此时并没有使用async函数,而是通过pro与fu对关联,fu从pro设置的值中获取异步数据    //并且fu的get与pro的set_value是同步的!!    std::cout << "-----2-----" << std::endl;    std::cout << sum << std::endl;    th.join();//关闭线程    return 0;}

4. packaged_task(任务包)

std::packaged_task就是将任务和 std::future 绑定在⼀起的模板是⼀种对任务的封装。通过std::packaged_task对象获取任务相关联的std::future对象(packaged_task调⽤get_future()⽅法获得)

std::packaged_task的模板参数是函数签名(就是返回值+参数(省略函数名和参数名),如:void(int)、int(double,double))。可以把std::future和std::async看成是分开的, ⽽std::packaged_task则是⼀个整体,将一个函数给封装起来,也能返回一个future对象将函数的结果保存起来。
总结:

packaged_task 可以通过get_future获取一个future对象,并获取获取封装的这个函数的异步结果

4.1 future+paskaged_task实操:

package_task.cc、头文件:thread、future

加法函数

int Add(n1,n2) sleep_for 3sreturn结果(n1+n2)

主函数:

使用packaged_task<回调函数签名> task对象,构造中填回调函数名(Add)进行二次封装到task中使用task的get_future函数获取fu对象

注意不能把task当成正常函数,也就是不能当作async异步执行函数、也不能当作线程入口
task只可以当作可调用来调用执行任务:也就是直接调用函数的方式:如task(…)。
注意,指针指向对象的生命周期问题(不能是局部变量!),所以需要使用指针类型,通过new对象将他放在堆上,也就是用智能指针管理它的生命周期。
这样就把task定义为一个指针,传递到线程中后,进行解引用执行(注意* 与 括号间的优先级问题!要写成:(*task)(n1,n2) )

这样重新写:将上面的局部成员变量的task改成智能指针:

将packaged_task<函数签名>对象用智能指针ptask对象储存(直接使用auto),并且使用make_shared构造回调函数。使用该对象指针,调用get_future生成关联的future函数使用线程 thr 对象调用函数(使用lambda表达式[&] ( ) {…}) 内部调用回调函数(类似C++仿函数调用不过参数是指针,就类似C语言中的函数指针的使用) 使用关联的fu对象的get函数获取结果、打印sum结果关闭线程
在这里插入图片描述

package_task使用的具体代码:

#include <iostream>#include <thread>#include <future>int Add(int n1,int n2){    std::cout << "-----等待promise-----" << std::endl;    std::this_thread::sleep_for(std::chrono::seconds(3));//chrono英文:计时    return n1 + n2;}int main(){    // std::packaged_task<int(int,int)> task(Add);//将Add函数进行 二次封装    // task(11,12);    // std::future<int> fu = task.get_future();    //使用智能指针替代直接创建变量:    // std::shared_ptr<std::packaged_task<int(int,int)>> ptask = std::make_shared<std::packaged_task<int(int,int)>>(Add);    //智能指针的类型:std::packaged_task<int(int,int)>、其中make_shared构造的时候也需要填写该类型!    auto ptask = std::make_shared<std::packaged_task<int(int,int)>>(Add);    std::future<int> fu = ptask->get_future();    std::thread thr([&](){        (*ptask)(11,12);    });    int sum = fu.get();    std::cout << sum << std::endl;    //注意关闭线程    thr.join();    return 0;}

线程池的实现

基于线程池执⾏任务的时候,⼊⼝函数内部执⾏逻辑是固定的,因此选择std::packaged_task加上std::future的组合来实现线程池。
线程池的工作思想:
用户传入要执行的函数,以及处理的数据(参数),有线程池中的工作线程来执行函数完成任务

实现框架:

需要的管理成员 任务池:用vector维护的函数池子实现同步互斥:互斥锁、条件变量一定数量的工作线程:用于不断从任务池取出任务中执行任务结束运行标志 需要的管理操作 入队任务:入队一个函数和参数停止运行:停止线程池

具体实现线程池

1. 先搭线程池的框架:

threadpool.hpp文件、ThreadPool类:

成员变量:

结束标志:_stop(原子的)锁:_mutex条件变量:_cv(conditino_variable)实现同步操作工作线程:_threads(vector储存)线程一定要放在条件变量和锁下面任务池< Functor >:_taskpool

成员函数:

任务池中的任务:using声明 Functor任务:function<void(void)>

构造(thr_count = 1)

析构

加入线程池的push函数

需要在此了解的内容.
模板函数<F,Args不定参>push(F && func,Args&& …args)入队给线程池,push的是一些任务,怎么知道函数的参数?使用模板参数 template<函数类型 F、不定参数 …Args> 获取函数的类型和参数auto push(F && func, Args && …args) - > std::future< decltype ( func ( argc… ) ) >(其中->箭头就是用于返回值类型的说明)其中push传入的首先第一个参数是一个要执行的函数、第二个参数是一个不定参(不定个数的参数,也就是该函数所需要的所有参数)push函数内部,会将这个传入的函数封装成一个异步任务(使用packaged_task任务包),再生成关联的future,将他返回回去这样就能在外部获取最终结果因为不知道push函数的返回值类型,所以返回值写成auto,再使用decltype通过内部的函数推导,推导出返回值的类型给到函数的返回值处。使用lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中(后面由工作线程取出执行)

viod stop

私有函数:

线程入口函数:void entry()(内部不断的从任务池中取出任务进行执行)

2. 线程池的实现流程:

构造(thr_count = 1):

stop(false)创建线程池中的线程:emplace_back原地构造内部创建指定数量的线程(构成线程所要执行的函数,也就是调用线程入口函数)

析构

调用stop

stop:

判断若stop为true了就直接返回了,不要进行重复退出结束符号:_stop修改为true唤醒所有工作线程:_cv调用notify_all() (V操作)等待所有线程的退出:遍历_threads,进行thread的join

auto push(F && func, Args && …args) - > std::future< decltype ( func ( args… ) ) > (模板:template<typename F,typename …Args>)

将传入的函数封装成一个packaged_task任务包 使用bind包装器绑定(forward(func),forward(args)…),生成一个可调用对象tmp_func(并且这个函数是不需传递参数的,他已经提前绑定进去了),其中使用的forward是进行完美转发的(forward会根据传进来的参数,区分右值和左值)),最后的(args)…是一个参数包,能发多个参数using声明函数的return_type返回值的类型,同样通过decltype推导出传递进来的func函数的返回值类型(给到packaged_task封装任务包使用)创建packaged_task任务包对象task,构造把tmp_func函数给到任务,并且封装成智能指针(因为防止局部变量在线程中执行时被销毁)使用make_shared创建,此处不需要写参数(也就是packaged_task<函数类型写成:return_type( )>),因为已经bind绑定进去了。 获取task任务包关联的future对象(使用get_future)构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象。 加锁:{ //使用括号进行限定作用域(为了让锁在局部有效,处作用域后自动解锁)使用 unique_lock< mutex >类lock对象构造为_mutex,直接进行加锁(加锁方法: std::unique_lock<std::mutex> lock(_mutex);//这样就完成了加锁!! 对_mutex变量,直接进行了加锁操作) 将构造出来的匿名函数对象,抛入到任务池中:任务池push_back(vector中的函数)储存匿名对象lambda[ ] ( ){ }表达式(内部执行任务操作,也就是 (*task) (); )_cv唤醒线程(执行entry函数),nodify_one }(此处对应第二点的加锁括号) 返回fu对象(这样外部也能通过future的获得异步数据)

void entry()函数:

加锁 同样使用unique_lock 等待任务池不为空,或_stop被置位返回 cv调用wait进行等待,参数:传递锁lock自动唤醒条件函数:[ this ] ( ) { return _stop || !_taskpool.empty(); } //当stop为真或者线程池不为空的时候 取出任务执行 但直接取每次只能取出一个任务(只取出一个任务,若任务池中,还有就会频繁的去取,而取任务就会重复上诉过程,就会频繁加锁,加锁就会影响整体效率)所以需要改变策略,取任务的方法处限定作用域首先在作用域外创建一个临时的任务池其次在作用域内将零时的任务池和真正的任务池进行swap(这样就能一次将所有的任务都取出来)最后出作用域,遍历临时的任务池,执行所有任务task()(也就是push函数中任务池push_back储存的匿名函数) 任务的循环处理最外层一定要死循环,直到stop被置位(while(!stop))

加法函数(略)

主函数:

ThreadPool pool;//线程池循环10个任务 fu对象 获取 pool调用的push(加法函数)方法返回来的关联futurefu调用get查看结果,并打印查看 poolstop
#include <iostream>#include <mutex>#include <atomic>#include <condition_variable>#include <vector>#include <functional>#include <thread>#include <future>class ThraedPool{public:    ThraedPool(int thr_count = 1):_stop(false){        //创建对应的线程个数        for(int i = 0 ; i < thr_count ;i++){            _threads.emplace_back(std::thread(&ThraedPool::entry,this));//注意函数的 取地址             //线程的初始化构造函数:会自动执行entry,参数就是后面的args        }    }    ~ThraedPool(){        stop();    }        void stop(){        if(_stop == true) return;//防止重复        _stop = true;//设置结束标志        _cv.notify_all();//唤醒所有工作线程,P操作        //等待所有线程退出        for(auto& thread : _threads){            thread.join();        }    }    template<typename F,typename ...Args>//F:函数、Args:参数    auto push(const F &&func,Args && ...args) -> std::future<decltype(func(args...))>    {        //推导出函数的返回值类型,同样使用decltype        using return_type = decltype(func(args...));        //使用bind函数适配器,提前将参数进行绑定,这样就能剩去参数的传递        auto tmp_func = std::bind(std::forward<F>(func),std::forward<Args>(args)...);//注意此处要把...参数包放到外面//其中使用到了 forward 进行完美转发,防止右值引用被当成左值进行传递        //使用智能指针将,packaged_task任务包进行封装,并且把func执行的函数给到任务包,此处传递进行封装后的函数tmp_func        auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);        std::future<return_type> fu = task->get_future();//获取关联的future对象        //构造匿名lambda表达式进行捕获对象,并执行        //对于共用成员,需要加锁处理:        {            //加锁方法使用 unique_lock,具体如下:            std::unique_lock<std::mutex> lock(_mutex);//加锁完成!            //进行异步操作            //3. 将构造出来的匿名函数对象,抛入到任务池中            _taskpool.push_back([task](){                (*task)();            });            _cv.notify_one();//唤醒线程        }        return fu; //返回future对象           }    using Factor = std::function<void(void)>;private://线程的入口内部不断的从任务池中取出任务进行执行:    void entry(){        while(!_stop){            std::vector<Factor> tmp_taskpool;            //1. cv 进行等待,等待任务到来,或者 stop结束标志被置位 或者 任务池为空            {//加锁的作用域:                std::unique_lock<std::mutex> lock(_mutex);                _cv.wait(lock,[this](){                    //唤醒条件                    return _stop || !_taskpool.empty();                });                //唤醒后执行,但这样只能执行一个,所以在作用域外创建局部任务池                tmp_taskpool.swap(_taskpool);//与全局的任务池交换,获取所有任务            }            //遍历执行任务            for(auto& task : tmp_taskpool){                task();//执行任务            }        }     }private:    //1. 互斥锁    std::mutex _mutex;    //2. 条件变量    std::condition_variable _cv;    //3. 执行的任务池    std::vector<Factor> _taskpool;    //4. 线程池    std::vector<std::thread> _threads;    //5. 结束标志    std::atomic<bool> _stop;};int Add(int n1, int n2){    return n1 + n2;}int main(){    //创建线程池    ThraedPool pool;    for(int i = 0 ; i < 10 ;i++){        auto fu = pool.push(Add,11,i);        std::cout << fu.get() << std::endl;    }        pool.stop();    return 0;}

总结结合实例,理清下线程池工作的原理:

创建一个线程池TreadPool,他内部会先构造出线程池(多个线程,他们都会不断的执行entry)和任务池执行push将任务交给线程池,push就会将这些任务进行打包(package_take)并丢进任务池中,后唤醒线程(也就是唤醒entry中的wait处,进行从任务池中获取任务执行),并且返回一个future对象给到主函数这样外部的主函数和内部的线程就能实现异步操作,主函数就能直接通过future的get获取到数据(主函数他获取的数据和线程池中执行任务后的数据 是同步的)

最终结果
在这里插入图片描述


本章完。预知后事如何,暂听下回分解。

如果有任何问题欢迎讨论哈!

如果觉得这篇文章对你有所帮助的话点点赞吧!

持续更新大量c++细致内容,早关注不迷路。


点击全文阅读


本文链接:http://zhangshiyu.com/post/209258.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1