?个人主页:island1314
?个人专栏:Linux—登神长阶
⛺️ 欢迎关注:?点赞 ??留言 ?收藏 ? ? ?
1. 前言 ?
? 下面开始,我们结合我们之前所做的所有封装,进行一个线程池的设计。在写之前,我们要做如下准备
准备 线程 的封装准备 锁 和 条件变量的封装引入日志,对线程进行封装这里用到了我们之前博客用到的头文件及代码 【Linux】:多线程(互斥 && 同步)
2. 日志和策略 ?
?什么是设计模式
T行业这么火,涌入的人很多,俗话说林子大了啥鸟都有,大佬和菜鸡们两极分化的越来越严重。为了让菜鸡们不太拖大佬的后腿,于是大佬们针对一些经典的常见的场景,给定了一些对应的解决方案,这个解决方案就是 设计模式? 认识日志
计算机中的日志是记录系统和软件运行中发生事件的文件,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具?日志格式的指标
必须有的:时间戳、日志等级、日志内容可选的:文件名行号、进程线程相关信息id 等等日志有现成的解决方案,如:spdlog、glog、Boost.Log、Log4cxx 等等,但是我们依旧采用自定义日志的方式。比如这里我们采用 设计模式-策略模式 来进行日志的设计
我们想要的日志格式如下:
[可读性很好的时间] [⽇志等级] [进程pid] [打印对应⽇志的⽂件名][⾏号] - 消息内容,⽀持可变参数[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [17] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [18] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [20] - hello world[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [21] - hello world[2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world
? 日志模式 详解 见代码注释 Log.hpp
注意:我们这里用到的 "Mutex.hpp" 是我们之前在 【Linux】:多线程(互斥 && 同步) 实现的互斥量封装#pragma once#include <iostream>#include <string>#include <unistd.h>#include <sstream>#include <fstream>#include <memory>#include <filesystem> // C++ 17 后的标准#include <unistd.h>#include <time.h>#include "Mutex.hpp"namespace LogMudule{ // 获取当前系统时间 std::string CurrentTime() { time_t time_stamp = ::time(nullptr); struct tm curr; localtime_r(&time_stamp, &curr); // 时间戳,获取可读性更高的时间信息 char buffer[1024]; // bug -> ? snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d", curr.tm_year + 1900, // 这里需要 + 1900 curr.tm_mon + 1, curr.tm_mday, curr.tm_hour, curr.tm_min, curr.tm_sec); return buffer; } using namespace LockModule; // 构成: 1. 构建日志字符串 2. 刷新落盘(①screen ②file) // 1. 日志文件的默认路径 和 文件名 const std::string defaultlogpath = "./log/"; const std::string defaultlogname = "log.txt"; // 2. 日志等级 enum class LogLevel { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; std::string Level2String(LogLevel level) { switch(level) { case LogLevel::DEBUG: return "DEBUG"; case LogLevel::INFO: return "INFO"; case LogLevel::WARNING: return "WARNING"; case LogLevel::ERROR: return "ERROR"; case LogLevel::FATAL: return "FATAL"; default: return "None"; } } // 3. 刷新策略,只进行刷新,不提供方法 class LogStrategy { public: // 基类 需要 析构设成 虚方法 virtual ~LogStrategy() = default; virtual void SyncLog(const std::string &message) = 0; }; // 3.1 控制台策略(screen) class ConsoleLogStrategy : public LogStrategy { public: ConsoleLogStrategy() {} ~ConsoleLogStrategy() {} void SyncLog(const std::string &message) { LockGuard lockguard(_lock); std::cout << message << std::endl; } private: Mutex _lock; }; // 3.2 文件级(磁盘)策略 class FileLogStrategy : public LogStrategy { public: FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname) : _logpath(logpath), _logname(logname) { // 确认 _logpath 是存在的 LockGuard lockguard(_lock); if(std::filesystem::exists(_logpath)) { return; } try{ std::filesystem::create_directories(_logpath); // 新建 } catch(std::filesystem::filesystem_error &e) { std::cerr << e.what() << "\n"; } } ~FileLogStrategy() {} // 下面用的是 c++ 的文件操作 void SyncLog(const std::string &message) { LockGuard lockguard(_lock); std::string log = _logpath + _logname; // ./log/log.txt std::ofstream out(log, std::ios::app); // 日志写入,一定是追加, app -> append if(!out.is_open()) return ; out << message << "\n"; out.close(); } private: std::string _logpath; std::string _logname; // 锁 Mutex _lock; }; // 日志类:构建日志字符串,根据策略 进行刷新 class Logger { public: Logger() { // 默认采用 ConsoleLogStrategy 策略 _strategy = std::make_shared<ConsoleLogStrategy>(); } void EnableConsoleLog() { _strategy = std::make_shared<ConsoleLogStrategy>(); } void EnableFileLog() { _strategy = std::make_shared<FileLogStrategy>(); } ~Logger() {} // 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;) class LogMessage { public: LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger) : _currtime(CurrentTime()), _level(level), _pid(::getpid()), _filename(filename), _line(line), _logger(logger) { std::stringstream ssbuffer; ssbuffer << "[" << _currtime << "] " << "[" << Level2String(_level) << "] " // 对日志等级进行转换显示 << "[" << _pid << "] " << "[" << _filename << "] " << "[" << _line << "] - "; _loginfo = ssbuffer.str(); } template<typename T> LogMessage &operator<<(const T&info) { std::stringstream ss; ss << info; _loginfo += ss.str(); return *this; } ~LogMessage() { if (_logger._strategy) { _logger._strategy->SyncLog(_loginfo); } } private: std::string _currtime; // 当前日志的时间 LogLevel _level; // 日志等级 pid_t _pid; // 进程pid std::string _filename; // 源文件名称 int _line; // 日志所在的行号 Logger &_logger; // 负责根据不同的策略进行刷新 std::string _loginfo; // 一条完整的日志记录 }; // 就是要拷贝(临时的 logmessage), 故意的拷贝 LogMessage operator()(LogLevel level, const std::string &filename, int line) { return LogMessage(level, filename, line, *this); } private: std::shared_ptr<LogStrategy> _strategy; // 日志刷新的策略方案 }; Logger logger;#define LOG(Level) logger(Level, __FILE__, __LINE__)#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()#define ENABLE_FILE_LOG() logger.EnableFileLog()}
代码剖析:
? 我们这里实现了一个日志模块(LogModule
),通过不同的日志策略(如控制台输出或文件输出)来记录日志。具体来说,它分为以下几个部分:
1. 获取当前时间 (CurrentTime 函数)
该函数通过 C++ 标准库的time
和 localtime_r
获取当前系统时间并格式化为 YYYY-MM-DD HH:MM:SS
的字符串格式。这个时间戳用于日志记录。 2. 日志等级 (LogLevel 枚举)
定义了五个日志等级: DEBUG、INFO、WARNING、ERROR、FATAL ,表示日志的严重性。Level2String 函数根据 LogLevel 转换为对应的字符串形式,用于日志输出。3. 日志策略(LogStrategy 类及其派生类)
LogStrategy 基类:定义了一个纯虚函数 SyncLog,用于实际的日志刷新操作(即将日志信息输出到目标介质)。ConsoleLogStrategy 类:实现了 SyncLog 方法,将日志信息输出到控制台。FileLogStrategy 类:实现了 SyncLog 方法,将日志信息输出到文件中。文件路径和文件名默认设置为 ./log/log.txt 。如果目录不存在,则会尝试创建目录。4. 日志类 (Logger 类)
Logger
类负责管理日志的策略,可以切换控制台输出或文件输出。Logger
提供了两个方法: EnableConsoleLog:切换为控制台输出策略。EnableFileLog:切换为文件输出策略。内部有一个嵌套类 LogMessage,它用来生成具体的日志条目。每次创建一个 LogMessage 对象时,会自动格式化日志信息并最终将其传递给策略进行输出。 5. 日志信息格式化
LogMessage
类的构造函数会根据当前时间、日志等级、进程 ID、文件名和行号等信息来生成一条完整的日志记录。operator<<
被重载,以支持日志信息的追加,可以向日志信息中添加不同类型的内容(如字符串、数字等)。 6. 宏定义
LOG(Level)
:简化日志记录的调用方式,自动记录当前文件名和行号。ENABLE_CONSOLE_LOG()
:设置日志策略为控制台输出。ENABLE_FILE_LOG()
:设置日志策略为文件输出。
? 测试代码 Main.cc 如下:
#include "Log.hpp"using namespace LogMudule;int main(){ ENABLE_FILE_LOG(); // 开启日志文件的文件输出 LOG(LogLevel::DEBUG) << "Hello File"; LOG(LogLevel::DEBUG) << "Hello File"; LOG(LogLevel::DEBUG) << "Hello File"; LOG(LogLevel::DEBUG) << "Hello File"; ENABLE_CONSOLE_LOG(); // 往显示器输出 LOG(LogLevel::DEBUG) << "Hello IsLand"; LOG(LogLevel::DEBUG) << "Hello IsLand"; LOG(LogLevel::DEBUG) << "Hello IsLand"; LOG(LogLevel::DEBUG) << "Hello IsLand"; return 0;}
输出如下:
3. 线程池设计 ?
? 3.1 线程池的基本概念
? 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池 通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。可以避免大量线程频繁创建或销毁所带来的时间成本,也可以避免在峰值压力下,系统资源耗尽的风险;并且可以统一对线程池中的线程进行管理,调度监控。? 3.2 线程池应用场景
需要大量的线程来完成任务,且完成任务的时间比较短。 比如 WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,比如我们可以想象一个热门网站的点击次数。但对于一些长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。 突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误
? 3.3 线程池种类
创建 固定数量线程池 ,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口浮动线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口这里我是选择固定线程个数的线程池来做样例进行实现
? 3.4 线程池实现 (ThreadPool)
Task.hpp
#pragma once#include <iostream>#include <string>#include <string>#include <functional>#include "Log.hpp"using namespace LogMudule;using task_t = std::function<void(std::string name)>;void Push(std::string name){ LOG(LogLevel::DEBUG) << "我是一个推送数据到服务器的一个任务, 我正在被执行" << "[" << name << "]";}
Log.hpp 是我们上面日志那里实现的,"Mutex.hpp" 和 "Cond.hpp" 是我们之前博客【Linux】:多线程(互斥 && 同步) 实现的互斥量和条件变量封装,而 "Thread.hpp" 也是我们之前博客 【Linux】:线程库简单封装 实现的 1 号 版本 ThreadPool.hpp
#pragma once#include <iostream>#include <string>#include "Log.hpp"#include "Mutex.hpp"#include <queue>#include <vector>#include <memory>#include "Cond.hpp"#include "Thread.hpp"namespace ThreadPoolModule{ using namespace LogMudule; using namespace ThreadModule; using namespace CondModule; using namespace LockModule; // 用来做测试的线程方法 void DefaultTest() { while(true) { LOG(LogLevel::DEBUG) << "我是一个测试线程"; sleep(1); } } using thread_t = std::shared_ptr<Thread>; const static int defaultnum = 5; template<typename T> class ThreadPool { private: bool IsEmpty() { return _taskq.empty();} void HandlerTask(std::string name) { LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑"; while(true) { // 1. 拿任务 T t; { LockGuard lockguard(_lock); while(IsEmpty() && _isrunning) { _wait_num++; // 线程等待数量加 1 _cond.Wait(_lock); _wait_num--; } // 2. 任务队列不为空 && 线程池退出 if(IsEmpty() && !_isrunning){ break; } t = _taskq.front(); _taskq.pop(); } // 2. 处理任务 t(name); // 规定,未来所有的任务处理,全部都是必须提供 () 方法 } LOG(LogLevel::INFO) << "线程" << name << "退出"; } public: ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false) { for(int i = 0; i < _num; i++) { // _threads.push_back(std::make_shared<Thread>(DefaultTest)); // 构建 make_shared 对象 _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // 构建 make_shared 对象,当我们传递名字的时候,需要带上 placeholder // _threads.push_back(std::make_shared<Thread>([this]{ThreadPool::HandlerTask})); // 构建 make_shared 对象 LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功"; } } void Equeue(T &&in) { LockGuard lockguard(_lock); // 保护临界资源,使其安全 if(!_isrunning) return ; _taskq.push(std::move(in)); if(_wait_num > 0){ _cond.Notify(); } } void Start() { if(_isrunning) return ; _isrunning = true; // 注意这里 for(auto &thread_ptr : _threads) { LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功"; thread_ptr->Start(); } } void Wait() { for(auto &thread_ptr: _threads) { thread_ptr->Join(); LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功"; } } void Stop() { LockGuard lockguard(_lock); if(_isrunning) { // 3. 不能再入任务了 _isrunning = false; // 1. 让线程自己退出 && 2. 历史的任务被处理完了 if(_wait_num > 0) _cond.NotifyAll(); } } ~ThreadPool() { } private: std::vector<thread_t> _threads; int _num; int _wait_num; std::queue<T> _taskq; // 任务队列 -> 临界资源 Mutex _lock; Cond _cond; bool _isrunning; };}
1. 命名空间
namespace ThreadPoolModule{ using namespace LogMudule; using namespace ThreadModule; using namespace CondModule; using namespace LockModule;}
使用了四个外部命名空间:LogMudule
(日志模块),ThreadModule
(线程模块),CondModule
(条件变量模块),LockModule
(锁模块)。这些模块分别提供日志记录、线程管理、条件变量和互斥锁功能。 2. 测试线程方法
void DefaultTest(){ while(true) { LOG(LogLevel::DEBUG) << "我是一个测试线程"; sleep(1); }}
DefaultTest
是一个模拟的线程任务函数,线程会每秒打印一次日志。sleep(1)
用于让线程每秒钟执行一次。 3. thread_t 类型定义
using thread_t = std::shared_ptr<Thread>;
定义了 thread_t
类型,它是 std::shared_ptr<Thread>
的别名。这样可以方便管理线程对象的生命周期,避免手动管理内存。 4. ThreadPool 类
ThreadPool
类是一个模板类,用来管理和分配线程池中的任务。其主要功能包括任务队列管理、线程创建、启动、停止和等待等。
成员变量
_threads
: 存储线程的容器,类型为 std::vector<thread_t>
,存放线程的共享指针。_num
: 线程池中的线程数量,默认值为 defaultnum
(5)。_wait_num
: 记录当前等待任务的线程数量。_taskq
: 任务队列,存储待执行的任务。类型为 std::queue<T>
。_lock
: 互斥锁,保护任务队列和其他临界资源。_cond
: 条件变量,用于线程之间的同步,帮助线程等待任务或通知线程执行任务。_isrunning
: 布尔标志,表示线程池是否正在运行。 成员函数
① IsEmpty
bool IsEmpty() { return _taskq.empty(); }
判断任务队列是否为空。 ② HandlerTask
void HandlerTask(std::string name){ LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑"; while(true) { // 拿任务 T t; { LockGuard lockguard(_lock); while(IsEmpty() && _isrunning) { _wait_num++; // 线程等待数量加 1 _cond.Wait(_lock); _wait_num--; } if (IsEmpty() && !_isrunning) { break; } t = _taskq.front(); _taskq.pop(); } // 处理任务 t(name); // 假设任务对象可以直接调用 } LOG(LogLevel::INFO) << "线程" << name << "退出";}
HandlerTask 是每个线程执行的函数:
线程不断从任务队列中取任务执行。如果任务队列为空,线程会等待,直到有新任务被加入队列。线程通过条件变量Wait()
等待任务,避免空转浪费 CPU 资源。t(name)
任务执行函数,通过传递线程名称进行日志记录。 ③ Equeue
void Equeue(T &&in){ LockGuard lockguard(_lock); if (!_isrunning) return; _taskq.push(std::move(in)); // 把任务添加到队列中 if (_wait_num > 0) { _cond.Notify(); // 通知等待线程有新任务 }}
Equeue
将任务加入到任务队列 _taskq
中。通过 std::move
移动任务对象来避免不必要的拷贝。如果有线程在等待任务,调用 Notify()
通知这些线程继续工作。 ④ Start
void Start(){ if (_isrunning) return; _isrunning = true; for (auto &thread_ptr : _threads) { LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功"; thread_ptr->Start(); }}
Start 启动线程池中的所有线程。线程会调用 HandlerTask 开始处理任务。 ⑤ Wait
void Wait(){ for (auto &thread_ptr : _threads) { thread_ptr->Join(); LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功"; }}
Wait
等待所有线程执行完毕。Join
会阻塞当前线程直到每个线程完成任务。 ⑥ Stop
void Stop(){ LockGuard lockguard(_lock); if (_isrunning) { _isrunning = false; if (_wait_num > 0) _cond.NotifyAll(); // 通知所有等待的线程退出 }}
Stop
用于停止线程池的运行,标记 _isrunning = false
,使线程池不再接收新任务。
_cond.NotifyAll()
唤醒这些线程,让它们退出。 5. 线程池的工作流程
线程池在构造时会创建指定数量的线程。任务通过Equeue
方法提交到任务队列中。线程池通过 Start
启动所有线程,每个线程执行 HandlerTask
,从任务队列中取任务并处理。通过 Wait
等待所有线程执行完毕。通过 Stop
停止线程池并通知所有线程退出。 6. 线程管理
线程池通过std::shared_ptr<Thread>
来管理线程,避免了手动内存管理的问题。使用条件变量来实现线程的等待和通知机制。使用互斥锁 Mutex
确保任务队列的线程安全。 7. 日志记录
通过LOG
宏记录线程池的各种操作,如线程的启动、任务的处理等。这些日志有助于调试和监控线程池的运行状态。
测试代码 ThreadPool.cc:
#include "ThreadPool.hpp"#include "Task.hpp"#include <memory>using namespace ThreadPoolModule;int main(){ ENABLE_CONSOLE_LOG(); // 默认开启 -- 日志显示策略 // ENABLE_FILE_LOG(); // 文件显示 std::unique_ptr<ThreadPool<task_t>> tp = std::make_unique<ThreadPool<task_t>>(); tp->Start(); int cnt = 10; while(cnt) { tp->Equeue(Push); cnt--; sleep(1); } tp->Stop(); sleep(3); tp->Wait(); return 0;}
运行输入如下:
当然我们也可以把我们的输出结果写到文件中,关闭默认开启,打开文件显示就行
4. 线程安全的单例模式 ??
? 4.1 单例模式的概念及性质
单例模式(Singleton Pattern)是一种创建型设计模式,它确保一个类只有一个实例,并提供全局访问点来访问这个实例。在C++中,单例模式通常用于需要控制资源访问或管理全局状态的情况下,比如日志记录器、配置管理器、线程池等其特点如下:
某些类, 只应该具有一个对象(实例), 就称之为单例。 例如一个丈夫只能有一个老婆在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用⼀个单例的类来管理这些数据.? 4.2 饿汉方式和懒汉方式的实现
这里我们打个比方 ?
吃完饭, 立刻洗碗, 这种就是饿汉方式。因为下一顿吃的时候可以立刻拿着碗就能吃饭.吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
? 4.3 饿汉方式实现单例模式
template <typename T>class Singleton {static T data;public:static T* GetInstance() {return &data;}};
? - 只要通过 Singleton 这个包装类来使用 T 对象, 则⼀个进程中只有⼀个 T 对象的实例。
? 4.4 懒汉方式实现单例模式
template <typename T>class Singleton {static T* inst;public:static T* GetInstance() {if (inst == NULL) {inst = new T();} return inst;}};
存在一个严重的问题,线程不安全
第一次调用 Getlnstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例,但是后续再次调用就没有问题了
因此我们这里再来一个懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全template <typename T>class Singleton {volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;public:static T* GetInstance() {if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.if (inst == NULL) {inst = new T();} lock.unlock();} return inst;}}
注意事项:
加锁解锁的位置双重 if 判定,避免不必要的锁竞争volatile 关键字防止过度优化 (指令重排序和从寄存器中读取数据) (可见性和有序性)? 4.5 单例式线程池实现(SigThreadPool)
这里说明一下,下面的代码也用到了我们之前线程池实现的代码,是基于之前的代码的一个改善
ThreadPool.hpp
#pragma once#include <iostream>#include <string>#include "Log.hpp"#include "Mutex.hpp"#include <queue>#include <vector>#include <memory>#include "Cond.hpp"#include "Thread.hpp"namespace ThreadPoolModule{ using namespace LogMudule; using namespace ThreadModule; using namespace CondModule; using namespace LockModule; // 用来做测试的线程方法 void DefaultTest() { while(true) { LOG(LogLevel::DEBUG) << "我是一个测试线程"; sleep(1); } } using thread_t = std::shared_ptr<Thread>; const static int defaultnum = 5; template<typename T> class ThreadPool { private: bool IsEmpty() { return _taskq.empty();} void HandlerTask(std::string name) { LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask 的逻辑"; while(true) { // 1. 拿任务 T t; { LockGuard lockguard(_lock); while(IsEmpty() && _isrunning) { _wait_num++; // 线程等待数量加 1 _cond.Wait(_lock); _wait_num--; } // 2. 任务队列不为空 && 线程池退出 if(IsEmpty() && !_isrunning){ break; } t = _taskq.front(); _taskq.pop(); } // 2. 处理任务 t(name); // 规定,未来所有的任务处理,全部都是必须提供 () 方法 } LOG(LogLevel::INFO) << "线程" << name << "退出"; } ThreadPool(const ThreadPool<T> &) = delete; // 拷贝构造设置为私有 ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // 拷贝构造设置为私有 ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false) { for(int i = 0; i < _num; i++) { _threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // 构建 make_shared 对象,当我们传递名字的时候,需要带上 placeholder LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功"; } } public: static ThreadPool<T> *getInstance() { if(instance == NULL) { LockGuard lockguard(mutex); if(instance == NULL) { LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象..."; instance = new ThreadPool<T>(); } } return instance; } void Equeue(T &&in) { LockGuard lockguard(_lock); // 保护临界资源,使其安全 if(!_isrunning) return ; _taskq.push(std::move(in)); if(_wait_num > 0){ _cond.Notify(); } } void Start() { if(_isrunning) return ; _isrunning = true; // 注意这里 for(auto &thread_ptr : _threads) { LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功"; thread_ptr->Start(); } } void Wait() { for(auto &thread_ptr: _threads) { thread_ptr->Join(); LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功"; } } void Stop() { LockGuard lockguard(_lock); if(_isrunning) { // 3. 不能再入任务了 _isrunning = false; // 1. 让线程自己退出 && 2. 历史的任务被处理完了 if(_wait_num > 0) _cond.NotifyAll(); } } ~ThreadPool() { } private: std::vector<thread_t> _threads; int _num; int _wait_num; std::queue<T> _taskq; // 任务队列 -> 临界资源 Mutex _lock; Cond _cond; bool _isrunning; static ThreadPool<T> *instance; static Mutex mutex; // 用来只保护单例 }; template<typename T> ThreadPool<T> *ThreadPool<T>::instance = NULL; template<typename T> Mutex ThreadPool<T>::mutex; //只用来保护单例}
① ThreadPool 类:
这是一个模板类(ThreadPool<T>
),其中 T
表示任务类型。线程池可以通过管理一组工作线程来异步执行任务。任务被排队等待,工作线程在有任务时会取出并执行。 ② 构造函数与实例管理:
构造函数接受线程数 (_num
),并创建相应数量的工作线程(使用 std::shared_ptr<Thread>
)。getInstance()
函数确保线程池实例是单例的,采用了单例模式,确保只有一个 ThreadPool<T>
实例存在。 ③ 任务管理:
Equeue()
函数将任务添加到任务队列中。如果有任何线程处于等待状态,它会通知这些线程继续执行。HandlerTask()
函数定义了每个工作线程执行的逻辑。线程会持续检查任务队列,处理任务,并在线程池停止时退出。 ④ 线程控制:
Start()
启动所有线程,每个线程会执行 HandlerTask()
。Wait()
等待所有线程处理完任务后才继续执行程序。Stop()
停止线程池,并确保不再接收新的任务。 ⑤ 同步机制:
使用Mutex
来保护临界区(如修改任务队列或停止线程池)。使用 Cond
变量进行线程同步。如果任务队列为空,线程会等待,直到有新任务被添加时被通知。 ⑥ 日志:
使用LOG
函数(可能来自 LogModule
)输出不同严重级别的日志(如 INFO、DEBUG)。这些日志有助于调试和显示线程池的运行状态。 测试代码 ThreadPool.cc
#include "ThreadPool.hpp"#include "Task.hpp"#include <memory>using namespace ThreadPoolModule;int main(){ ENABLE_CONSOLE_LOG(); // 默认开启 -- 日志显示策略 ThreadPool<task_t>::getInstance()->Start(); int cnt = 5; while(cnt) { ThreadPool<task_t>::getInstance()->Equeue(Push); cnt--; sleep(1); } ThreadPool<task_t>::getInstance()->Stop(); ThreadPool<task_t>::getInstance()->Wait(); return 0;}
运行结果如下:
5. 共勉 ?
【*★,°*:.☆( ̄▽ ̄)/$:*.°★* 】那么本篇到此就结束啦,如果有不懂 和 发现问题的小伙伴可以在评论区说出来哦,同时我还会继续更新关于【Linux】的内容,请持续关注我 !!