当前位置:首页 » 《随便一记》 » 正文

[Linux]带你轻松实现线程池

9 人参与  2023年05月06日 14:57  分类 : 《随便一记》  评论

点击全文阅读


目录

前言

封装

基于RAII思想封装的锁

封装一个自己的Thread类

线程池

线程池概念

线程池的应用场景

模拟实现自己的线程池

测试线程池

测试用任务

使用线程

前言

这是博主有关多线程的第五篇博客,前面没看的这边放上链接,建议去看看。

Linux 多线程(线程概念、线程控制部分)_Sola一轩的博客-CSDN博客

Linux]多线程(线程互斥、线程同步部分)_Sola一轩的博客-CSDN博客

【Linux】生产者消费者模型_Sola一轩的博客-CSDN博客

Linux]信号量及基于环形队列的生产消费模型_Sola一轩的博客-CSDN博客

        这次在实现线程池相关的代码前,我们先封装一下pthread库的锁和线程相关的接口,方便我们的使用和让代码更简洁。

封装

        C++11中有关线程库的部分就是对线程相关的系统调用进行了封装,在这里,我们将尝试自己进行封装,以方便接下来的使用。

基于RAII思想封装的锁

        锁控制不好时,可能会造成死锁,最常见的比如在锁中间代码返回,或者在锁的范围内抛异常,因此我们会基于RAII的思想(智能指针的)来封装一个LockGuard,在方便我们使用的同时,提高代码的健壮性。(C++11采用RAII的方式对锁进行了封装,即lock_guard和unique_lock,我们这里尝试自己进行封装)、

#pragma once#include<pthread.h>#include<iostream>//封装一下锁class Mutex{public:    Mutex(pthread_mutex_t* mutex = nullptr)    :_mutex(mutex)    {           }    void Lock()    {        if(_mutex)        pthread_mutex_lock(_mutex);    }    void UnLock()    {        if(_mutex)        pthread_mutex_unlock(_mutex);    }private:    pthread_mutex_t* _mutex;};class LockGuard{public:    LockGuard(pthread_mutex_t* mutex)    :_mutex(mutex)    {        //构造中加锁        _mutex.Lock();    }    ~LockGuard()    {        //析构中解锁        _mutex.UnLock();    }private:    Mutex _mutex;};

        用LockGuard来接收锁,可以在自动加锁后,如果出了作用域,就会调用析构函数自动解锁。

封装一个自己的Thread类

        由于每次使用pthread库内的线程相关的调用很繁杂,我们这里尝试进行封装一个自己的Thread类。

#pragma once#include <pthread.h>#include <iostream>#include <string>#include <functional>#include <cassert>class Thread{    typedef std::function<void*(void*)> func_t;  //用c++11中的function包装器接收方法private:    static void* start_routine(void* args)      //只能是静态成员的原因    {        Thread* t = static_cast<Thread*>(args);        return t->callback();    } public:    Thread()        //创建时先只给线程名字,等正式要用时再创建线程    {        char namebuff[128];        snprintf(namebuff,sizeof(namebuff),"Thread[%d]",ThreadNum++);        _name = namebuff;    }    void Run(func_t func,void* args)    {        _func = func;        _args = args;        pthread_create(&_tid,nullptr,start_routine,this);     //传this指针的原因        }    void Join()    {        pthread_join(_tid,nullptr);    }    std::string ThreadName()  //获取当前线程名字    {        return _name;    }    void* callback()    {        return _func(_args);    }private:    std::string _name;      //线程名字    pthread_t _tid;    void* _args;    // 调用方法的参数    func_t _func;     //线程执行的方法    static int ThreadNum;};int Thread::ThreadNum = 1;

注意点:

我们选择了默认构造时只初始化线程的名字 ,再调用Run接口时再正式创建我们的线程,传入方法和参数。(方便我们测试观察和让线程在需要时再进行创建)

start_routine是静态成员方法,由于this指针的原因。(pthread_create的第三个参数要求是返回值和参数皆为void*的函数,普通的成员函数参数中含有this指针,无法满足要求)

由于start_routine是静态成员,其没有this指针,所以无法访问成员变量,因此我们通过pthread_create( )函数的第四个参数将this指针传给start_routine。

在start_routine里面,我们通过传入的this指针调用类内的callback(),通过这样来调用我们传给线程的方法。

线程池

线程池概念

一种线程使用模式。

        线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。

        线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的应用场景

需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。

对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

模拟实现自己的线程池

模拟实现的线程池功能

创建固定数量线程池,循环从任务队列中获取任务对象

获取到任务对象后,执行任务对象中的任务接口

#pragma once#include "Thread.hpp"#include "LockGuard.hpp"#include <vector>#include <queue>#include <unistd.h>template<class T>class ThreadPool;template<class T>class ThreadData    //方便传参时能拿到threadpool的this指针和当前线程的指针{public:    ThreadData(ThreadPool<T>* tp,Thread* td)    :_tp(tp),_td(td)    {    }    ThreadPool<T>* _tp;    Thread* _td;};template<class T>class ThreadPool{private:    static void* start_routine(void* args)    {        ThreadData<T>* td = static_cast<ThreadData<T>*>(args);   //拿到threadpool的指针        ThreadPool<T>* tp = td->_tp;            //拿到当前thread的指针        T task;        Thread* t = td->_td;        while(true)        {                {                                     //控制LockGuard的生命周期,拿到任务后就不需要锁保护了。                LockGuard lock(&tp->_mutex);                while(tp->QueueIsEmpty())                {                    tp->QueueWait();                }                tp->QueuePop(task);            }            std::cout<< t->ThreadName()<<":";            task();        }          }   public:    ThreadPool(const int num = 3)   //默认初始创建三个线程    :_num(num),_tp(num)    {        pthread_mutex_init(&_mutex,nullptr);        pthread_cond_init(&_cond,nullptr);        for(int i=0;i<_num;++i)        {            Thread* t = new Thread;            _tp[i] = t;        }    }    ~ThreadPool()    //析构    {        pthread_mutex_destroy(&_mutex);        pthread_cond_destroy(&_cond);        for(const auto& e : _tp)        delete e;    }    void start() //启动线程池获取任务,处理任务    {        for(int i=0;i<_num;++i)        {            ThreadData<T>* Td = new ThreadData<T>(this,_tp[i]); //给start_routine传入线程相关的数据,方便演示            _tp[i]->Run(start_routine,Td);            std::cout<<_tp[i]->ThreadName()<<"启动"<<std::endl;        }    }    void Push(const T& in)      //输入的任务    {        LockGuard lock(&_mutex);        _task.push(in);        QueueSignal();    }    bool QueueIsEmpty()    {        if(_task.empty())        {            return true;        }        return false;    }private:    void QueuePop(T& task)    {        task = _task.front();        _task.pop();    }    void QueueWait()    {        pthread_cond_wait(&_cond,&_mutex);    }    void QueueSignal()    {        pthread_cond_signal(&_cond);    }private:    int _num;         //线程池内创建多少个线程    std::vector<Thread*> _tp;    std::queue<T> _task;    pthread_mutex_t _mutex;    pthread_cond_t _cond;};

注意点:

我们用vector来存储创建好的线程,用queue来存放待执行的任务。

从任务队列中取任务要并发和互斥,因此需要一个互斥量和一个条件变量。

通过Push接口向队列中存放任务,通过QueuePop从任务队列中获取任务。

ThreadData类是用于方便我们向封装的线程传入多个参数,传入this指针方便调用Threadpool类内的成员方法。

LockGuard的生命周期在我们从任务队列中取得任务后就应该结束,线程拿到任务后应该并发的去执行各自拿到的任务,因为拿到任务后,该任务就属于当前线程了,已经与其他线程无关,所以我们用{ }来限LockGuard 的生命周期,没使用LockGuard记得在拿到任务后立即解锁即可。

通过Push放入任务后,通过QueueSignal()接口来唤醒等待中的线程来拿任务,这里不推荐用pthread_cond_broadcast,一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。

测试线程池

测试用任务

        之前用了“亿”次的任务:

#pragma once#include <iostream>    class Task    {    public:        Task(int x = 1, int y = 1, char op = '+')            : a(x), b(y), _op(op)        {        }        ~Task()        {        }        void Run()        {            int ret = 0;            switch (_op)            {            case ('+'):                ret = a+b;                break;            case ('-'):                ret = a-b;                break;            case ('*'):                ret = a*b;                 break;            case ('/'):                ret = a/b;//暂不考虑b为0;                break;            case ('%'):                ret = a%b;                break;            default:                std::cout << "未知操作符" << std::endl;                break;            }            std::cout<< a << _op << b << '=' << ret << std::endl;        }        void operator()()           {            Run();        }    private:        int a;        int b;        char _op;    };

        简而言之就是帮助我们两个数进行简单的运算。

使用线程池

        为了简便,我们直接用主线程不断派发任务:

#include "ThreadPool.hpp"#include "Thread.hpp"#include "LockGuard.hpp"#include "Task.hpp"#include <cstdlib>#include <ctime>#include<unistd.h>int main(){    srand((unsigned int)time(nullptr)^pthread_self()); //生成随机数种子    ThreadPool<Task>* tp = new ThreadPool<Task>(8);  //创建了8个线程    tp->start();                                //开始运行    while(true)    {        int x = rand()%20+1;        int y = rand()%10+1;        char op = "+-*/%"[rand()%5];        Task t(x,y,op);        tp->Push(t);                           //放入待执行任务队列        usleep(10000);    }    return 0;}

        演示成功,我们让线程池创造了八个线程,并让他们从任务队列中拿出任务进行了执行。

        知行合一,希望大家也尝试模拟实现出自己的线程池。


点击全文阅读


本文链接:http://zhangshiyu.com/post/61155.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1