? 作者:@阿亮joy.
?专栏:《学会Linux》
? 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
?线程池?什么是线程池线程池的优点线程池的应用场景:线程池的实现 ?日志功能的实现??线程安全的单例模式?什么是单例模式饿汉方式懒汉方式线程安全的单例线程池 ?STL、智能指针与线程安全??其他常见的锁??读者写者问题?读写锁读者写者问题和生产者消费者模型读写锁接口 ?总结?
?线程池?
池化技术是一种资源预分配的机制,先将资源申请好,如果用户需要资源,直接就可以将资源交给用户,不需要再去向系统申请,以提高效率。
什么是线程池
线程池:一种线程的使用模式(以空间换时间的形式)。线程过多会带来调度开销,进而影响缓存局部性和整体性能。线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池的优点
线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这就避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
线程池的应用场景:
需要大量的线程来完成任务,且完成任务的时间比较短。 Web 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。线程池的实现
线程池中包含一个任务队列和多个线程,主线程向任务队列中 Push 任务,而多个线程则将任务队列中的任务 Pop 出来,然后去执行任务。
LockGuard.hpp
#pragma once#include <pthread.h>class Mutex{public: Mutex(pthread_mutex_t* pmtx) : _pmtx(pmtx) {} ~Mutex() {} void Lock() { pthread_mutex_lock(_pmtx); } void Unlock() { pthread_mutex_unlock(_pmtx); }private: pthread_mutex_t* _pmtx;};// RAII风格的加锁方式// 构造时自动进行加锁// 析构时自动进行解锁class lockGuard{public: lockGuard(pthread_mutex_t* pmtx) : _mtx(pmtx) { _mtx.Lock(); } ~lockGuard() { _mtx.Unlock(); }private: Mutex _mtx;};
Thread.hpp
Thread 类对线程进行了封装,其对象构造是需要传入线程的编号 num,线程的执行例程(回调函数)callBack 和执行参数 args。同时还设计了线程数据 ThreadData,ThreadData 中包含线程执行例程的参数、线程名和线程 ID。如果还需要其他的数据,可以在 ThreadData 中添加。Thread 类主要实现了两个接口 Create 和 Join,Create 是创建一个新的线程,而 Join 是等待线程,将线程占用的资源归还。#pragma once#include <string>#include <pthread.h>#include <cstdio>typedef void*(*func_t)(void*);class ThreadData{public: void *_args; // 线程执行例程的参数 std::string _name; // 线程名 pthread_t _tid; // 线程ID};class Thread{public: Thread(int num, func_t callBack, void* args) : _func(callBack) { char nameBuffer[64]; snprintf(nameBuffer, sizeof nameBuffer, "Thread %d", num); _data._args = args; _data._name = nameBuffer; } // 创建线程 void Create() { pthread_create(&_data._tid, nullptr, _func, (void*)&_data); } // 等待线程 void Join() { pthread_join(_data._tid, nullptr); } // 返回线程的名字 std::string Name() { return _data._name; } ~Thread() {}private: func_t _func; // 线程的执行例程 ThreadData _data; // 线程的属性};
Task.hpp
#pragma once#include <iostream>// 任务类class Task{public: Task() = default; Task(int x, int y, char op) : _x(x) , _y(y) , _op(op) {} ~Task() {} // 处理任务 void Execute(const std::string ThreadName) { int ret = 0; switch(_op) { case '+': ret = _x + _y; break; case '-': ret = _x - _y; break; case '*': ret = _x * _y; break; case '/': if(_y == 0) { std::cerr << "Error: Divided By Zero!" << std::endl; return; } else { ret = _x / _y; } break; case '%': if(_y == 0) { std::cerr << "Error: Modular Division By Zero!" << std::endl; } else { ret = _x % _y; } break; default: std::cerr << "Operation Error!" << std::endl; return; } std::cout << "[" << ThreadName << "]: " << _x << " " << _op << " " << _y << " = " << ret << std::endl; }private: int _x; int _y; char _op;};
ThreadPool.hpp
线程池中的任务队列是被多个执行流访问的临界资源,所以我们需要引入互斥锁来保护任务队列。线程池中的线程是要执行主线程 Push 到任务队列中的任务,如果任务队列中没有任务,那么线程只能进行等待,直至任务队列中有任务。当某个线程被唤醒时,可能是被广播类的唤醒线程操作唤醒的。而被广播唤醒的若干个线程中,只有个别的线程拿到任务。所以当线程被唤醒时,还需要再次判断是否满足被唤醒的条件,所以应该使用 while 来进行判断,而不是 if,否则将会存在伪唤醒的情况。pthread_cond_broadcast 函数的作用是唤醒所有在该条件变量下等待的线程。当提供给线程使用的资源可能不是很多,不应该唤醒使用 pthread_cond_broadcast 来唤醒全部线程,否则将会引起惊群效应影响效率。所以当资源较少时,应该使用 pthread_cond_signal 唤醒一个线程即可。 线程从任务队列中获取到任务后,该任务就属于当前线程了,其他线程无法拿到该任务。因此任务的处理应该在临界区外,而不是在临界区内处理任务。如果将线程在临界区内处理任务,那么其他线程要等待该线程将任务处理完,才能够进入临界区获取任务,这样就无妨让多个线程并发地执行任务了,那么线程池的意义也不大了。线程池的构造函数主要做的工作是将互斥锁和条件变量初始化,还需要创建 num 个线程对象 Thread。而创建线程对象需要传入线程编号、线程要执行的历程以及执行例程的参数线程池指针 this。为什么执行例程 Routine 要加 static 修饰呢?因为类的成员函数的第一个参数默认就是 this 指针,如果 Routine 不加 static 修饰,那么 Routine 将会有两个参数 this 指针和 void* args。这无法满足创建线程时传入返回值为 void*,参数为 void*的函数指针的要求,所以 Routine 需要用 static 修饰。static 修饰的函数属于整个类,而不属于某个类对象,没有隐藏的 this 指针。为什么创建线程对象 Thread 要传入 this 指针作为执行例程 Routine 的参数呢?因为 Routine 是静态的成员函数,在其内部无法调用非静态成员函数。而我们需要在 Routine 中调用该类的非静态成员函数,如 Pop 获取任务。所以在创建线程需要传入 this 指针,那么在 Routine 内部就可以通过 this 指针来调用非静态成员函数了。该线程池是模板类,所以任务队列中存储的任务类型是任意的。但不管是何种任务,这些任务都需要有一个 Execute 函数接口,因为处理任务时需要调用该接口。#pragma once#include <iostream>#include <vector>#include <string>#include <queue>#include <unistd.h>#include "LockGuard.hpp"#include "Thread.hpp"#include "Task.hpp"const int g_thread_num = 3;template <class T>class ThreadPool{private: bool isEmpty() const { return _task_queue.empty(); } void Wait() { pthread_cond_wait(&_cond, &_lock); } // 从任务队列中取任务 void Pop(T &task) { task = _task_queue.front(); _task_queue.pop(); }public: ThreadPool(int num = g_thread_num) : _num(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_cond, nullptr); for (int i = 1; i <= _num; ++i) { _threads.push_back(new Thread(i, Routine, this)); } } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); for (auto &iter : _threads) { iter->Join(); // 释放线程所占用的资源 delete iter; // 释放new出来的Thread对象 } } // 启动线程池(创建若干个线程) void Run() { for (auto &iter : _threads) { iter->Create(); } } // 线程的执行例程 static void *Routine(void *args) { ThreadData* td = (ThreadData*)args; ThreadPool<T>* self = (ThreadPool<T>*)td->_args; // 不断从任务队列中获取任务 while (true) { T task; { lockGuard lockguard(&self->_lock); // 以下代码全是临界区 while (self->isEmpty()) { self->Wait(); } // 获取任务 self->Pop(task); } // 处理任务 task.Execute(td->_name); } return nullptr; } // 往任务队列中塞任务 void Push(const T &task) { lockGuard lockguard(&_lock); _task_queue.push(task); pthread_cond_signal(&_cond); }private: std::vector<Thread *> _threads; // 保存创建好的线程 int _num; // 线程池中线程的数量 std::queue<T> _task_queue; // 任务队列 pthread_mutex_t _lock; // 保护任务队列 pthread_cond_t _cond; // 确保任务队列中有任务};
Test.cc
主线程不断地向线程池中的任务队列 Push 任务,线程池的线程从任务队列中获取任务,执行 Execute 接口处理任务。目前我们没有任务的来源,所以我们只能通过生成随机数来模拟任务的来源。等我们学习了网络部分,以后的任务就可以从网络中来。#include "ThreadPool.hpp"#include <iostream>#include <ctime>#include <cstdlib>#include <unistd.h>int main(){ srand((unsigned int)time(nullptr)); ThreadPool<Task>* tp = new ThreadPool<Task>(); tp->Run(); const char* options = "+-*/%"; while(true) { int x = rand() % 100; int y = rand() % 100; int index = rand() % 5; Task t(x, y, options[index]); // 将任务推送到线程池的任务队列中 tp->Push(t); sleep(1); } return 0;}
进程启动后,就会有四个线程,分别是一个主线程和线程池中的三个线程。主线程每隔一秒向任务队列中 Push 一个任务,这三个线程只有一个线程能够获取该任务,其他线程都会在等待队列中等待。该线程处理完任务后就会因为任务队列为空而排在队列的尾部,主线程 Push 一个任务后,等待队列头部的线程将会获取到任务并处理,如此周而复始,那么线程处理任务就呈现出一定的顺序性了。
?日志功能的实现?
日志在项目开发中是非常重要的一个功能,它可以帮助我们快速地找出程序的错误等。一个完整的日志,至少有一下功能:日志等级、时间和支持用户自定义日志信息(日志内容、文件行、文件名)等。
时间可以通过 gettimeofday 和 localtime 等函数来获取,在这里就不进行讲解了,自行上网查询。如果想让用户可以自定义日志信息,就需要借助可变参数列表,主要有以下函数。大家自行查询,在这就不赘述了。
将可变参数列表格式化打印的函数,如下图所示:
#pragma once#include <cstdio>#include <cstdarg>#include <string>#include <iostream>#include <ctime>// 日志等级#define DEBUG 0#define NORMAL 1#define WARNING 2#define ERROR 3#define FATAL 4#define LOGFILE "./ThreadPool.log"const char* levelMap[] = { "DEBUG", "NORMAL", "WARNING", "ERROR", "FATAL"};void logMessage(int level, const char* format, ...){ // 只有定义了DEBUG_SHOW,才会打印debug信息 // 利用命令行来定义即可,如-D DEBUG_SHOW#ifndef DEBUG_SHOW if(level == DEBUG) return;#endif char stdBuffer[1024]; // 标准部分 time_t timestamp = time(nullptr); // struct tm *localtime = localtime(×tamp); snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", levelMap[level], timestamp); char logBuffer[1024]; // 自定义部分 va_list args; // va_list就是char*的别名 va_start(args, format); // va_start是宏函数,让args指向参数列表的第一个位置 // vprintf(format, args); // 以format形式向显示器上打印参数列表 vsnprintf(logBuffer, sizeof logBuffer, format, args); va_end(args); // va_end将args弄成nullptr FILE* fp = fopen(LOGFILE, "a"); // printf("%s%s\n", stdBuffer, logBuffer); fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // 向文件中写入日志信息 fclose(fp);}
?线程安全的单例模式?
什么是单例模式
单例模式是一种创建型设计模式,它保证一个类只有一个实例存在,并且提供一个全局访问点来访问该实例。饿汉方式和懒汉方式是两种常见的单例模式的实现方式。
单例模式的主要特点包括:
只能有一个实例。全局访问点,方便访问该实例。在很多服务器开发场景中,经常需要让服务器加载很多的数据 (上百G) 到内存中,此时往往要用一个单例的类来管理这些数据。
饿汉方式
在这种方式下,实例在类加载时就已经创建好了。这种实现方式存在一个明显的缺点,即如果实例一直没有被使用,那么空间被浪费了。不过也有不少的优点,就是简单且没有线程安全问题。饿汉方式实现的单例类在 main 函数执行之前就实例化好了,而 main 函数执行之前不存在多线程,也就不会存在线程安全问题了。
template <typename T>class Singleton {private:static T data;public:static T* GetInstance() {return &data;}};
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例。
懒汉方式
懒汉模式的核心思想就是延迟初始化,只有在第一次访问时才会创建实例。懒汉式单例在第一次使用时才会创建实例,可以避免空间浪费的问题,但是需要注意多线程安全问题。在多线程情况下,如果没有加锁,可能会导致创建多个实例。
template <typename T>class Singleton {private:static T* inst;public:static T* GetInstance() {if (inst == nullptr) {inst = new T();} return inst;}};
懒汉方式存在线程安全问题:第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例,但是后续再次调用,就没有问题了。
线程安全的单例线程池
const int g_thread_num = 3;template <class T>class ThreadPool{private: ThreadPool(int num = g_thread_num) : _num(num) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_cond, nullptr); for (int i = 1; i <= _num; ++i) { _threads.push_back(new Thread(i, Routine, this)); } } // 防止拷贝 ThreadPool(const ThreadPool<T>& other) = delete; ThreadPool<T>& operator=(const ThreadPool<T>& other) = delete;public: static ThreadPool<T>* getThreadPool(int num = g_thread_num) { // 第一个判断_threadPtr是否为空,可以有效地减少未来 // 需要先申请锁再来进行判断_threadPtr是否为空的情况 // 拦截大量的已经创建好单例对象,还申请锁进行判断的情况 if(_threadPtr == nullptr) { lockGuard lockguard(&_mutex); // 创建单例对象时,需要加锁进行保护 // 防止多个线程通过了判断条件,然后 // 创建多个线程池对象 if(_threadPtr == nullptr) { _threadPtr = new ThreadPool(num); } } return _threadPtr; } ~ThreadPool() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_cond); delete _threadPtr; for (auto &iter : _threads) { iter->Join(); // 释放线程所占用的资源 delete iter; // 释放new出来的Thread对象 } } private: volatile static ThreadPool<T>* _threadPtr; // 需要设置volatile关键字,否则可能被编译器优化 static pthread_mutex_t _mutex;};template <class T>ThreadPool<T>* ThreadPool<T>::_threadPtr = nullptr;template <typename T>pthread_mutex_t ThreadPool<T>::_mutex = PTHREAD_MUTEX_INITIALIZER;
?STL、智能指针与线程安全?
STL 中容器是线程安全的吗?不是。原因是 STL 的设计初衷就是追求性能的极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式的不同,性能可能也不同(例如哈希表的锁表和锁桶)。因此 STL 默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
智能指针是线程安全的吗?unique_ptr 是和资源强关联,只是在当前代码块范围内生效,因此不涉及线程安全问题。对于 shared_ptr,多个对象需要共有一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候也考虑到了这个问题,就基于原子操作(Compare And Swap(CAS))的方式保证 shared_ptr 能够高效原子地操作引用计数。shared_ptr 是线程安全的,但不意味着对其管理的资源进行操作是线程安全的,所以对 shared_ptr 管理的资源进行操作时也可能需要进行加锁保护。
?其他常见的锁?
悲观锁:悲观锁做事比较悲观,它认为多线程同时修改共享资源的概率比较高,于是很容易出现冲突,所以访问贡献资源前,先要进行加锁保护。常见的悲观锁有:互斥锁、自旋锁和读写锁等。乐观锁:乐观锁做事比较乐观,它乐观地认为共享数据不会被其他线程修改,因此不上锁。它的工作方式是:先修改完共享数据,再判断这段时间内有没有发生冲突。如果其他线程没有修改共享数据,那么则操作成功。如果发现其他线程已经修改该共享数据,就放弃本次操作。乐观锁全程并没有加锁,所以它也叫无锁编程。乐观锁主要采取两种方式:版本号机制(Gitee等)和 CAS 操作。乐观锁虽然去除了加锁和解锁的操作,但是一旦发生冲突,重试的成本是很高的,所以只有在冲突概率非常低,且加锁成本非常高的场景下,才考虑使用乐观锁。CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。自旋锁:使用自旋锁的时候,当多线程发生竞争锁的情况时,加锁失败的线程会忙等待(这里的忙等待可以用 while 循环等待实现),直到它拿到锁。而互斥锁加锁失败后,线程会让出 CPU 资源给其他线程使用,然后该线程会被阻塞挂起。如果临界区代码执行时间过长,自旋的线程会长时间占用 CPU 资源,所以自旋的时间和临界区代码执行的时间是成正比的关系。如果临界区代码执行的时间很短,就不应该使用互斥锁,而应该选用自旋锁。因为互斥锁加锁失败,是需要发生上下文切换的,如果临界区执行的时间比较短,那可能上下文切换的时间会比临界区代码执行的时间还要长。?读者写者问题?
读写锁
读写锁由读锁和写锁两部分构成,如果只读取共享资源用读锁加锁,如果要修改共享资源则用写锁加锁。所以,读写锁适用于能明确区分读操作和写操作的场景。
读写锁的工作原理:
当写锁没有被写线程持有时,多个读线程能够并发地持有读锁,这大大提高了共享资源的访问效率。因为读锁是用于读取贡献资源的场景,所以多个线程同时持有读锁也不会破坏共享资源的数据。但是,一旦写锁被写进程持有后,读线程获取读锁的操作会被阻塞,而其它写线程的获取写锁的操作也会被阻塞。所以说,写锁是独占锁,因为任何时候都只能一个写线程持有写锁,类似于互斥锁和自旋锁,而读锁是共享锁,可以被多个读线程持有。知道了读写锁的工作原理后,我们会发现读写锁在读多写少的场景下能发挥出优势。
根据实现的不同,读写锁可以分为读优先锁和写优先锁。
读者写者问题和生产者消费者模型
读者写者问题和生产者消费者模型的本质区别就是消费者会取走数据,而读者不会取走数据。
读者写者问题代码如下图所示:
读写锁接口
初始化读写锁
pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
销毁读写锁
pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
读加锁
pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
写加锁
pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
解锁
pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
读者写者问题很明显会存在读者优先还是写者优先的问题,如果是读者优先的话,可能就会带来写者饥饿的问题。而写者优先可以保证写线程不会饿死,但如果一直有谢先成获取写锁,那么读者也会被饿死。所以使用读写锁时,需要考虑应用场景。读写锁通常用于数据被读取的频率非常高,而被修改的频率非常低。注:Linux 下的读写锁默认是读者优先的。
?总结?
本篇博客主要讲解了什么是线程池、线程池的优点、应用场景和实现、日志功能的实现以及线程安全的单例模式、悲观锁、乐观锁、自旋锁、读写锁和读者写者问题以及 STL、智能指针与线程安全等等。以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家啦!??❣️