前言
项目地址
项目详细介绍
本文章适合刚学习完C++基础知识并尝试实现一个网络编程项目的同学,其中包含了该项目的代码逐行注释和解析以及许多刚学习网络编程中会遇到的疑问。
项目简介:
Linux下C++轻量级Web服务器,助力初学者快速实践网络编程,搭建属于自己的服务器.
在这个项目中,每个设计选择都针对Web服务器的实际需求,确保其能够在真实环境中高效、稳定地运行。让我们逐点来看并了解一些基础概念:
1. 线程池 + 非阻塞socket + epoll(ET和LT均实现) + 事件处理(Reactor和模拟Proactor均实现) 的并发模型
实际运用与意义:
线程池:通过预先创建一组线程,避免了高并发下频繁创建和销毁线程的开销。线程池可以高效地处理多个客户端请求,提升服务器的响应速度和资源利用率。
现实意义:这样设计的服务器可以在面对大量并发请求时依然保持高效稳定,不会因为频繁的线程操作而降低性能。
非阻塞socket:允许服务器在处理客户端请求时,不会因为某个操作(比如等待数据)而卡住整个程序。非阻塞IO能够让服务器同时处理多个连接,从而提高吞吐量。
现实意义:这种设计确保了服务器在等待某些事件(如数据到达)时,不会停下来,而是可以继续处理其他请求,使得服务器能够高效处理多个并发连接。
epoll(ET和LT):epoll是Linux下非常高效的I/O多路复用技术。用我的理解简单来说就是占用一个或几个专用的线程来实现I/O操作的多路复用(即同时监听成百上千个客户端连接的请求和响应,而无需为每个连接创建一个线程。),然后把复杂的I/O操作交给线程池里的其他线程。ET(边缘触发)和LT(水平触发)是两种事件通知模式。ET模式减少了重复通知的开销,提高了性能;LT模式则更为安全,适合稳定运行。
现实意义:通过同时实现ET和LT,程序可以灵活应对不同的需求。ET模式可以在高负载下减少系统调用次数,LT模式则确保事件不遗漏,特别是在复杂场景下更为可靠。
Reactor和模拟Proactor模型:这篇博客讲的比较清楚 Reactor 可以理解为「来了事件操作系统通知应用进程,让应用进程来处理」,而 Proactor 可以理解为「来了事件操作系统来处理,处理完再通知应用进程」
简单列表对比Reactor 和 Proactor 模型
特性 | Reactor 模型 | Proactor 模型 |
---|---|---|
发起 I/O 操作 | 应用程序 | 应用程序 |
处理 I/O 操作 | 应用程序 | 操作系统 |
I/O 操作是否同步 | 同步非阻塞 | 异步 |
通知时机 | I/O 准备就绪 | I/O 操作完成 |
应用场景 | 多数 Unix 系统、Java NIO | Windows IOCP、Boost.Asio |
难易程度 | 相对较简单 | 实现复杂,需要依赖异步 I/O 支持 |
性能表现 | 中等(需要应用程序主动参与 I/O 操作) | 高(I/O 操作由操作系统负责) |
现实意义:通过实现这两种模型,程序可以根据不同的场景选择最合适的并发处理方式。Reactor适合处理事件驱动的应用,模拟Proactor则更能体现服务器在处理完成时再执行相关操作的特点。
2. 使用状态机解析HTTP请求报文,支持解析GET和POST请求
实际运用与意义:
状态机解析HTTP请求:状态机是一种将复杂任务分解为不同状态并根据状态转移来处理的模型。通过状态机,服务器可以精准、有效地解析HTTP请求,不管是简单的GET请求还是复杂的POST请求,都能正确解析处理。
现实意义:使用状态机可以让服务器的请求解析过程更加清晰和高效,减少错误处理,提高响应速度。对于Web服务器来说,这种设计非常重要,因为它能确保服务器在处理不同类型请求时都能准确执行。
3. 访问服务器数据库实现web端用户注册、登录功能,可以请求服务器图片和视频文件
实际运用与意义:
数据库交互:服务器通过与数据库交互,实现用户的注册和登录功能。这是Web应用中最常见的功能之一,也是用户与服务器交互的核心。
现实意义:这不仅展示了服务器的基本功能,还使项目更加实用,能够处理实际Web服务需求,如用户管理和数据存储。能够提供图片和视频文件的请求处理展示了服务器处理静态资源的能力。
4. 实现同步/异步日志系统,记录服务器运行状态
实际运用与意义:
日志系统:日志系统是服务器的重要组成部分,用于记录运行时的各种信息,如错误、请求、状态等。同步日志可以实时记录,而异步日志则可以在不影响主线程的情况下批量处理日志。
现实意义:通过日志系统,开发者可以监控服务器的运行状态,发现和定位问题,提高调试效率。异步日志在高并发场景下尤为重要,能够确保记录日志的同时,不会阻塞主要的请求处理线程。
5. 经Webbench压力测试可以实现上万的并发连接数据交换
实际运用与意义:
Webbench压力测试:Webbench是一种常用的压力测试工具,用于测试Web服务器的性能。通过该工具,项目展示了服务器能够在高并发环境下处理上万连接的能力。
现实意义:这一点直接证明了该服务器的高效性和可靠性。能够在高负载下依然保持良好的性能,体现了这个项目在实际应用中的价值。
接下来便开始正式的源码分析:
源码详细分析
项目路径如下:
1.webserver.cpp
完整注释版:
#include "webserver.h"//构造函数WebServer::WebServer(){ //创建客户端连接数组 users = new http_conn[MAX_FD]; //创建root文件夹路径 char server_path[200]; getcwd(server_path, 200); char root[6] = "/root"; m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1); strcpy(m_root, server_path); strcat(m_root, root); //创建定时器 users_timer = new client_data[MAX_FD];}//析构函数WebServer::~WebServer(){ //释放epoll文件描述符 close(m_epollfd); //释放监听文件描述符 close(m_listenfd); //释放管道读端 close(m_pipefd[1]); //释放管道写端 close(m_pipefd[0]); //删除客户端连接及定时器数组 delete[] users; delete[] users_timer; //删除线程池 delete m_pool;}//初始化函数(服务器相关参数)void WebServer::init(int port, string user, string passWord, string databaseName, int log_write, int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model){ //初始化端口 m_port = port; //数据库用户名 m_user = user; //数据库密码 m_passWord = passWord; //数据库名 m_databaseName = databaseName; //数据库连接池数量 m_sql_num = sql_num; //线程池的数量 m_thread_num = thread_num; //日志写入的方式 同步or异步 m_log_write = log_write; //用于控制连接关闭的变量 m_OPT_LINGER = opt_linger; //控制监听和连接采用ET or LT触发模式 m_TRIGMode = trigmode; //日志的开关 0为开 m_close_log = close_log; //控制事件处理的模型为Reactor还是Proactor m_actormodel = actor_model;}//通过m_TRIGMode选择监听和连接的触发模式 LT: Level Triggered ET:Edge Triggeredvoid WebServer::trig_mode(){ //LT + LT if (0 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 0; } //LT + ET else if (1 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 1; } //ET + LT else if (2 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 0; } //ET + ET else if (3 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 1; }}//初始化日志void WebServer::log_write(){ if (0 == m_close_log) { //m_log_write控制异步还是同步 if (1 == m_log_write) Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800); else //每写一次刷新一次log Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0); }}//初始化数据库池void WebServer::sql_pool(){ //初始化数据库连接池 m_connPool = connection_pool::GetInstance(); m_connPool->init("localhost", m_user, m_passWord, m_databaseName, 3306, m_sql_num, m_close_log); //初始化数据库读取表 users->initmysql_result(m_connPool);}//初始化线程池void WebServer::thread_pool(){ //(事件相应模型,数据库连接池,线程池中的线程数量) m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);}//配置监听套接字和管道void WebServer::eventListen(){ //创建套接字 PF_INET=IPV4 SOCK_STREAM=TCP m_listenfd = socket(PF_INET, SOCK_STREAM, 0); //检查是否创建成功 assert(m_listenfd >= 0); //优雅关闭连接 if (0 == m_OPT_LINGER) { struct linger tmp = {0, 1}; //setsockopt函数设置套接字的相关属性 setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); } else if (1 == m_OPT_LINGER) { struct linger tmp = {1, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); } //用于捕获和检查绑定和监听是否成功 int ret = 0; //该结构体定义在头文件 <netinet/in.h> 中 用于存放IP地址和端口号 struct sockaddr_in address; //清除内存中的垃圾,即结构体中的非0随机值 bzero(&address, sizeof(address)); //表示使用的是IPV4地址 address.sin_family = AF_INET; //存放IPV4地址具体的值 这里的htonl(INADDR_ANY)绑定了所有本地IP地址 address.sin_addr.s_addr = htonl(INADDR_ANY); //存放端口号 htons(m_port) 即Host TO Network Short 设置监听的端口号的过程中将主机字节序的端口号转换为网络字节序。 address.sin_port = htons(m_port); //1即启用SO_REUSEEADDR允许重用本地地址 使得在重启服务器后快速重新绑定原来的端口 int flag = 1; setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); //绑定套接字到本地IP地址 ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address)); //是否绑定成功 assert(ret >= 0); //设置套接字为监听模式,最多5个连接请求排队 ret = listen(m_listenfd, 5); //检测是否成功 assert(ret >= 0); //初始化定时器 设置最小超时单位 utils.init(TIMESLOT); //存储事件信息结构体的数组 epoll_event events[MAX_EVENT_NUMBER]; //epoll创建内核事件表 管理着所有的文件描述符及其事件 m_epollfd = epoll_create(5); //检查是否创建成功 assert(m_epollfd != -1); //添加监听描述符到epoll事件表 //这里的这个布尔值决定是否启用 EPOLLONESHOT 模式。 //如果为 true,则在这个文件描述符上触发一个事件后,epoll 不会再次监视这个文件描述符,直到你手动重置它。这在多线程环境中非常有用,避免同一个文件描述符的事件被多个线程处理。 //如果为 false,则 epoll 在每次该文件描述符上有事件发生时都会通知你。 utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode); //设置客户端连接的epoll http_conn::m_epollfd = m_epollfd; //创建用于信号处理的套接字(管道) 这两个套接字 sv[0] 和 sv[1] 之间可以进行双向通信。socketpair 通常用于进程间通信 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd); assert(ret != -1); //将管道的写段设置为非阻塞 避免写操作阻塞进程:如果 m_pipefd[1] 是阻塞的,当你尝试写入数据时,如果管道已满(即 m_pipefd[0] 没有被读取),写操作将阻塞进程,直到有足够的空间可用。这可能导致程序挂起,等待管道缓冲区变得可写。通过将 m_pipefd[1] 设置为非阻塞模式,如果管道已满,写操作将立即返回失败(通常返回 EAGAIN 或 EWOULDBLOCK),程序可以处理这个情况而不被阻塞。 utils.setnonblocking(m_pipefd[1]); //将管道的读端添加到epoll 使其可以通过信号驱动 utils.addfd(m_epollfd, m_pipefd[0], false, 0); //设置对关闭信号的处理(ignore) utils.addsig(SIGPIPE, SIG_IGN); //设置对定时器触发信号的处理() utils.addsig(SIGALRM, utils.sig_handler, false); utils.addsig(SIGTERM, utils.sig_handler, false); //设置定时器 每隔TIMESLOT秒发送一次SIGALRM用于检查超时 alarm(TIMESLOT); //工具类,信号和描述符基础操作 Utils::u_pipefd = m_pipefd; Utils::u_epollfd = m_epollfd;}//定时器相关函数 参数为(连接描述符,客户端的网络信息结构体)void WebServer::timer(int connfd, struct sockaddr_in client_address){ //初始客户端相关信息 users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName); //初始化client_data数据 users_timer[connfd].address = client_address; users_timer[connfd].sockfd = connfd; //创建定时器 util_timer *timer = new util_timer; //绑定用户数据 timer->user_data = &users_timer[connfd]; //设置回调函数,即定时器超时用cb_func来处理 timer->cb_func = cb_func; //获取当前时间 time_t cur = time(NULL); //超时时间为当前时间+3*TIMESLOT timer->expire = cur + 3 * TIMESLOT; //将定时器与用户数据关联 users_timer[connfd].timer = timer; //将定时器添加到定时器链表中,以便定时器可以参与时间轮机制或其他定时器管理机制。 utils.m_timer_lst.add_timer(timer);}//调整定时器void WebServer::adjust_timer(util_timer *timer){ //若有数据传输,则将定时器往后延迟3个单位 time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; //并对新的定时器在链表上的位置进行调整 utils.m_timer_lst.adjust_timer(timer); //写入log LOG_INFO("%s", "adjust timer once");}//回调函数的调用(定时器,客户端的已有连接描述符)void WebServer::deal_timer(util_timer *timer, int sockfd){ //调用回调函数 timer->cb_func(&users_timer[sockfd]); //避免删除一个空指针 if (timer) { utils.m_timer_lst.del_timer(timer); } //写入log LOG_INFO("close fd %d", users_timer[sockfd].sockfd);}//根据监听模式处理客户端连接bool WebServer::dealclientdata(){ struct sockaddr_in client_address; //知道客户端地址结构体大小,便于使用accept() socklen_t client_addrlength = sizeof(client_address); //LT触发模式下的accept if (0 == m_LISTENTrigmode) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); //记录出错的log if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); return false; } //当前排队的客户端连接已到峰值 if (http_conn::m_user_count >= MAX_FD) { //向客户端发送busy信息 utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); return false; } //为这个新连接设置一个定时器 timer(connfd, client_address); } //ET模式处理监听事件 else { //一次性接收所有客户端连接 while (1) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); break; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); break; } timer(connfd, client_address); } return false; } return true;}//信号处理函数(服务器操作系统层面上的通知)bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){ int ret = 0; int sig; char signals[1024]; //从管道中读取信号 将其存储在singals数组中 ret = recv(m_pipefd[0], signals, sizeof(signals), 0); //出错 if (ret == -1) { return false; } //读完 else if (ret == 0) { return false; } // else { for (int i = 0; i < ret; ++i) { //逐个处理信号 switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; break; } } } } return true;}//处理客户端读事件void WebServer::dealwithread(int sockfd){ //获取当前客户端的定时器 util_timer *timer = users_timer[sockfd].timer; //reactor模式 以事件驱动 if (1 == m_actormodel) { //调整定时器时间 if (timer) { adjust_timer(timer); } //若监测到读事件,将该事件放入请求队列 m_pool->append(users + sockfd, 0); //不断循环询问客户端该事件是否结束 while (true) { //improv为1代表事件处理完成 if (1 == users[sockfd].improv) { //timer_fkag代表定时器事件触发 需要deal_timer处理如超时或客户端关闭连接 if (1 == users[sockfd].timer_flag) { //调用回调函数 deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else //proactor模式 { //服务器让操作系统来完成读取验证 而不是让应用程序自己去做读取 if (users[sockfd].read_once()) { LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); //将该事件放入请求队列 m_pool->append_p(users + sockfd); //同Reactor if (timer) { adjust_timer(timer); } } else { //服务器读不出来就调用回调函数结束该事件 deal_timer(timer, sockfd); } }}//处理客户断写事件 同上void WebServer::dealwithwrite(int sockfd){ util_timer *timer = users_timer[sockfd].timer; //reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users + sockfd, 1); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { //proactor if (users[sockfd].write()) { LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } }}//服务器主事件循环void WebServer::eventLoop(){ //控制超时事件 bool timeout = false; //主循环的开关 bool stop_server = false; while (!stop_server) { //epoll_wait阻塞(监听)已注册的套接字 (epoll套接字 epoll内核表,最大事件数量,-1代表无限等待) int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); //检测epoll_wait是否出错 { LOG_ERROR("%s", "epoll failure"); break; } for (int i = 0; i < number; i++) { //哪个套接字发生了事件 int sockfd = events[i].data.fd; //如果是监听套接字,则处理新的客户端连接 if (sockfd == m_listenfd) { //判断处理是否成功 调用处理客户端连接函数 bool flag = dealclientdata(); if (false == flag) continue; } //客户端有无异常断开、挂起、错误 else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { //服务器端关闭连接,移除对应的定时器 util_timer *timer = users_timer[sockfd].timer; deal_timer(timer, sockfd); } //当前事件是否来自管道中的信号 else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) { //处理信号 超时或关闭 bool flag = dealwithsignal(timeout, stop_server); //处理失败时的log if (false == flag) LOG_ERROR("%s", "dealclientdata failure"); } //当前事件是否为读事件 else if (events[i].events & EPOLLIN) { dealwithread(sockfd); } //当前事件是否为写事件 else if (events[i].events & EPOLLOUT) { dealwithwrite(sockfd); } } //超时事件 if (timeout) { utils.timer_handler(); LOG_INFO("%s", "timer tick"); timeout = false; } }}
代码详细解析版:
1.1 头文件和构造函数
#include "webserver.h"WebServer::WebServer(){ // 创建http_conn类对象处理客户端连接 users = new http_conn[MAX_FD]; // 生成root文件夹路径 char server_path[200]; getcwd(server_path, 200); // 获取当前工作目录 char root[6] = "/root"; m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1); strcpy(m_root, server_path); strcat(m_root, root); // 定时器 users_timer = new client_data[MAX_FD];}
#include "webserver.h"
:包含WebServer类的声明。WebServer::WebServer()
:初始化WebServer对象。 users = new http_conn[MAX_FD];
:创建一个http_conn
对象数组,用于存储客户端连接。getcwd(server_path, 200);
:获取当前工作目录。设置root文件夹路径:将当前工作目录和"/root"
拼接成新的字符串,存储在m_root
中。使用strcpy(m_root, server_path)将当前工作目录的路径复制到m_root中。使用strcat(m_root, root)将/root附加到当前工作目录路径的末尾。users_timer = new client_data[MAX_FD];
:创建一个client_data
对象数组,用于管理客户端定时器。 1.2 析构函数
WebServer::~WebServer(){ close(m_epollfd); close(m_listenfd); close(m_pipefd[1]); close(m_pipefd[0]); delete[] users; delete[] users_timer; delete m_pool;}
析构函数WebServer::~WebServer()
:释放资源。 关闭epoll文件描述符、监听文件描述符、管道文件描述符。删除动态分配的users
和users_timer
数组。删除线程池对象m_pool
。 tips:在析构函数中,close(m_pipefd[1]);
和 close(m_pipefd[0]);
的作用是关闭管道的写端和读端。这是清理资源的步骤,以确保在对象销毁时释放操作系统的文件描述符资源,避免文件描述符泄漏。
关于管道的具体分析如下:
1.2.1 管道(Pipe)的作用
管道是一种进程间通信机制,通常用于父子进程或线程之间传递数据。在这段服务器代码中,管道主要用于信号处理(如通过管道传递信号事件)。管道有两个端点:一个是写端(m_pipefd[1]
),另一个是读端(m_pipefd[0]
)。服务器中的信号处理通常是在信号处理程序中将信号信息写入管道,然后主循环通过epoll
监控管道的读端,来处理这些信号。 之所以需要管道进行信号处理是因为信号处理程序是异步执行的,只能做简单的操作,而主循环通常被epoll_wait
阻塞,无法及时响应信号;通过管道,信号处理程序可以安全地通知主循环有信号事件发生,epoll
可以监控管道的读端,当有数据可读时立即返回并处理,从而实现安全、高效的信号与主循环通信。
具体来说,epoll_wait
是一种用于监听多个文件描述符上事件的系统调用,它会阻塞程序的执行,直到至少有一个被监听的文件描述符上有事件发生。阻塞意味着程序在调用epoll_wait
时会暂停执行,直到有事件发生或超时。这种设计有利于提高效率,因为CPU不需要空转等待事件发生。
当程序在epoll_wait
中阻塞时,如果没有发生任何预期的IO事件(如网络数据到达),程序就不会继续执行其他代码。这时,如果有信号到来,信号处理程序虽然会被触发执行,但因为它只能做一些简单的工作,不能直接处理复杂的逻辑。
为了让主循环知道信号已经到来并及时响应(如关闭服务器、处理特殊任务),我们需要打破epoll_wait
的阻塞,让程序能够恢复执行。在信号处理程序中写入管道,可以让管道上产生一个可读事件,epoll_wait
会检测到这个事件,立即返回,从而让主循环有机会处理信号。
因此,管道用于信号处理的目的是为了在epoll_wait
阻塞时,让主循环能够及时响应信号事件。
1.2.2 为什么需要显式关闭?
管道本质上也是一种文件描述符,操作系统会分配有限数量的文件描述符给每个进程。如果不及时关闭,文件描述符的数量会耗尽,从而导致程序无法再创建新的文件描述符(例如,无法接受新的网络连接)。在析构函数中,显式关闭管道可以确保在对象销毁时,管道资源被正确地释放,避免潜在的资源泄露问题。1.3 初始化函数
void WebServer::init(int port, string user, string passWord, string databaseName, int log_write, int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model){ m_port = port; m_user = user; m_passWord = passWord; m_databaseName = databaseName; m_sql_num = sql_num; m_thread_num = thread_num; m_log_write = log_write; m_OPT_LINGER = opt_linger; m_TRIGMode = trigmode; m_close_log = close_log; m_actormodel = actor_model;}
初始化函数WebServer::init
:初始化服务器的各项参数。 设置端口号、数据库用户名和密码、数据库名、日志写入方式、关闭连接选项、触发模式、数据库连接池大小、线程池大小、日志关闭选项、事件模型等。 1.4 触发模式函数
void WebServer::trig_mode(){ // LT + LT if (0 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 0; } // LT + ET else if (1 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 1; } // ET + LT else if (2 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 0; } // ET + ET else if (3 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 1; }}
触发模式函数WebServer::trig_mode
:设置监听和连接的触发模式。 LT(水平触发):0ET(边缘触发):1根据m_TRIGMode
的值来设置监听和连接的触发模式。 WebServer::trig_mode
函数用于设置服务器的监听(m_LISTENTrigmode
)和连接(m_CONNTrigmode
)的触发模式。触发模式有两种:LT(水平触发,Level-Triggered)和ET(边缘触发,Edge-Triggered),分别用0
和1
表示。
根据m_TRIGMode
的值,trig_mode
函数将决定监听和连接操作的触发模式:
m_TRIGMode == 0
: 监听和连接均采用LT(水平触发)。m_TRIGMode == 1
: 监听采用LT,连接采用ET(边缘触发)。m_TRIGMode == 2
: 监听采用ET,连接采用LT。m_TRIGMode == 3
: 监听和连接均采用ET。 1.4.1 水平触发(LT)和边缘触发(ET)的区别:
这两种触发模式是针对I/O事件的不同处理方式,通常用于 epoll
或者 select
/poll
等 I/O 多路复用机制。
LT(Level-Triggered,水平触发):
工作方式:在水平触发模式下,只要文件描述符上还有数据未处理,epoll
会反复通知应用程序。因此,只要某个事件没有被处理,下一次调用epoll_wait
时,仍会返回该事件。特点: 容易编程,适合大部分场景。可能导致重复处理同一事件。 场景:适用于要求及时处理事件的场景,编程简单,但效率相对较低。 ET(Edge-Triggered,边缘触发):
工作方式:在边缘触发模式下,epoll
只会在文件描述符状态发生变化时通知应用程序,且只通知一次。如果应用程序没有在第一次通知时处理完所有数据,后续epoll_wait
不会再通知该事件,除非状态再次发生变化。特点: 更高效,减少了系统调用次数。编程复杂,需要确保一次性处理所有数据,否则可能会错过事件。 场景:适用于高性能、高并发服务器,需要精确控制I/O操作。 1.5 日志写入函数
void WebServer::log_write(){ if (0 == m_close_log) { // 初始化日志 if (1 == m_log_write) Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800); else Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0); }}
日志写入函数WebServer::log_write
:初始化日志系统。 根据m_log_write
的值来选择不同的日志初始化方式。 Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800);
如果m_log_write
等于1,调用日志系统的init
函数来初始化日志文件。这里使用的是异步日志模式,参数说明如下: "./ServerLog"
:日志文件的路径,日志将写入到当前目录下的ServerLog
文件中。m_close_log
:传递日志开关变量,这里是0,表示日志功能开启。2000
:日志队列最大长度,代表日志的最大条目数。800000
:日志文件的最大大小,单位是字节。日志文件达到此大小后,可能会进行滚动或创建新日志文件。800
:表示日志的刷新频率,通常用于异步日志模式下的刷新间隔(即多长时间刷新一次日志到文件)。 else Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0);
如果m_log_write
不等于1,执行这个else
分支,调用init
函数初始化日志文件。这时的最后一个参数是0
,这表示同步日志模式,即每次写入日志都会立即刷新到文件,而不是等待一段时间后批量刷新。 1.6 数据库连接池初始化
void WebServer::sql_pool(){ // 初始化数据库连接池 m_connPool = connection_pool::GetInstance(); m_connPool->init("localhost", m_user, m_passWord, m_databaseName, 3306, m_sql_num, m_close_log); // 初始化数据库读取表 users->initmysql_result(m_connPool);}
数据库连接池初始化函数WebServer::sql_pool
:初始化数据库连接池并读取数据库表。 获取数据库连接池实例,并进行初始化。调用http_conn
对象的initmysql_result
方法,初始化数据库读取表。 tips:在我看来 连接池这个概念的作用就类似于缓存,可以理解为,连接池和缓存都旨在提高系统的性能和效率,但它们处理的对象和应用场景不同。
1.7 线程池初始化
void WebServer::thread_pool(){ // 线程池 m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);}
线程池初始化函数WebServer::thread_pool
:创建并初始化线程池对象m_pool
。在这段代码中,threadpool<http_conn> 使用了模板类 threadpool,并且将 http_conn 作为模板参数传递给它。
T 是一个模板参数,可以是任何类型。在这段代码中,T 被替换为 http_conn,表示线程池中的任务将处理 http_conn 类型的对象。
1.7 配置监听套接字和信号传输管道
void WebServer::eventListen(){ // 网络编程基础步骤 m_listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(m_listenfd >= 0); // 优雅关闭连接 if (0 == m_OPT_LINGER) { struct linger tmp = {0, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); } else if (1 == m_OPT_LINGER) { struct linger tmp = {1, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); } int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(m_port); int flag = 1; setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret >= 0); ret = listen(m_listenfd, 5); assert(ret >= 0); utils.init(TIMESLOT); // epoll创建内核事件表 epoll_event events[MAX_EVENT_NUMBER]; m_epollfd = epoll_create(5); assert(m_epollfd != -1); utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode); http_conn::m_epollfd = m_epollfd; ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd); assert(ret != -1); utils.setnonblocking(m_pipefd[1]); utils.addfd(m_epollfd, m_pipefd[0], false, 0); utils.addsig(SIGPIPE, SIG_IGN); utils.addsig(SIGALRM, utils.sig_handler, false); utils.addsig(SIGTERM, utils.sig_handler, false); alarm(TIMESLOT); // 工具类,信号和描述符基础操作 Utils::u_pipefd = m_pipefd; Utils::u_epollfd = m_epollfd;}
这段代码是一个Web服务器的事件监听函数 eventListen()
,用于设置网络通信的基础环境,创建监听套接字、设置连接选项、初始化 epoll
事件表,并设置必要的信号处理。下面逐行解释这个函数的工作原理:
1.7.1 创建监听套接字
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(m_listenfd >= 0);
socket(PF_INET, SOCK_STREAM, 0);
:创建一个TCP套接字。 PF_INET
表示使用IPv4协议。SOCK_STREAM
表示使用面向连接的TCP协议。0
表示协议选择默认的传输协议(TCP)。 assert(m_listenfd >= 0);
:检查套接字创建是否成功。如果 m_listenfd
小于0,表示创建失败,程序会在这里终止。 1.7.2 设置优雅关闭连接
if (0 == m_OPT_LINGER){ struct linger tmp = {0, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));}else if (1 == m_OPT_LINGER){ struct linger tmp = {1, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));}
这里的 SO_LINGER
选项用于控制关闭套接字时的行为。 m_OPT_LINGER
是一个标志变量,控制 SO_LINGER
的行为。 struct linger tmp = {0, 1};
:如果 m_OPT_LINGER
为0,SO_LINGER
的延时关闭行为被禁用。struct linger tmp = {1, 1};
:如果 m_OPT_LINGER
为1,则套接字关闭时会在 SO_LINGER
(1秒内) 时间内尝试发送剩余数据。 1.7.3. 绑定地址并监听
int ret = 0;struct sockaddr_in address;//在网络编程中,sockaddr_in 结构体用来存储地址信息(例如IP地址和端口)。为了避免结构体中的某些未初始化的成员包含随机值(即内存中的“垃圾”数据),在使用 address 结构体之前,通常先将它的所有字节清零,这样可以确保结构体中的所有字段初始值为0。bzero(&address, sizeof(address));address.sin_family = AF_INET;address.sin_addr.s_addr = htonl(INADDR_ANY);address.sin_port = htons(m_port);int flag = 1;setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));assert(ret >= 0);ret = listen(m_listenfd, 5);assert(ret >= 0);
sockaddr_in
结构体用于存储IP地址和端口信息。
sin_family = AF_INET;
:设置地址族为IPv4。sin_addr.s_addr = htonl(INADDR_ANY);
:绑定到本地所有IP地址(设置了服务器要监听的IP地址)。 tips:
绑定到本地所有IP地址的含义:
在网络编程中,一个服务器通常需要绑定到一个特定的IP地址和端口,以便能够接收来自客户端的连接请求。INADDR_ANY 是一个特殊的常量,用于表示“本地所有IP地址”。
INADDR_ANY: 当服务器的套接字绑定到 INADDR_ANY 时,意味着服务器会监听本地机器上所有的网络接口(如Wi-Fi、以太网、回环地址等)的IP地址。换句话说,不论客户端连接到本地机器的哪个IP地址,服务器都能接受到请求。
sin_port = htons(m_port);
:设置了服务器要监听的端口号,并在设置的过程中将主机字节序的端口号转换为网络字节序。 这一步的作用是将端口号从主机字节序(通常是小端序)转换为网络字节序(大端序),并将其绑定到套接字,以确保端口号在网络上传输时能够被正确识别。
tips:
sin_port
: 这是 sockaddr_in
结构体中的一个字段,用于存储端口号。这个结构体常用于指定服务器绑定的IP地址和端口。
htons(m_port)
:
htons
是 “Host TO Network Short” 的缩写,它将主机字节序的端口号转换为网络字节序。
不同计算机架构可能使用不同的字节序来表示数据。在网络通信中,数据必须按照统一的字节序进行传输,通常使用大端序(网络字节序)。
通过 htons
函数,确保在不同架构之间传输的端口号能够被正确理解,在网络通信中,统一的字节序是必要的,以确保不同主机之间的互操作性。通过使用 htons
,开发者可以确保本地主机上的端口号在网络上传输时能够被远端主机正确解析。
setsockopt(SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
:设置 SO_REUSEADDR
选项,允许重用本地地址。 tips:结合场景来理解什么是重用本地地址
场景1:快速重启服务器
你在电脑上运行了一个程序(比如服务器),它占用了一个端口号(比如8080)。当你关闭这个程序时,系统会“保留”这个端口一段时间,防止旧的网络数据混乱。在这段时间里,你无法立即重新启动服务器去占用同样的端口,系统会报错说“端口被占用了”。
但是如果设置了 SO_REUSEADDR,你可以立刻再次使用这个端口,程序可以顺利重启,不用等待系统释放端口。
场景2:多个程序同时监听同一个端口
假设你有两个程序,它们都需要监听同一个端口(比如8080),但分别处理不同的IP地址。一般情况下,系统不允许两个程序同时占用一个端口,但 SO_REUSEADDR 允许这种“共享”情况。
bind()
函数将套接字绑定到指定的IP地址和端口。listen()
函数使套接字进入监听状态,准备接收连接。 1.7.4 初始化定时器
utils.init(TIMESLOT);
初始化定时器工具 utils
,并设置时间间隔 TIMESLOT
,通常用于管理连接的超时事件。 1.7.5 创建 epoll
内核事件表
epoll_event events[MAX_EVENT_NUMBER];m_epollfd = epoll_create(5);assert(m_epollfd != -1);
epoll_create(5);
:创建一个 epoll
事件表,参数 5
是提示内核事件表的大小,但实际上Linux 2.6.8之后的内核忽略了这个参数。assert(m_epollfd != -1);
:确保 epoll
创建成功。 1.7.6 添加监听文件描述符到 epoll
事件表
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);http_conn::m_epollfd = m_epollfd;
将监听套接字 m_listenfd
添加到 epoll
事件表中,以便 epoll
可以监控这个套接字的事件(如新连接到来)。http_conn::m_epollfd = m_epollfd;
:将 epoll
文件描述符存储在静态变量中,以便HTTP连接可以使用。 1.7.7 创建用于信号通信的管道
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);assert(ret != -1);utils.setnonblocking(m_pipefd[1]);utils.addfd(m_epollfd, m_pipefd[0], false, 0);
socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
:创建一个双向通信的UNIX域套接字对 m_pipefd
,用于进程内部通信。将管道的读端 m_pipefd[0]
添加到 epoll
事件表中,以便可以通过信号驱动 epoll
的事件循环。 tips:
关于通道的读端和写端
socketpair
函数创建的是一对全双工的套接字,这对套接字之间可以相互通信。尽管每个端都可以同时进行读写,但在实际应用中,通常将一个端用于读取,另一个端用于写入,形成单向数据流。这种方式使得编程和理解逻辑更加清晰。
在常见的使用模式中:
m_pipefd[1]
被用于写入数据。m_pipefd[0]
被用于读取数据。 这种分工明确的方式使得数据流动更容易控制和管理。
为什么要将 m_pipefd[1]
写端设置为非阻塞?
将写端 m_pipefd[1]
设置为非阻塞有以下几个原因:
避免写操作阻塞进程:
如果m_pipefd[1]
是阻塞的,当你尝试写入数据时,如果管道已满(即 m_pipefd[0]
没有被读取),写操作将阻塞进程,直到有足够的空间可用。这可能导致程序挂起,等待管道缓冲区变得可写。通过将 m_pipefd[1]
设置为非阻塞模式,如果管道已满,写操作将立即返回失败(通常返回 EAGAIN
或 EWOULDBLOCK
),程序可以处理这个情况而不被阻塞。 提高程序的响应速度:
非阻塞 I/O 使得程序可以在无法立即完成 I/O 操作时继续执行其他任务,这对于高性能服务器或需要实时响应的系统特别重要。当你使用epoll
等异步 I/O 机制时,非阻塞 I/O 能够很好地与事件驱动模型结合,使得程序可以在数据就绪时立即处理,而不需要因为 I/O 操作而挂起。 配合 epoll
的事件驱动模型:
epoll
本身是一个异步事件驱动机制,常与非阻塞 I/O 一起使用。通过设置写端为非阻塞,程序可以在检测到 epoll
事件时决定如何处理写入操作,而不是在管道满的情况下阻塞写入操作。 将 m_pipefd[1]
设置为非阻塞可以避免程序在管道缓冲区满的情况下挂起,使程序在高负载或复杂并发情况下能够继续处理其他任务,提升响应速度和整体性能。这种设计与 epoll
的异步事件驱动模型相结合,可以高效地处理 I/O 操作。
结合一个场景来理解为什么需要创建双向通信的套接字
假设你有一个餐厅,餐厅里有一个服务员负责处理顾客的点单。这时候有两种情况会让服务员忙起来:
顾客点餐:这类似于服务器处理网络请求。厨房准备好了菜:需要通知服务员去上菜,这就像服务器内部需要处理的事情,比如定时任务或信号。问题
如果厨房想通知服务员“菜好了”,但服务员正在忙着处理顾客点餐,这时候服务员可能没法立刻去处理厨房的通知。
解决方案:创建一个内部通信通道
为了让服务员在处理顾客点餐的同时也能及时收到厨房的通知,你在餐厅内部装了一个“内部电话”(类似于双向套接字)。当厨房准备好菜后,它就会通过这个内部电话告诉服务员。服务员可以一边处理顾客的点单,一边通过电话听到厨房的通知。
代码中的实现
这个“内部电话”就是通过 socketpair
创建的双向套接字对。
epoll
监控这个套接字对。如果有一些内部事件发生,比如需要处理的定时任务,服务器就可以通过写入这个套接字对来通知自己:“嘿,有事情要处理!”。服务器会通过 epoll
监听这个通知,并做出相应的处理。 创建这个内部通信的套接字对,就像给餐厅的服务员装了一个内部电话,确保他在忙着处理顾客时,也能及时接收到厨房的通知,避免漏掉任何重要的事情。这样,服务器既能处理外部网络请求,也能处理自己的内部任务。
1.7.8 初始化信号的处理
//忽略SIGPIPE:不会因为向关闭的连接写数据而崩溃。utils.addsig(SIGPIPE, SIG_IGN);//处理SIGALRM:可以在定时器触发时执行特定任务,比如检查超时。utils.addsig(SIGALRM, utils.sig_handler, false);//处理SIGTERM:在终止程序前可以先完成一些必要的清理工作。utils.addsig(SIGTERM, utils.sig_handler, false);
utils.addsig(SIGPIPE, SIG_IGN);
:忽略 SIGPIPE
信号,防止在向一个已关闭的连接写数据时引发进程终止。utils.addsig(SIGALRM, utils.sig_handler, false);
:添加定时器信号 SIGALRM
的处理函数 sig_handler
。utils.addsig(SIGTERM, utils.sig_handler, false);
:添加终止信号 SIGTERM
的处理函数 sig_handler
。 1.7.9 启动定时器
alarm(TIMESLOT);
设置定时器,每隔 TIMESLOT
秒发送一次 SIGALRM
信号,用于处理定时任务,比如检查超时连接。 1.7.10 初始化工具类中的全局变量
Utils::u_pipefd = m_pipefd;Utils::u_epollfd = m_epollfd;
将管道文件描述符和 epoll
文件描述符传递给工具类 Utils
,使工具类可以访问并处理这些描述符。 1.7.11 eventListen函数总结
这个 eventListen
函数完成了服务器在启动时所需的各项初始化工作,包括创建监听套接字、设置套接字选项、初始化 epoll
、设置信号处理的管道和定时器等。最终,服务器准备好监听来自客户端的连接,并可以处理各种事件和信号。
1.8 定时器相关函数
void WebServer::timer(int connfd, struct sockaddr_in client_address){ users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName); // 初始化client_data数据 users_timer[connfd].address = client_address; users_timer[connfd].sockfd = connfd; util_timer *timer = new util_timer; timer->user_data = &users_timer[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; users_timer[connfd].timer = timer; utils.m_timer_lst.add_timer(timer);}
定时器相关函数WebServer::timer
:为新连接初始化定时器。 初始化http_conn
对象。初始化client_data
对象,并创建新的util_timer
定时器。将定时器加入定时器链表m_timer_lst
。 1.9 调整定时器
void WebServer::adjust_timer(util_timer *timer){ time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; utils.m_timer_lst.adjust_timer(timer); LOG_INFO("%s", "adjust timer once");}
调整定时器函数WebServer::adjust_timer
:调整定时器的过期时间并重新加入定时器链表。 1.10 定时器回调函数
void WebServer::deal_timer(util_timer *timer, int sockfd){ timer->cb_func(&users_timer[sockfd]); if (timer) { utils.m_timer_lst.del_timer(timer); } LOG_INFO("close fd %d", users_timer[sockfd].sockfd);}
定时器回调函数WebServer::deal_timer
:处理定时器过期事件,关闭连接并删除定时器。 1.11 处理客户端连接
bool WebServer::dealclinetdata(){ struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); if (0 == m_LISTENTrigmode) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); return false; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); return false; } timer(connfd, client_address); } else { while (1) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); break; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); break; } timer(connfd, client_address); } return false; } return true;}
dealclinetdata函数代码逐行解释如下:
bool WebServer::dealclinetdata(){ struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address);
struct sockaddr_in client_address;
:定义一个sockaddr_in
结构体变量client_address
,用于存储客户端的地址信息(IP地址和端口)。socklen_t client_addrlength = sizeof(client_address);
:定义client_addrlength
变量,并将其初始化为client_address
的大小。这是为了在accept
函数调用时传递客户端地址结构体的大小。 if (0 == m_LISTENTrigmode) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (0 == m_LISTENTrigmode)
:判断服务器监听模式是否为LT模式(Level Triggered,电平触发)。int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
:调用accept
函数从监听套接字m_listenfd
接受一个新的客户端连接。accept
返回一个新的文件描述符connfd
,用于与该客户端通信。如果accept
失败,connfd
将返回-1
。 if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); return false; }
if (connfd < 0)
:判断accept
是否失败。accept
失败时connfd
会返回-1
。LOG_ERROR("%s:errno is:%d", "accept error", errno);
:记录一个错误日志,说明accept
调用失败,同时输出errno
错误代码,帮助排查问题。return false;
:如果accept
失败,返回false
,表示处理客户端连接失败。 if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); return false; }
if (http_conn::m_user_count >= MAX_FD)
:检查当前活跃的客户端连接数是否已经达到最大值MAX_FD
。m_user_count
是一个静态变量,记录当前的连接数。utils.show_error(connfd, "Internal server busy");
:调用show_error
函数,向客户端发送一个错误信息,提示服务器忙碌,无法处理新的连接。LOG_ERROR("%s", "Internal server busy");
:记录一个错误日志,说明服务器当前无法处理更多的连接。return false;
:返回false
,表示处理客户端连接失败。 timer(connfd, client_address); }
timer(connfd, client_address);
:调用timer
函数,为这个新连接设置一个定时器(通常用于超时处理)。定时器的作用是防止客户端长时间占用资源但不发送数据,从而影响服务器性能。 else { while (1) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); break; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); break; } timer(connfd, client_address); } return false; }
else
:如果服务器的监听模式为ET(Edge Triggered,边缘触发)模式,则执行这个分支。while (1)
:进入一个无限循环,不断调用accept
函数,尝试接受所有可能的客户端连接。int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
:与前面相同,接受一个新的客户端连接。if (connfd < 0)
:与前面相同,判断accept
是否失败。如果失败,记录错误并跳出循环。if (http_conn::m_user_count >= MAX_FD)
:与前面相同,检查是否超过最大连接数。如果是,显示错误信息并跳出循环。timer(connfd, client_address);
:与前面相同,为这个新连接设置定时器。return false;
:跳出循环后,返回false
,表示处理客户端连接失败。 return true;}
return true;
:如果在LT模式下(即if
分支中),没有发生错误,则返回true
,表示成功处理客户端连接。 dealclinetdata函数码的主要功能是处理新的客户端连接,根据不同的触发模式(LT或ET)来决定如何接受连接:
在LT模式下:一次只处理一个连接。在ET模式下:使用循环处理所有可能的连接。同时,代码还检查服务器是否达到了最大连接数,并在必要时设置定时器来管理连接的生命周期。
1.12 信号处理函数
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){ int ret = 0; int sig; char signals[1024]; ret = recv(m_pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { return false; } else if (ret == 0) { return false; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; break; } } } } return true;}
信号处理函数WebServer::dealwithsignal
:处理信号。 接收信号并根据信号类型设置标志位。处理SIGALRM
信号,设置timeout
标志位。SIGALRM信号是由定时器(通过调用alarm()或setitimer()函数设置)触发的,用来通知程序一个时间段已经过去,通常用于超时管理。处理SIGTERM
信号,设置stop_server
标志位。SIGTERM信号是用于请求进程终止的信号。当你在命令行使用kill命令终止某个进程时,默认情况下,操作系统会发送SIGTERM信号给该进程,通常用于优雅关闭服务器。 1.13 处理读事件
void WebServer::dealwithread(int sockfd){ util_timer *timer = users_timer[sockfd].timer; // reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users + sockfd, 0); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { if (users[sockfd].read_once()) { LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); // 若监测到读事件,将该事件放入请求队列 m_pool->append_p(users + sockfd); if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } }}
这个WebServer::dealwithread
函数用于处理Web服务器中客户端的读事件。根据设置的不同,该函数支持两种不同的并发处理模型:Reactor模型和Proactor模型。
以下为详细分析:
void WebServer::dealwithread(int sockfd){ util_timer *timer = users_timer[sockfd].timer;
dealwithread
函数接受一个参数 sockfd
,这是客户端连接的套接字文件描述符。从一个数组 users_timer
中获取与该套接字相关联的定时器 timer
。定时器通常用于跟踪每个连接的超时时间。 // reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users + sockfd, 0);
if (1 == m_actormodel)
判断是否使用Reactor模型。如果m_actormodel
为1,则表示使用Reactor模型。if (timer) { adjust_timer(timer); }
检查定时器是否存在,如果存在则调整定时器。通常这是为了重置定时器的超时时间,确保连接在规定时间内没有任何活动时不会被关闭。m_pool->append(users + sockfd, 0);
将与套接字相关的用户数据指针 users + sockfd
及标志 0
(可能表示读事件)加入线程池队列,等待线程池中的工作线程来处理该事件。 while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } }
一个无限循环,等待 users[sockfd].improv
标志变为1。这通常表示某个事件已经被处理完毕或完成了某个状态的转换。如果 improv
标志为1,并且 timer_flag
也为1,则处理定时器事件(如关闭连接)。重置 timer_flag
和 improv
标志后,跳出循环。这段代码可能是为了确保在处理事件时没有定时器事件影响。 else { if (users[sockfd].read_once()) { LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
如果 m_actormodel
不为1(即使用Proactor模型),则调用 users[sockfd].read_once()
来尝试读取数据。如果读取成功: 记录日志,输出客户端IP地址。 // 若监测到读事件,将该事件放入请求队列 m_pool->append_p(users + sockfd);
将读取的数据处理任务加入线程池(不同于Reactor模型,这里可能是直接处理已经读取的数据)。 if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } }}
如果读取成功,并且存在定时器,调整定时器。如果读取失败,则处理定时器事件(如关闭连接)。 1.13.2 Reactor模型与Proactor模型
Reactor模型:
概念:Reactor模型是一种事件驱动的模式,应用程序(Web服务器)注册回调函数到某个事件处理器。当事件发生时(如有数据可读),事件处理器负责调用相应的回调函数处理事件。简单来说,Reactor模型只负责监听事件,并将事件交给工作线程去处理。实际过程:在上述代码中,Reactor模型是通过将事件放入线程池 (m_pool->append(...)
) 来处理的,处理的具体工作在工作线程中执行。特点:事件的处理由多个线程来完成,通常用于高并发的网络编程。应用程序被动等待事件发生,然后响应这些事件。 Proactor模型:
概念:Proactor模型则是另一种模式,应用程序提前将需要执行的操作提交给操作系统(如读取数据)。操作系统完成操作后,通过事件通知应用程序,应用程序再来处理结果。即:操作系统负责事件的处理,应用程序负责后续的操作处理。实际过程:在上述代码中,Proactor模型直接调用users[sockfd].read_once()
来读取数据,然后将处理后的任务加入线程池 (m_pool->append_p(...)
)。Proactor模型中数据的读取和写入由操作系统负责,应用程序只在I/O操作完成后处理结果。特点:这种模型将更多的处理任务交给操作系统,从而可能减少应用程序层面的处理负担,提高效率。 1.13.3 不同模型的区别和目的
区别:
Reactor模型:应用程序主动等待事件发生并处理事件。I/O操作的实际执行和处理是分开的,事件到达时交给应用程序去处理。Proactor模型:应用程序事先提交操作请求,当操作系统完成后再通知应用程序。I/O操作的执行是由操作系统完成的,应用程序只处理结果。目的:
两种模型的目的是相同的:高效地处理高并发的I/O请求。通过异步处理避免阻塞,提高系统的响应速度和吞吐量。Reactor 适合需要更细粒度控制的场景,开发者需要控制事件的处理过程。Proactor 更适合将I/O操作委托给操作系统管理的场景,能够更高效地利用系统资源。1.14 处理写事件
void WebServer::dealwithwrite(int sockfd){ util_timer *timer = users_timer[sockfd].timer; // reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users + sockfd, 1); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { if (users[sockfd].write()) { LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } }}
处理写事件函数WebServer::dealwithwrite
:处理客户端的写事件。 如果是Reactor模型,则调整定时器并将事件加入线程池处理。如果是Proactor模型,则直接写入数据,并将事件加入线程池处理。 1.15 主循环函数
void WebServer::eventLoop(){ bool timeout = false; bool stop_server = false; while (!stop_server) { int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); if (number < 0 && errno != EINTR) { LOG_ERROR("%s", "epoll failure"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; // 处理新到的客户连接 if (sockfd == m_listenfd) { bool flag = dealclinetdata(); if (false == flag) continue; } // 处理信号 else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) { bool flag = dealwithsignal(timeout, stop_server); if (false == flag) continue; } // 处理客户连接上接收到的数据 else if (events[i].events & EPOLLIN) { dealwithread(sockfd); } else if (events[i].events & EPOLLOUT) { dealwithwrite(sockfd); } } if (timeout) { utils.timer_handler(); LOG_INFO("%s", "timer tick"); timeout = false; } }}
1.15.1 主循环函数逐行解释
void WebServer::eventLoop(){ bool timeout = false; bool stop_server = false;
这段代码定义了两个布尔变量: timeout
:用于标记是否发生了超时事件(通常与定时器相关)。stop_server
:用于标记是否应该停止服务器的主循环,控制服务器的运行状态。 while (!stop_server) { int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
while (!stop_server)
:进入一个无限循环,只要stop_server
为false
,服务器将一直运行。epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1)
: 这是epoll
机制的核心函数,用于等待事件的发生。m_epollfd
是epoll
实例的文件描述符。events
是用于存储发生的事件的数组。MAX_EVENT_NUMBER
是数组的最大容量。-1
表示这个调用会无限等待,直到有事件发生。 if (number < 0 && errno != EINTR) { LOG_ERROR("%s", "epoll failure"); break; }
number
表示epoll_wait
返回的事件数量。如果number < 0
且错误不是由于信号中断(EINTR
),那么说明epoll_wait
出错了,记录错误日志并退出循环。 for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd;
for
循环遍历所有返回的事件。sockfd
是触发事件的文件描述符。通过 events[i].data.fd
获取触发该事件的对应文件描述符(例如,客户端的套接字或监听套接字)。 // 处理新到的客户连接 if (sockfd == m_listenfd) { bool flag = dealclinetdata(); if (false == flag) continue; }
如果sockfd
等于服务器的监听文件描述符m_listenfd
,说明有新的客户端连接到来。调用dealclinetdata()
函数处理新连接,如果处理失败(flag == false
),则跳过后续代码继续处理下一个事件。 // 处理信号 else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) { bool flag = dealwithsignal(timeout, stop_server); if (false == flag) continue; }
如果sockfd
等于m_pipefd[0]
且事件类型为EPOLLIN
(表示可读),说明有信号事件需要处理。信号通常通过管道传递,m_pipefd[0]
是管道的读取端。调用 dealwithsignal
函数来处理信号,处理信号可能会影响timeout
和stop_server
的状态。如果处理失败,跳过后续代码继续处理下一个事件。 // 处理客户连接上接收到的数据 else if (events[i].events & EPOLLIN) { dealwithread(sockfd); }
如果events[i].events
包含EPOLLIN
标志,说明对应的套接字上有数据可读(来自客户端的请求数据)。调用 dealwithread(sockfd)
处理这个读事件,即读取客户端发送的数据。 else if (events[i].events & EPOLLOUT) { dealwithwrite(sockfd); }
如果events[i].events
包含EPOLLOUT
标志,说明对应的套接字可以写数据(可以向客户端发送响应)。调用 dealwithwrite(sockfd)
处理这个写事件,即将服务器的响应数据发送给客户端。 if (timeout) { utils.timer_handler(); LOG_INFO("%s", "timer tick"); timeout = false; } }}
if (timeout)
:如果timeout
标志被设置为true
,则表示定时器已经超时,需要处理超时事件。调用utils.timer_handler()
来处理超时事件,例如关闭长时间未响应的连接。处理完定时器事件后,将timeout
标志重置为false
,继续循环等待新的事件。 1.15.2 eventLoop总结
这个eventLoop
函数是Web服务器的核心,它使用epoll
机制来处理多种类型的事件,包括新客户端连接、信号处理、客户端数据读写等。通过这种事件驱动的模型,服务器可以高效地处理大量并发连接,而不会阻塞在某个特定的操作上,保证了服务器的高性能和稳定性。 这个主循环的设计允许服务器在接收到不同的事件时,采取相应的处理措施,从而确保服务器能够在高负载下稳定运行。这是编写高性能网络服务器的关键部分。
2. lst_timer.cpp
由于非活跃连接占用了连接资源,严重影响服务器的性能,通过实现一个服务器定时器,处理这种非活跃连接,释放连接资源。利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务.
统一事件源基于升序链表的定时器处理非活动连接完整注释版:
#include "lst_timer.h"#include "../http/http_conn.h"//构造链表sort_timer_lst::sort_timer_lst(){ head = NULL; tail = NULL;}//释放链表sort_timer_lst::~sort_timer_lst(){ util_timer *tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; }}//添加新的定时器void sort_timer_lst::add_timer(util_timer *timer){ if (!timer) { return; } //链表为空则插入链表表头 if (!head) { head = tail = timer; return; } //新插入的定时器过期时间小于表头则也插入表头 if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head);}//当到期时间变化 调整定时器的位置void sort_timer_lst::adjust_timer(util_timer *timer){ if (!timer) { return; } util_timer *tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); }}//从链表中删除定时器void sort_timer_lst::del_timer(util_timer *timer){ if (!timer) { return; } //若删除的唯一一个定时器 则构造新链表 if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } //删除的是头 这里都是链表基础知识不赘述 if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } //删除的是尾 if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer;}//定期检查到期定时器void sort_timer_lst::tick(){ if (!head) { return; } //获取当前时间 time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { //未到期 if (cur < tmp->expire) { break; } //否则回调 tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } //在链表中删除到期的定时器 delete tmp; tmp = head; }}//将新的定时器按升序插入合适位置void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head){ util_timer *prev = lst_head; util_timer *tmp = prev->next; while (tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; }}//初始化定时器间隔void Utils::init(int timeslot){ m_TIMESLOT = timeslot;}//对文件描述符设置非阻塞 主要用于设置管道写端int Utils::setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option;}//将内核事件表注册读事件,监听模式,选择开启EPOLLONESHOTvoid Utils::addfd(int epollfd, int fd, bool one_shot, int TRIGMode){ epoll_event event; event.data.fd = fd; if (1 == TRIGMode) event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; else event.events = EPOLLIN | EPOLLRDHUP; //开启epoll的oneshot if (one_shot) event.events |= EPOLLONESHOT; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd);}// 信号处理函数,用于处理系统信号(如SIGTERM、SIGINT等)void Utils::sig_handler(int sig){ // 为保证函数的可重入性,保存当前的errno值(errno表示最近一次系统调用的错误代码) // 信号处理函数可能会改变errno的值,所以需要保存原来的errno,防止影响主程序的逻辑 int save_errno = errno; // 将接收到的信号值保存到msg变量中 int msg = sig; // 使用send函数将信号值通过管道发送给主程序 // u_pipefd[1]是管道的写端,(char *)&msg是要发送的数据的地址,1是发送的数据长度(一个字节),0表示没有特殊的发送选项 // 这种方式可以避免信号处理函数直接在其中执行复杂逻辑,而是通知主程序来处理信号,避免信号安全性问题 send(u_pipefd[1], (char *)&msg, 1, 0); // 恢复原来的errno值,保证信号处理函数结束后,程序的errno值不变 errno = save_errno;}// 设置信号处理函数// sig: 要处理的信号编号,例如 SIGINT、SIGTERM 等// handler: 信号处理函数的指针,指定信号触发时要执行的函数// restart: 是否自动重启被信号中断的系统调用,若为 true 则设置 SA_RESTART 标志void Utils::addsig(int sig, void(handler)(int), bool restart){ // 定义一个 sigaction 结构体,用于描述信号处理的行为 struct sigaction sa; // 使用 memset 将 sa 清零,以确保结构体中的所有字段被初始化为 0 memset(&sa, '\0', sizeof(sa)); // 设置信号处理函数,将 sa_handler 指向传入的处理函数 handler sa.sa_handler = handler; // 如果 restart 为 true,则设置 SA_RESTART 标志 // SA_RESTART 标志表示当信号中断某些系统调用时,系统会自动重新启动被中断的系统调用 if (restart) sa.sa_flags |= SA_RESTART; // 使用 sigfillset 函数将 sa_mask 设置为阻塞所有信号 // 这意味着在处理该信号时,其他所有的信号都将被阻塞,防止信号处理函数被其他信号中断 sigfillset(&sa.sa_mask); // 使用 sigaction 系统调用为指定的信号(sig)设置处理函数 // 第一个参数是信号编号,第二个参数是指向 sigaction 结构体的指针,表示要设置的新信号处理方式 // 第三个参数为 NULL,表示不关心旧的信号处理方式 // assert 用于检查 sigaction 是否成功执行,若返回值为 -1 表示设置失败 assert(sigaction(sig, &sa, NULL) != -1);}// 定时处理任务,重新设置定时器以不断触发 SIGALRM 信号void Utils::timer_handler(){ // 调用定时器链表的 tick() 方法,处理到期的定时器任务 // m_timer_lst 是一个定时器链表或管理器,tick() 方法通常用于遍历所有定时器, // 找到并执行已经到期的任务,如关闭超时的连接、释放资源等 m_timer_lst.tick(); // 重新设置定时器,以使 SIGALRM 信号在 m_TIMESLOT 秒后再次触发 // alarm() 函数用于设定一个闹钟时间,m_TIMESLOT 是预设的时间间隔(以秒为单位) // 在 m_TIMESLOT 秒后,将触发 SIGALRM 信号,这个信号通常会被定时处理函数捕获并执行相应操作 alarm(m_TIMESLOT);}// 定时处理任务,重新设置定时器以不断触发 SIGALRM 信号void Utils::timer_handler(){ // 调用定时器链表的 tick() 方法,处理到期的定时器任务 // 找到并执行已经到期的任务,如关闭超时的连接、释放资源等 m_timer_lst.tick(); // 重新设置定时器,以使 SIGALRM 信号在 m_TIMESLOT 秒后再次触发 // alarm() 函数用于设定一个闹钟时间,m_TIMESLOT 是预设的时间间隔(以秒为单位) // 在 m_TIMESLOT 秒后,将触发 SIGALRM 信号,这个信号通常会被定时处理函数捕获并执行相应操作 alarm(m_TIMESLOT);}int *Utils::u_pipefd = 0;int Utils::u_epollfd = 0;class Utils;// 回调函数,用于处理客户端连接的超时或关闭// user_data: 指向与客户端相关的数据信息结构体void cb_func(client_data *user_data){ // 从 epoll 实例中删除用户数据对应的文件描述符 sockfd // Utils::u_epollfd 是一个全局的 epoll 文件描述符,代表 epoll 实例 // EPOLL_CTL_DEL 是操作类型,表示删除一个 epoll 监听的文件描述符 // user_data->sockfd 是要删除的客户端 socket 文件描述符 epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); // 断言检查 user_data 是否为非空指针,以确保其有效性 // 如果 user_data 是 NULL,程序将在此处终止运行 assert(user_data); // 关闭客户端的 socket 连接,释放资源 close(user_data->sockfd); // 减少当前活跃连接的用户计数 // m_user_count 是静态变量,记录当前活跃的客户端连接数 http_conn::m_user_count--;}
代码详细解析版:
#include "lst_timer.h"#include "../http/http_conn.h"
引入定时器链表的头文件 lst_timer.h
和 HTTP 连接管理的头文件 http_conn.h
,为后面的实现提供定时器和HTTP连接相关的类和函数声明。 sort_timer_lst
类
sort_timer_lst::sort_timer_lst(){ head = NULL; tail = NULL;}
构造函数:初始化定时器链表,head
和 tail
都设置为 NULL
,表示链表为空。 sort_timer_lst::~sort_timer_lst(){ util_timer *tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; }}
析构函数:释放链表中所有的定时器。逐个遍历链表,删除每个定时器,避免内存泄漏。 void sort_timer_lst::add_timer(util_timer *timer){ if (!timer) { return; } if (!head) { head = tail = timer; return; } if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head);}
add_timer()
函数: 检查 timer
是否为 NULL
,如果是则直接返回。如果链表为空,将 timer
设置为链表的 head
和 tail
。如果 timer
的到期时间比当前头部的到期时间早,则将 timer
插入到链表头部。否则调用 add_timer(timer, head)
,将 timer
插入到链表中的合适位置。 void sort_timer_lst::adjust_timer(util_timer *timer){ if (!timer) { return; } util_timer *tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); }}
adjust_timer()
函数: 调整定时器 timer
在链表中的位置。如果 timer
的下一个定时器到期时间较晚,则不需要调整。如果 timer
是链表头部,先将 head
移动到下一个定时器,并将 timer
从链表中移除,再将其插入到链表的合适位置。如果 timer
不在头部,将其前后定时器链接起来,再将 timer
插入到合适位置。 void sort_timer_lst::del_timer(util_timer *timer){ if (!timer) { return; } if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer;}
del_timer()
函数: 删除指定的定时器 timer
。如果 timer
是链表中唯一的定时器,删除后将 head
和 tail
置为 NULL
。如果 timer
是链表的头部或尾部,分别更新链表的 head
或 tail
。如果 timer
在中间,将前后定时器链接起来,删除 timer
。 void sort_timer_lst::tick(){ if (!head) { return; } time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; }}
tick()
函数: 定时器链表的核心逻辑,处理所有已到期的定时器。获取当前时间 cur
,从链表头部开始遍历定时器。如果某个定时器的到期时间未到,退出循环。如果定时器到期,调用其回调函数 cb_func
,执行定时任务。从链表中移除已到期的定时器并删除它,继续检查下一个定时器。 void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head){ util_timer *prev = lst_head; util_timer *tmp = prev->next; while (tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; }}
add_timer(util_timer *timer, util_timer *lst_head)
辅助函数: 用于将定时器 timer
插入到链表中的合适位置。遍历链表,找到第一个到期时间大于 timer
的定时器,将 timer
插入它之前。如果没有找到合适位置,将 timer
插入到链表尾部。 工具类 Utils
void Utils::init(int timeslot){ m_TIMESLOT = timeslot;}
初始化定时器间隔 m_TIMESLOT
,定时器将每隔 timeslot
秒触发。 int Utils::setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option;}
设置文件描述符 fd
为非阻塞模式。 调用 fcntl()
获取文件描述符的当前状态标志。将文件描述符状态标志添加 O_NONBLOCK
(非阻塞)。设置新的状态标志,并返回旧的状态标志。 void Utils::addfd(int epollfd, int fd, bool one_shot, int TRIGMode){ epoll_event event; event.data.fd = fd; if (1 == TRIGMode) event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; else event.events = EPOLLIN | EPOLLRDHUP; if (one_shot) event.events |= EPOLLONESHOT; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd);}
向 epoll 实例注册文件描述符 fd
,并设置相应的事件类型: EPOLLIN
:表示文件描述符可读。EPOLLET
:启用边沿触发模式(仅当 TRIGMode 为 1 时)。EPOLLRDHUP
:检测对端连接关闭事件。EPOLLONESHOT
:确保文件描述符只被一个线程处理(如果 one_shot
为真)。注册完毕后将文件描述符设置为非阻塞模式。 Tips:
EPOLLONESHOT
是 epoll
事件机制中的一个选项。启用 EPOLLONESHOT
后,当某个文件描述符上的事件触发一次后,它会从内核的 epoll
事件表中 临时失效,不会再次触发,直到用户重新注册该文件描述符的事件。
作用:
EPOLLONESHOT
常用于多线程环境,尤其是在多个线程同时处理同一个文件描述符的场景中。它能有效避免多个线程同时处理同一事件,从而防止竞态条件(race condition)。
工作机制:
当文件描述符上的事件(如可读、可写)触发时,epoll_wait
会返回事件。如果该文件描述符被设置了 EPOLLONESHOT
标志,事件触发后,epoll
会将该文件描述符临时移出 epoll
监视列表。之后,除非用户手动重新设置该文件描述符的监听事件(通过 epoll_ctl
重新注册),否则即使文件描述符再次有事件发生,epoll
也不会通知。 应用场景:
在多线程环境下,如果多个线程同时处理一个文件描述符的可读或可写事件,可能会发生资源竞争。例如,如果有两个线程都在处理同一个 fd
的读事件,而其中一个线程已经读完数据,另一个线程会因为没有数据可读而发生错误。使用 EPOLLONESHOT
可以确保事件触发后只由一个线程处理,并且在处理完成之前不会再有其他线程接收到该事件。
示例:
假设有一个套接字 fd
,开启了 EPOLLONESHOT
,并且该套接字变得可读:
epoll_wait
返回该事件,某个线程开始处理这个套接字的数据。在处理完数据后,线程需要再次通过 epoll_ctl
重新将该套接字的读事件注册到 epoll
,否则这个套接字的后续可读事件不会再被通知。 总结:
EPOLLONESHOT
确保了同一个文件描述符的某个事件只会被触发并处理一次,防止多个线程重复处理同一事件,避免了竞态条件或数据丢失的情况。
void Utils::sig_handler(int sig){ int save_errno = errno; int msg = sig; send(u_pipefd[1], (char *)&msg, 1, 0); errno = save_errno;}
信号处理函数 sig_handler()
: 保存当前的 errno
,以避免信号处理过程中修改它。将捕获的信号通过管道发送给主循环,用于通知主循环有新的信号发生。恢复 errno
。 void Utils::addsig(int sig, void(handler)(int), bool restart){ struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; if (restart) sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1);}
设置信号 sig
的处理函数 handler
: 使用 sigaction()
来定义信号处理行为。如果 restart
为真,设置 SA_RESTART
,表示系统调用在被信号中断后自动重启。使用 sigfillset()
屏蔽所有信号在处理当前信号时的干扰。 3. thread_pool.cpp
使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。
同步I/O模拟proactor模式半同步/半反应堆线程池3.1 解释一下上述文字
3.1.1 线程池和主线程/工作线程的关系
在多线程编程中,主线程(一般负责接收外部请求、处理输入/输出)和工作线程(执行具体的任务)之间的关系决定了它们如何相互交互与协作。如果主线程和工作线程直接相互依赖,处理过程的效率可能会受到影响。下面解释这段话中的关键点:
3.1.2 主线程和工作线程的耦合关系
耦合:简单来说,耦合指的是两个模块之间的依赖程度。如果两个模块(比如主线程和工作线程)紧密相连,它们的修改会互相影响,系统的扩展性和维护性都会降低。耦合关系在传统模型中的体现: 如果没有工作队列的概念,主线程可能需要直接控制每个工作线程,并等待这些线程完成任务。这样,主线程和工作线程之间的依赖性就很强,即有耦合关系。在这种情况下,如果主线程需要同时处理多个任务,它就必须管理多个工作线程,跟踪它们的状态,这就导致了复杂性和依赖性增加。3.1.3 解除耦合关系的意义
使用工作队列可以将主线程和工作线程之间的依赖关系削弱,从而减少耦合性。这里的工作队列就像一个中介,主线程不再需要直接管理工作线程,而是只负责往队列里放任务,工作线程通过竞争机制从队列中取出任务并处理。通过这种模式,主线程和工作线程不再直接依赖对方的具体实现,这就称为解除了它们的耦合关系。
工作队列的优势:
主线程只需要关注将任务加入到队列,而不用关心哪个线程执行了任务。工作线程从队列中取出任务并处理,线程彼此之间通过任务队列解耦,不需要主线程参与调度。这种结构使得主线程和工作线程可以独立工作,提高了并发处理效率。3.1.4 同步I/O模拟Proactor模式
这里的同步I/O模拟Proactor模式是一种事件驱动的编程模式。Proactor模式是一种异步I/O模型,但在这里使用的是同步I/O操作来模拟Proactor。具体来说:
在主线程中,I/O操作是同步完成的(例如等待客户端的输入/输出),但是通过事件通知(即利用工作队列和线程池),这些I/O操作可以交由工作线程去处理,使得看起来像异步I/O(尽管实际是同步)。3.1.5 半同步/半反应堆
半同步:主线程和工作线程并发工作,主线程是同步的,负责接收请求并加入工作队列,而工作线程也是同步的,从队列中取任务执行。半反应堆:主线程负责处理I/O事件,接收到I/O事件后将任务交给工作线程处理,工作线程同步执行任务。这种设计是为了提高系统的并发性和反应速度。3.1.6 总结
通过使用工作队列和线程池,主线程和工作线程之间的耦合关系被解除了,主线程不再直接控制和管理工作线程。主线程只负责接收请求并放入队列,而工作线程负责从队列中取任务并执行,提升了系统的扩展性和并发处理能力。这种结构使得各个部分可以独立工作,减少了模块之间的相互依赖。
完整注释版:
#ifndef THREADPOOL_H#define THREADPOOL_H#include <list>#include <cstdio>#include <exception>#include <pthread.h>#include "../lock/locker.h" // 包含自定义的锁机制(互斥锁和信号量)#include "../CGImysql/sql_connection_pool.h" // 数据库连接池类,用于处理数据库连接// 定义一个线程池类,T是模板类型,表示任务的类型template <typename T>class threadpool{public: /* * 构造函数,初始化线程池 * actor_model 表示工作模式,connPool 是数据库连接池,thread_number 是线程池中的线程数, * max_request 是最大允许的请求队列长度 */ threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000); // 析构函数,释放线程池资源 ~threadpool(); // 将新的请求任务添加到队列中,附带状态 bool append(T *request, int state); // 将新的请求任务添加到队列中,不附带状态 bool append_p(T *request);private: /* 工作线程运行的函数,它会不断从工作队列中取任务执行 */ static void *worker(void *arg); // 实际处理任务的函数,从任务队列中取出任务并处理 void run();private: int m_thread_number; // 线程池中的线程数 int m_max_requests; // 请求队列中允许的最大请求数 pthread_t *m_threads; // 描述线程池的数组,其大小为 m_thread_number std::list<T *> m_workqueue; // 请求队列,用于存储需要处理的任务 locker m_queuelocker; // 保护请求队列的互斥锁,避免多线程同时访问时产生竞态条件 sem m_queuestat; // 信号量,表示是否有任务需要处理 connection_pool *m_connPool; // 数据库连接池,用于任务处理时的数据库操作 int m_actor_model; // 模型切换,表示不同的处理模式};// 构造函数,初始化线程池template <typename T>threadpool<T>::threadpool(int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool){ // 如果线程数或请求数不合法,抛出异常 if (thread_number <= 0 || max_requests <= 0) throw std::exception(); // 动态分配线程数组 m_threads = new pthread_t[m_thread_number]; if (!m_threads) throw std::exception(); // 循环创建线程,每个线程调用 worker 函数来执行任务 for (int i = 0; i < thread_number; ++i) { // 创建线程,worker 是线程的入口函数 if (pthread_create(m_threads + i, NULL, worker, this) != 0) { delete[] m_threads; // 创建线程失败,清理已分配的资源 throw std::exception(); } // 线程分离模式,线程结束后自动释放资源 if (pthread_detach(m_threads[i])) { delete[] m_threads; throw std::exception(); } }}// 析构函数,释放线程数组的资源template <typename T>threadpool<T>::~threadpool(){ delete[] m_threads;}// 将请求任务加入请求队列,并指定任务的状态(读或写)template <typename T>bool threadpool<T>::append(T *request, int state){ // 加锁,确保对队列的操作是线程安全的 m_queuelocker.lock(); // 如果请求队列已满,解锁并返回 false if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // 设置请求的状态 request->m_state = state; // 将请求添加到队列的末尾 m_workqueue.push_back(request); // 解锁 m_queuelocker.unlock(); // 通知有新任务要处理 m_queuestat.post(); return true;}// 将请求任务加入请求队列,不指定状态template <typename T>bool threadpool<T>::append_p(T *request){ // 加锁,确保对队列的操作是线程安全的 m_queuelocker.lock(); // 如果请求队列已满,解锁并返回 false if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } // 将请求添加到队列的末尾 m_workqueue.push_back(request); // 解锁 m_queuelocker.unlock(); // 通知有新任务要处理 m_queuestat.post(); return true;}// 线程入口函数,线程从任务队列中取任务处理template <typename T>void *threadpool<T>::worker(void *arg){ // 将传入的参数转换为线程池对象 threadpool *pool = (threadpool *)arg; // 调用线程池的 run 函数,执行任务 pool->run(); return pool;}// 处理任务的主函数,循环从请求队列中取任务并处理template <typename T>void threadpool<T>::run(){ while (true) { // 等待有任务到来,信号量阻塞线程 m_queuestat.wait(); // 加锁以安全访问请求队列 m_queuelocker.lock(); // 如果队列为空,解锁并继续等待 if (m_workqueue.empty()) { m_queuelocker.unlock(); continue; } // 从队列头取出一个请求 T *request = m_workqueue.front(); // 移除取出的请求 m_workqueue.pop_front(); // 解锁 m_queuelocker.unlock(); // 如果请求为空,继续处理下一个请求 if (!request) continue; // 根据不同的 actor 模型处理请求 if (1 == m_actor_model) { // 读操作 if (0 == request->m_state) { // 调用 read_once 尝试读取数据 if (request->read_once()) { // 数据读取成功,标记为需要进一步处理 request->improv = 1; // 使用数据库连接池处理数据库相关任务 connectionRAII mysqlcon(&request->mysql, m_connPool); // 处理请求 request->process(); } else { // 数据读取失败,设置定时器标志 request->improv = 1; request->timer_flag = 1; } } else { // 写操作 if (request->write()) { // 数据写入成功,标记为处理完成 request->improv = 1; } else { // 数据写入失败,设置定时器标志 request->improv = 1; request->timer_flag = 1; } } } else { // 如果使用另一种模型,不区分读写,直接处理 connectionRAII mysqlcon(&request->mysql, m_connPool); request->process(); } }}#endif
代码详细解析版:
3.2. 互斥锁(Mutex)
互斥锁(locker
)是一种同步机制,用于在多线程环境中保护共享资源(如任务队列)不被多个线程同时访问。它确保同一时刻只有一个线程可以访问某个资源,从而防止竞态条件(即多个线程同时读写共享数据,导致数据不一致或错误)。
在代码中的作用:
m_queuelocker.lock(); // 锁定互斥锁m_queuelocker.unlock(); // 解锁互斥锁
在这段代码中,m_queuelocker
是一个互斥锁对象。在对共享资源(如 m_workqueue
,即任务队列)进行操作时,加锁以确保只有一个线程可以同时访问队列,这样可以避免多个线程同时对队列进行读写操作而引发竞态条件。
3.3. 信号量(Semaphore)
信号量是一种控制多个线程访问共享资源的机制。与互斥锁不同的是,信号量可以用来控制线程的等待和唤醒。它常用于限制访问某个资源的线程数量,或者在某些条件满足时让线程开始执行任务。
在代码中的作用:
m_queuestat.wait(); // 信号量等待m_queuestat.post(); // 信号量通知
m_queuestat.wait()
表示当信号量的值为 0 时,线程会被阻塞,直到信号量大于 0。m_queuestat.post()
表示增加信号量的值,通知等待的线程有任务要处理,让线程继续运行。 在线程池的这段代码中,信号量 m_queuestat
是用来表示队列中是否有任务。如果信号量值为 0,线程就会等待;如果信号量值大于 0,线程会继续运行并从任务队列中取任务处理。
3.4. 线程分离模式
线程分离模式(Detached Thread)是一种线程管理方式。在这种模式下,线程结束后会自动释放自己的资源,而无需其他线程通过 pthread_join
来回收它的资源。这种方式减少了线程管理的开销。
在代码中的作用:
if (pthread_detach(m_threads[i])) { ... }
在这段代码中,每个线程创建后通过 pthread_detach
函数设为分离模式,这样当线程结束时,系统会自动释放它的资源,而不需要显式地调用 pthread_join
来等待和回收线程资源。
3.5. 信号量阻塞线程
针对的是这段代码:
while (true){ // 等待有任务到来,信号量阻塞线程 m_queuestat.wait(); // 加锁以安全访问请求队列 m_queuelocker.lock(); // 如果队列为空,解锁并继续等待 if (m_workqueue.empty()) { m_queuelocker.unlock(); continue; } // 其他处理逻辑}
信号量阻塞线程的意思:
当m_queuestat.wait()
执行时,信号量的值被检查。如果信号量的值为 0,表示没有任务可以处理,线程会进入等待状态,即被阻塞,直到有新任务被添加到队列(通过 m_queuestat.post()
增加信号量值),线程才会被唤醒并继续执行任务。如果有任务(即信号量的值大于 0),线程就可以从任务队列中取出任务并执行。 这段代码中的 m_queuestat.wait()
是为了防止线程在没有任务时空转。信号量为 0 时,线程会进入阻塞状态,节省系统资源,等到任务到来(m_queuestat.post()
调用),信号量增加,线程被唤醒。
3.6. root相关
这里主要是服务器的静态资源存放和html网页代码 不赘述
4. log.cpp
同步/异步日志系统主要涉及了两个模块,一个是日志模块,一个是阻塞队列模块,其中加入阻塞队列模块主要是解决异步写入日志做准备.
自定义阻塞队列单例模式创建日志同步日志异步日志实现按天、超行分类完整注释版:
#include <string.h> // 包含字符串处理函数,如 strcpy、strncpy 等#include <time.h> // 包含处理时间的库,如 time、localtime 等#include <sys/time.h> // 包含 gettimeofday 用来获取当前时间#include <stdarg.h> // 处理不定参数函数#include "log.h" // 包含 Log 类的定义#include <pthread.h> // 包含多线程处理函数using namespace std; // 使用标准命名空间// Log 类的构造函数,初始化日志类对象的成员变量Log::Log(){ m_count = 0; // 初始化日志条目计数器为 0 m_is_async = false; // 默认设置日志为同步模式}// Log 类的析构函数,负责清理资源Log::~Log(){ if (m_fp != NULL) // 如果日志文件指针不为空,则关闭文件 { fclose(m_fp); // 关闭日志文件 }}// 初始化日志系统的函数// file_name: 日志文件名,close_log: 是否关闭日志,log_buf_size: 日志缓冲区大小,split_lines: 最大行数,max_queue_size: 阻塞队列大小bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size){ // 如果设置了 max_queue_size,说明要使用异步日志 if (max_queue_size >= 1) { m_is_async = true; // 设置为异步模式 // 创建一个阻塞队列,队列的最大长度为 max_queue_size m_log_queue = new block_queue<string>(max_queue_size); pthread_t tid; // 定义一个线程 id // 创建一个新线程,用来异步写入日志,flush_log_thread 是回调函数 pthread_create(&tid, NULL, flush_log_thread, NULL); } m_close_log = close_log; // 记录是否关闭日志 m_log_buf_size = log_buf_size; // 设置日志缓冲区大小 m_buf = new char[m_log_buf_size]; // 创建一个日志缓冲区 memset(m_buf, '\0', m_log_buf_size); // 将缓冲区初始化为全 0 m_split_lines = split_lines; // 设置日志文件的最大行数 time_t t = time(NULL); // 获取当前系统时间 struct tm *sys_tm = localtime(&t); // 将系统时间转换为本地时间 struct tm my_tm = *sys_tm; // 复制一份时间结构体 // 获取日志文件名中最后一个 '/' 的位置,判断是否有路径 const char *p = strrchr(file_name, '/'); char log_full_name[256] = {0}; // 创建一个用于存放完整日志文件名的字符串 if (p == NULL) // 如果文件名中没有路径分隔符 '/' { // 直接用时间信息和文件名创建日志文件名 snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name); } else // 如果文件名中有路径 { strcpy(log_name, p + 1); // 提取文件名部分 strncpy(dir_name, file_name, p - file_name + 1); // 提取目录路径部分 // 使用路径和时间信息创建完整的日志文件名 snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name); } m_today = my_tm.tm_mday; // 记录当天日期 // 打开日志文件,追加模式,如果打开失败则返回 false m_fp = fopen(log_full_name, "a"); if (m_fp == NULL) { return false; // 如果文件打开失败,返回 false } return true; // 初始化成功}// 写入日志的函数// level: 日志级别,format: 格式化字符串,...: 可变参数void Log::write_log(int level, const char *format, ...){ struct timeval now = {0, 0}; // 定义时间结构体 gettimeofday(&now, NULL); // 获取当前时间,精确到微秒 time_t t = now.tv_sec; // 提取秒数 struct tm *sys_tm = localtime(&t); // 转换为本地时间 struct tm my_tm = *sys_tm; // 复制一份本地时间结构体 char s[16] = {0}; // 用于存储日志级别的字符串 // 根据日志级别设置相应的前缀 switch (level) { case 0: strcpy(s, "[debug]:"); // 调试级别 break; case 1: strcpy(s, "[info]:"); // 信息级别 break; case 2: strcpy(s, "[warn]:"); // 警告级别 break; case 3: strcpy(s, "[erro]:"); // 错误级别 break; default: strcpy(s, "[info]:"); // 默认级别 break; } m_mutex.lock(); // 加锁,防止多线程环境下的数据竞争 m_count++; // 日志计数器加 1 // 如果当前日期不是今天或者日志条数超过设定的最大行数 if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) { char new_log[256] = {0}; // 用于存储新的日志文件名 fflush(m_fp); // 刷新日志文件流,将缓冲区内容写入文件 fclose(m_fp); // 关闭当前日志文件 char tail[16] = {0}; // 用于存储时间后缀 // 创建新的日志文件名,包含日期 snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday); if (m_today != my_tm.tm_mday) // 如果日期改变 { snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name); // 按照日期重新创建日志文件 m_today = my_tm.tm_mday; // 更新当前日期 m_count = 0; // 重置日志计数 } else // 如果只是达到最大行数限制 { snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines); // 新建分割日志文件 } m_fp = fopen(new_log, "a"); // 打开新的日志文件 } m_mutex.unlock(); // 解锁 va_list valst; // 定义可变参数列表 va_start(valst, format); // 初始化可变参数列表 string log_str; // 用于存储最终的日志字符串 m_mutex.lock(); // 加锁 // 将日志时间、级别等信息格式化到缓冲区 int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s); // 将可变参数的日志内容格式化到缓冲区 int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst); m_buf[n + m] = '\n'; // 添加换行符 m_buf[n + m + 1] = '\0'; // 添加字符串结束符 log_str = m_buf; // 将缓冲区内容转换为字符串 m_mutex.unlock(); // 解锁 // 如果是异步模式且日志队列未满,则将日志推入队列 if (m_is_async && !m_log_queue->full()) { m_log_queue->push(log_str); } else // 如果是同步模式或者队列已满,直接写入文件 { m_mutex.lock(); // 加锁 fputs(log_str.c_str(), m_fp); // 将日志写入文件 m_mutex.unlock(); // 解锁 } va_end(valst); // 结束可变参数处理}// 刷新日志函数,强制将缓冲区内容写入日志文件void Log::flush(void){ m_mutex.lock(); // 加锁 fflush(m_fp); // 刷新文件流 m_mutex.unlock(); // 解锁}
代码详细解析版:
#include <string.h>#include <time.h>#include <sys/time.h>#include <stdarg.h>#include "log.h"#include <pthread.h>using namespace std;
引入标准库头文件:处理字符串操作、时间操作、可变参数列表和多线程相关功能。 Log::Log(){ m_count = 0; m_is_async = false;}
Log
类的构造函数,初始化日志计数器m_count
为0,并将m_is_async
设置为false
(表示默认是同步日志)。 Log::~Log(){ if (m_fp != NULL) { fclose(m_fp); }}
Log
类的析构函数,负责关闭日志文件指针m_fp
,防止内存泄漏。 bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size){ //如果设置了max_queue_size,则设置为异步 if (max_queue_size >= 1) { m_is_async = true; m_log_queue = new block_queue<string>(max_queue_size); pthread_t tid; // flush_log_thread为回调函数, 创建线程用于异步写日志 pthread_create(&tid, NULL, flush_log_thread, NULL); }
init()
函数初始化日志系统。 如果传入的 max_queue_size
大于等于 1,说明需要使用异步日志系统。此时创建一个阻塞队列 m_log_queue
来存储日志消息,并通过 pthread_create
创建一个新线程,用来异步处理日志写入(flush_log_thread
是回调函数,负责在后台写日志)。 m_close_log = close_log; m_log_buf_size = log_buf_size; m_buf = new char[m_log_buf_size]; memset(m_buf, '\0', m_log_buf_size); m_split_lines = split_lines;
初始化日志的基本参数: m_close_log
: 是否关闭日志m_log_buf_size
: 日志缓冲区大小m_buf
: 分配用于存储日志消息的缓冲区m_split_lines
: 指定每个日志文件的最大行数,当超过该行数时,创建一个新文件。 time_t t = time(NULL); struct tm *sys_tm = localtime(&t); struct tm my_tm = *sys_tm; const char *p = strrchr(file_name, '/'); char log_full_name[256] = {0};
获取当前系统时间 t
,并将其转换为本地时间 sys_tm
。查找文件名 file_name
中的最后一个斜杠 /
位置,确定文件名路径和文件名。 if (p == NULL) { snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name); } else { strcpy(log_name, p + 1); strncpy(dir_name, file_name, p - file_name + 1); snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name); }
根据当前时间和提供的文件名生成完整的日志文件名: 如果没有路径,则只使用文件名。如果有路径,则将路径和日期组合。 m_today = my_tm.tm_mday; m_fp = fopen(log_full_name, "a"); if (m_fp == NULL) { return false; } return true;}
打开生成的日志文件并将指针保存在 m_fp
中,"a"
模式表示追加写入。返回 true
表示初始化成功。 void Log::write_log(int level, const char *format, ...){ struct timeval now = {0, 0}; gettimeofday(&now, NULL); time_t t = now.tv_sec; struct tm *sys_tm = localtime(&t); struct tm my_tm = *sys_tm; char s[16] = {0}; switch (level) { case 0: strcpy(s, "[debug]:"); break; case 1: strcpy(s, "[info]:"); break; case 2: strcpy(s, "[warn]:"); break; case 3: strcpy(s, "[erro]:"); break; default: strcpy(s, "[info]:"); break; }
write_log()
方法用于写入日志,首先获取当前时间,并根据传入的日志级别 level
决定日志级别标签(如 [debug]
, [info]
, [warn]
, [erro]
等)。 m_mutex.lock(); m_count++;
日志写入计数 m_count
增加,并使用互斥锁 m_mutex
来防止多线程竞争条件。 if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) { char new_log[256] = {0}; fflush(m_fp); fclose(m_fp); char tail[16] = {0}; snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday); if (m_today != my_tm.tm_mday) { snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name); m_today = my_tm.tm_mday; m_count = 0; } else { snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines); } m_fp = fopen(new_log, "a"); } m_mutex.unlock();
如果当前日期 m_today
与日志写入时间不同,或者日志行数超过最大行数 m_split_lines
,则生成新日志文件,关闭旧文件并打开新文件。 va_list valst; va_start(valst, format); string log_str; m_mutex.lock();
开始处理可变参数日志消息,va_list
获取可变参数 format
。 int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s); int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst); m_buf[n + m] = '\n'; m_buf[n + m + 1] = '\0'; log_str = m_buf; m_mutex.unlock();
将日志的时间、日志级别和可变参数内容格式化并写入缓冲区 m_buf
,然后形成完整的日志字符串 log_str
。 if (m_is_async && !m_log_queue->full()) { m_log_queue->push(log_str); } else { m_mutex.lock(); fputs(log_str.c_str(), m_fp); m_mutex.unlock(); } va_end(valst);}
如果是异步日志且日志队列未满,将日志推送到队列 m_log_queue
。如果是同步日志,直接写入日志文件。 void Log::flush(void){ m_mutex.lock(); //强制刷新写入流缓冲区 fflush(m_fp); m_mutex.unlock();}
flush()
函数用于强制刷新文件缓冲区,将日志立即写入磁盘。 5. block_queue.cpp
完整注释版:
/**************************************************************循环数组实现的阻塞队列,m_back = (m_back + 1) % m_max_size; *线程安全,每个操作前都要先加互斥锁,操作完后,再解锁**************************************************************/#ifndef BLOCK_QUEUE_H#define BLOCK_QUEUE_H#include <iostream>#include <stdlib.h>#include <pthread.h>#include <sys/time.h>#include "../lock/locker.h" // 包含自定义的互斥锁类using namespace std;template <class T>class block_queue{public: // 构造函数,初始化循环队列 block_queue(int max_size = 1000) { if (max_size <= 0) // 若传入最大大小无效,程序退出 { exit(-1); } m_max_size = max_size; // 设置队列最大长度 m_array = new T[max_size]; // 动态分配内存创建队列 m_size = 0; // 队列初始大小为0 m_front = -1; // 初始化队头指针 m_back = -1; // 初始化队尾指针 } // 清空队列的函数 void clear() { m_mutex.lock(); // 加锁,保证线程安全 m_size = 0; // 重置队列大小 m_front = -1; // 重置队头指针 m_back = -1; // 重置队尾指针 m_mutex.unlock(); // 解锁 } // 析构函数,释放动态分配的内存 ~block_queue() { m_mutex.lock(); if (m_array != NULL) delete[] m_array; // 释放队列内存 m_mutex.unlock(); } // 判断队列是否已满 bool full() { m_mutex.lock(); // 加锁以确保线程安全 if (m_size >= m_max_size) // 如果队列大小超过或等于最大值,表示已满 { m_mutex.unlock(); // 解锁 return true; } m_mutex.unlock(); return false; // 否则队列未满 } // 判断队列是否为空 bool empty() { m_mutex.lock(); // 加锁 if (0 == m_size) // 如果队列大小为0,表示为空 { m_mutex.unlock(); // 解锁 return true; } m_mutex.unlock(); return false; } // 获取队首元素(不删除),通过引用返回 bool front(T &value) { m_mutex.lock(); if (0 == m_size) // 若队列为空,返回false { m_mutex.unlock(); return false; } value = m_array[m_front]; // 将队首元素赋值给value m_mutex.unlock(); return true; } // 获取队尾元素(不删除) bool back(T &value) { m_mutex.lock(); if (0 == m_size) // 若队列为空,返回false { m_mutex.unlock(); return false; } value = m_array[m_back]; // 将队尾元素赋值给value m_mutex.unlock(); return true; } // 获取当前队列大小 int size() { int tmp = 0; m_mutex.lock(); tmp = m_size; // 获取队列当前大小 m_mutex.unlock(); return tmp; } // 获取队列的最大容量 int max_size() { int tmp = 0; m_mutex.lock(); tmp = m_max_size; // 获取队列最大容量 m_mutex.unlock(); return tmp; } // 向队列中添加元素 bool push(const T &item) { m_mutex.lock(); if (m_size >= m_max_size) // 如果队列已满,广播并返回false { m_cond.broadcast(); m_mutex.unlock(); return false; } // 计算队尾位置,循环数组 m_back = (m_back + 1) % m_max_size; m_array[m_back] = item; // 将元素加入队尾 m_size++; // 队列大小加1 m_cond.broadcast(); // 唤醒等待条件的线程 m_mutex.unlock(); return true; } // 从队列中取出元素,若为空则等待 bool pop(T &item) { m_mutex.lock(); while (m_size <= 0) // 如果队列为空,等待条件变量 { if (!m_cond.wait(m_mutex.get())) // 等待信号量,线程阻塞 { m_mutex.unlock(); return false; // 超时或其他情况返回false } } // 计算队首位置,循环数组 m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; // 将队首元素赋值给item m_size--; // 队列大小减1 m_mutex.unlock(); return true; } // 带超时功能的pop操作 bool pop(T &item, int ms_timeout) { struct timespec t = {0, 0}; struct timeval now = {0, 0}; gettimeofday(&now, NULL); // 获取当前时间 m_mutex.lock(); if (m_size <= 0) { t.tv_sec = now.tv_sec + ms_timeout / 1000; // 设置超时时间秒部分 t.tv_nsec = (ms_timeout % 1000) * 1000; // 设置超时时间毫秒部分 if (!m_cond.timewait(m_mutex.get(), t)) // 等待条件变量超时 { m_mutex.unlock(); return false; } } if (m_size <= 0) // 如果队列仍为空,返回false { m_mutex.unlock(); return false; } m_front = (m_front + 1) % m_max_size; // 计算队首位置 item = m_array[m_front]; // 将队首元素赋值给item m_size--; // 队列大小减1 m_mutex.unlock(); return true; }private: locker m_mutex; // 自定义的互斥锁,确保线程安全 cond m_cond; // 条件变量,用于线程同步 T *m_array; // 队列的数组指针 int m_size; // 当前队列大小 int m_max_size; // 队列最大容量 int m_front; // 队首指针 int m_back; // 队尾指针};#endif
代码详细解析版:
5.1 用循环队列实现异步日志系统的解释:
循环队列的实现:
这段代码实现了一个循环队列(环形缓冲区),即当队列到达末尾时,它会循环回到起点继续存储数据。因此,m_back
和 m_front
通过 % m_max_size
来实现循环。当 m_size
达到最大值时,队列被认为已满。
线程安全:
为了保证多线程环境下的安全性,每次对队列的操作都会加锁和解锁(m_mutex.lock()
和 m_mutex.unlock()
)。条件变量(m_cond
)用于在队列为空或已满时阻塞操作,使线程等待或者唤醒。
异步日志系统中的作用:
循环队列用于缓冲日志信息。在异步日志系统中,主线程可以快速将日志消息写入队列,而专门的日志线程则从队列中异步取出日志并写入日志文件。
这样,主线程无需等待日志写入完成,提高了系统的响应速度。
为什么使用循环队列:
为了更好地理解为什么要使用循环队列(Circular Queue)以及它在异步日志系统中的优势,下面我将详细解释其工作原理,和普通队列进行对比,并深入分析其设计带来的好处。
5.1.1 普通队列的缺点
普通队列(如链式队列或顺序队列)存在几个问题,在特定情况下可能导致性能下降或不适合高效的异步日志处理场景:
链式队列:
动态内存分配:链式队列需要不断地动态分配内存(malloc
或 new
),在高频率插入和删除操作中,这种动态分配和释放会产生开销,导致内存碎片问题,影响性能。指针操作复杂:链式队列的每个节点都需要维护指针,操作相对较复杂,内存消耗更高,可能降低性能。 顺序队列:
空间不连续利用:顺序队列使用线性数组实现,插入操作会使队列从队尾向前推进,而删除操作会从队头开始,删除后的空间不能重新利用,导致**“假溢出”问题**,即使队列还有空间,队列看起来也满了,需要频繁移动元素来释放空间。5.1.2 循环队列的优势
循环队列是一种优化设计,适用于高效处理高频率的插入和删除操作,尤其在异步日志系统等场景中非常有用。以下是循环队列的具体优点:
高效利用内存:
循环队列使用固定大小的数组(在代码中,m_array = new T[max_size];
),在初始化时一次性分配,避免了动态内存分配和释放。队列中的空间通过循环方式重复利用。当队列满了,指针从队尾回到队首,继续插入新数据。这意味着即使队列的指针不断前进,也不会浪费空间。对比顺序队列,不需要移动元素来释放空间,避免了顺序队列中的“假溢出”问题。 无锁高效性(尽量减少锁开销):
锁(m_mutex.lock()
和 m_mutex.unlock()
)用于确保在多线程环境中的线程安全,但锁会带来一定的开销。循环队列可以尽量避免复杂的数据结构操作,只需要在队头和队尾进行加减法和取模运算(% m_max_size
),使得边界处理简单高效,插入和删除操作的开销较低。在绝大多数操作中,只需要通过简单的加减法调整指针位置,不需要复杂的指针操作或内存分配。 异步处理,避免主线程阻塞:
在异步日志系统中,主线程(如服务器的请求处理线程)可能会频繁地产生日志消息。如果采用同步日志写入方式(主线程直接写入日志文件),每次写入都可能造成主线程的阻塞,降低系统性能。使用循环队列可以让主线程在生成日志时快速将日志写入队列,而不是等待日志写入完成。日志写入队列后,专门的日志处理线程从队列中读取日志并写入文件,这种生产者-消费者模式有效提高了系统吞吐量,避免了主线程的阻塞。5.1.3. 与普通队列的具体区别
内存管理:
普通链式队列:每次插入删除都需要动态分配/释放内存,导致频繁的内存操作和内存碎片,影响系统性能。循环队列:只在初始化时分配内存,并通过固定数组重复利用空间,避免频繁的内存操作和碎片问题。边界处理:
普通顺序队列:顺序队列的前端删除数据后,前端的空间无法再次使用,除非移动数据,这带来了额外的开销。循环队列:指针到达末尾时会回绕到数组的开头,从而避免假溢出,提高空间利用率。插入和删除效率:
普通队列:链式队列插入和删除元素时需要管理指针,操作相对复杂。顺序队列则可能需要移动数据以维护队列。循环队列:插入和删除操作只涉及简单的指针加减和取模操作,性能更高且更稳定。5.1.4. 在异步日志系统中的具体应用
循环队列在异步日志系统中的应用场景: 主线程生成日志消息,将日志条目插入循环队列。这是一个快速的操作,能够避免主线程被阻塞。一个独立的日志线程持续从队列中取出日志并将其写入日志文件。如果队列为空,日志线程可以等待队列填充。异步日志系统的关键点:队列实现了生产者-消费者模式,主线程和日志线程之间通过队列解耦,这样主线程可以继续处理其他任务,而不用等待日志写入完成。5.1.5 总结:
循环队列由于其固定内存占用、高效的插入删除操作和循环利用数组空间的特点,特别适合用于异步日志系统。在高并发环境下,它能够显著提高系统性能,避免主线程被日志操作阻塞。而普通队列则可能带来不必要的内存管理开销和效率低下的问题。因此,循环队列是处理高频日志写入等场景的更优选择。
5.2 带超时功能的 pop
5.2.1. pop
方法的含义
pop
是一种从队列中取出(移除)元素的操作。在一个循环队列中,pop
操作通常用于消费者线程来从队列中获取数据。这是异步日志系统中的一个关键功能,因为日志记录通常需要在多个线程之间高效地传递和处理数据。
5.2.2. 带超时功能的 pop
方法
带超时功能的 pop
方法是在等待从队列中取出元素时设定一个时间限制(超时时间)。如果在设定的时间内没有数据可取,那么 pop
方法会超时并返回 false
,表示取数据失败。这种机制避免了消费者线程无限期等待的问题,从而提高系统的响应性和健壮性。
5.2.3. 为什么要实现超时功能
避免死锁:在多线程环境中,如果消费者线程无限期地等待一个永远不会发生的事件(如没有生产者放入数据),会导致死锁情况。超时机制允许消费者线程在指定时间内没有取到数据时退出等待,从而继续其他操作。提高系统响应性:在系统资源紧张或生产者线程无法及时生产数据时,带超时功能的pop
能让消费者线程在合理的时间窗口内恢复运行,而不是卡住。防止资源耗尽:系统中可能有多个消费者线程等待队列中的数据,如果所有线程无限制地阻塞等待,会导致资源(如线程池资源)被耗尽。超时机制可以避免这种情况。 5.2.4. 代码中的实现方式
时间获取与计算:
struct timespec t = {0, 0};struct timeval now = {0, 0};gettimeofday(&now, NULL); // 获取当前时间
gettimeofday
函数获取当前的时间,这个时间将用于计算等待的截止时间。
设置超时时间:
t.tv_sec = now.tv_sec + ms_timeout / 1000; // 设置超时时间秒部分t.tv_nsec = (ms_timeout % 1000) * 1000; // 设置超时时间毫秒部分
根据用户传入的超时参数 ms_timeout
(以毫秒为单位),计算出未来的一个时间点 t
,该时间点是当前时间加上超时时间。
等待条件变量:
if (!m_cond.timewait(m_mutex.get(), t)) // 等待条件变量超时{ m_mutex.unlock(); return false;}
调用 m_cond.timewait(m_mutex.get(), t)
,这是一个带有超时的条件等待操作。线程会在 m_cond
条件变量上等待,直到队列中有数据可取,或者超过指定的时间点 t
。
检查并取出元素:
如果在超时时间内队列变得不为空,线程会继续执行以下代码:
m_front = (m_front + 1) % m_max_size; // 计算队首位置item = m_array[m_front]; // 将队首元素赋值给itemm_size--; // 队列大小减1
更新队首指针,取出队首元素,将其赋值给 item
,并减少队列大小。
5.2.5. 总结
带超时的 pop
方法通过在取数据的过程中设置时间限制,确保了系统的高效性和稳定性,使消费者线程不会因为数据缺失而长期阻塞。这在异步日志系统中尤其重要,因为它确保了日志记录和其他后台任务的流畅运行,避免了线程饥饿和资源浪费的情况。
6. log.h
代码详细解析版:
文件保护机制
#ifndef LOCKER_H#define LOCKER_H
这两行用于防止头文件的重复包含。#ifndef LOCKER_H
检查 LOCKER_H
是否已经定义,如果没有,则定义它。这防止头文件被多次包含,导致编译错误。
信号量类
#include <exception>#include <pthread.h>#include <semaphore.h>
这些头文件包含了异常处理、POSIX 线程库(用于互斥锁和条件变量)以及信号量(semaphore
)的相关功能。
class sem{public: sem() { if (sem_init(&m_sem, 0, 0) != 0) { throw std::exception(); } }
这是信号量类的默认构造函数。sem_init(&m_sem, 0, 0)
初始化一个值为 0 的信号量 m_sem
。第二个参数 0
表示这是一个用于线程间的信号量。如果初始化失败,抛出异常。
sem(int num) { if (sem_init(&m_sem, 0, num) != 0) { throw std::exception(); } }
这是一个重载的构造函数,允许你通过 num
来初始化信号量的初始值。仍然是线程间信号量,若初始化失败,抛出异常。
~sem() { sem_destroy(&m_sem); }
这是析构函数,销毁信号量 m_sem
,释放资源。
bool wait() { return sem_wait(&m_sem) == 0; }
wait()
是阻塞操作,减少信号量的值。如果信号量值为 0,调用线程会被阻塞,直到其他线程调用 post()
增加信号量的值。成功时返回 true
,否则返回 false
。
bool post() { return sem_post(&m_sem) == 0; }
post()
增加信号量的值,唤醒等待的线程。成功时返回 true
。
private: sem_t m_sem;};
m_sem
是一个类型为 sem_t
的信号量,信号量实现了线程间同步。
互斥锁类
class locker{public: locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } }
这是互斥锁类的构造函数。pthread_mutex_init(&m_mutex, NULL)
初始化一个默认属性的互斥锁 m_mutex
。如果初始化失败,则抛出异常。
~locker() { pthread_mutex_destroy(&m_mutex); }
析构函数销毁互斥锁,释放资源。
bool lock() { return pthread_mutex_lock(&m_mutex) == 0; }
lock()
是上锁操作,如果互斥锁已经被其他线程锁住,调用线程将被阻塞,直到锁可以被获取。成功时返回 true
。
bool unlock() { return pthread_mutex_unlock(&m_mutex) == 0; }
unlock()
是解锁操作,释放锁。成功时返回 true
。
pthread_mutex_t *get() { return &m_mutex; }private: pthread_mutex_t m_mutex;};
get()
函数返回互斥锁的指针 &m_mutex
,可能在其他类或函数中需要传递这个锁。m_mutex
是一个 POSIX 的 pthread_mutex_t
类型互斥锁。
条件变量类
class cond{public: cond() { if (pthread_cond_init(&m_cond, NULL) != 0) { throw std::exception(); } }
条件变量类的构造函数。pthread_cond_init(&m_cond, NULL)
初始化条件变量 m_cond
,用于线程同步。如果初始化失败,抛出异常。
~cond() { pthread_cond_destroy(&m_cond); }
析构函数销毁条件变量,释放资源。
bool wait(pthread_mutex_t *m_mutex) { int ret = 0; ret = pthread_cond_wait(&m_cond, m_mutex); return ret == 0; }
wait()
用于等待条件变量满足。它要求持有互斥锁 m_mutex
。当条件不满足时,调用线程进入等待状态,并且释放锁。条件满足时,线程被唤醒,重新获得锁。返回 true
表示等待成功。
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) { int ret = 0; ret = pthread_cond_timedwait(&m_cond, m_mutex, &t); return ret == 0; }
timewait()
是带超时的等待,指定时间 t
,如果条件在时间内未满足,返回 false
。
bool signal() { return pthread_cond_signal(&m_cond) == 0; }
signal()
唤醒等待该条件变量的一个线程。成功时返回 true
。
bool broadcast() { return pthread_cond_broadcast(&m_cond) == 0; }
broadcast()
唤醒等待该条件变量的所有线程。成功时返回 true
。
private: pthread_cond_t m_cond;};#endif
条件变量 m_cond
是 POSIX 的 pthread_cond_t
类型,用于线程之间的同步等待。
7. http_conn.cpp
完整注释版:
#include "http_conn.h"#include <mysql/mysql.h>#include <fstream>// 定义HTTP响应的一些状态信息,用于不同HTTP请求返回的状态码和描述const char *ok_200_title = "OK";const char *error_400_title = "Bad Request"; // 客户端请求有语法错误,服务器无法处理const char *error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";const char *error_403_title = "Forbidden"; // 客户端没有访问权限const char *error_403_form = "You do not have permission to get file from this server.\n";const char *error_404_title = "Not Found"; // 请求的资源不存在const char *error_404_form = "The requested file was not found on this server.\n";const char *error_500_title = "Internal Error"; // 服务器内部错误const char *error_500_form = "There was an unusual problem serving the request file.\n";// 用于线程安全操作的锁locker m_lock;// 存储用户名和密码的映射,用于验证登录和注册map<string, string> users;// 初始化数据库结果,将数据库中的用户信息读取到内存中void http_conn::initmysql_result(connection_pool *connPool){ // 从连接池中取出一个MYSQL连接 MYSQL *mysql = NULL; connectionRAII mysqlcon(&mysql, connPool); // 查询user表中的username和passwd字段,获取用户信息 if (mysql_query(mysql, "SELECT username,passwd FROM user")) { LOG_ERROR("SELECT error:%s\n", mysql_error(mysql)); } // 获取查询结果 MYSQL_RES *result = mysql_store_result(mysql); // 获取结果集中字段的数量 int num_fields = mysql_num_fields(result); // 获取所有字段的信息 MYSQL_FIELD *fields = mysql_fetch_fields(result); // 遍历结果集的每一行,将用户名和密码存入map中 while (MYSQL_ROW row = mysql_fetch_row(result)) { string temp1(row[0]); // 用户名 string temp2(row[1]); // 密码 users[temp1] = temp2; // 存入map }}// 设置文件描述符为非阻塞模式int setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); // 获取当前的文件描述符状态标志 int new_option = old_option | O_NONBLOCK; // 添加非阻塞标志 fcntl(fd, F_SETFL, new_option); // 设置新的文件描述符状态 return old_option; // 返回旧的文件描述符状态}// 向内核事件表注册读事件,并设置触发模式为ET或LT模式,同时选择是否启用EPOLLONESHOT模式void addfd(int epollfd, int fd, bool one_shot, int TRIGMode){ epoll_event event; event.data.fd = fd; // 绑定fd if (1 == TRIGMode) // ET模式下,添加EPOLLET标志 event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; // EPOLLRDHUP表示对端关闭连接 else event.events = EPOLLIN | EPOLLRDHUP; // LT模式下 if (one_shot) // 是否启用EPOLLONESHOT模式,防止同一个socket被多个线程处理 event.events |= EPOLLONESHOT; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); // 向epoll实例中注册事件 setnonblocking(fd); // 设置非阻塞}// 从内核事件表中删除文件描述符void removefd(int epollfd, int fd){ epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0); // 删除指定文件描述符的事件 close(fd); // 关闭文件描述符}// 修改文件描述符上的注册事件,重置EPOLLONESHOTvoid modfd(int epollfd, int fd, int ev, int TRIGMode){ epoll_event event; event.data.fd = fd; if (1 == TRIGMode) // ET模式 event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP; else // LT模式 event.events = ev | EPOLLONESHOT | EPOLLRDHUP; epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); // 修改事件}int http_conn::m_user_count = 0; // 用户数量初始化为0int http_conn::m_epollfd = -1; // epoll实例文件描述符初始化为-1// 关闭连接,减少用户计数void http_conn::close_conn(bool real_close){ if (real_close && (m_sockfd != -1)) // 只有当real_close为true并且socket存在时才关闭连接 { printf("close %d\n", m_sockfd); removefd(m_epollfd, m_sockfd); // 从epoll中移除该文件描述符 m_sockfd = -1; // 重置socket描述符 m_user_count--; // 用户总量减1 }}// 初始化连接的相关信息void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, int TRIGMode, int close_log, string user, string passwd, string sqlname){ m_sockfd = sockfd; // 保存传入的socket文件描述符 m_address = addr; // 保存客户端的地址信息 addfd(m_epollfd, sockfd, true, m_TRIGMode); // 注册epoll事件,并且启用EPOLLONESHOT模式 m_user_count++; // 新增一个用户 // 初始化一些相关参数,如网站根目录、触发模式、是否关闭日志等 doc_root = root; m_TRIGMode = TRIGMode; m_close_log = close_log; strcpy(sql_user, user.c_str()); // 保存数据库用户名 strcpy(sql_passwd, passwd.c_str()); // 保存数据库密码 strcpy(sql_name, sqlname.c_str()); // 保存数据库名 init(); // 调用init()函数初始化其他成员变量}// 初始化连接的一些内部状态void http_conn::init(){ mysql = NULL; bytes_to_send = 0; bytes_have_send = 0; m_check_state = CHECK_STATE_REQUESTLINE; // 初始状态为解析请求行 m_linger = false; m_method = GET; // 默认请求方法为GET m_url = 0; m_version = 0; m_content_length = 0; m_host = 0; m_start_line = 0; m_checked_idx = 0; m_read_idx = 0; m_write_idx = 0; cgi = 0; // CGI标志初始化为0 m_state = 0; timer_flag = 0; improv = 0; // 初始化读写缓冲区 memset(m_read_buf, '\0', READ_BUFFER_SIZE); memset(m_write_buf, '\0', WRITE_BUFFER_SIZE); memset(m_real_file, '\0', FILENAME_LEN);}// 从状态机,用于逐行解析读取到的数据http_conn::LINE_STATUS http_conn::parse_line(){ char temp; // 遍历缓冲区,从m_checked_idx位置开始逐字符检查 for (; m_checked_idx < m_read_idx; ++m_checked_idx) { temp = m_read_buf[m_checked_idx]; // 如果当前字符为回车符 if (temp == '\r') { // 如果回车符是缓冲区最后一个字符,则表示还没有完整的一行,返回LINE_OPEN if ((m_checked_idx + 1) == m_read_idx) return LINE_OPEN; // 如果回车符后面是换行符,说明读取到了一行完整的请求 else if (m_read_buf[m_checked_idx + 1] == '\n') { m_read_buf[m_checked_idx++] = '\0'; // 将回车符替换为字符串结束符 m_read_buf[m_checked_idx++] = '\0'; // 将换行符替换为字符串结束符 return LINE_OK; // 返回LINE_OK,表示读取到了一行 } return LINE_BAD; // 如果不是换行符,说明请求行格式错误 } // 如果当前字符是换行符 else if (temp == '\n') { // 检查前一个字符是否为回车符 if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r') { m_read_buf[m_checked_idx - 1] = '\0'; // 将回车符替换为字符串结束符 m_read_buf[m_checked_idx++] = '\0'; // 将换行符替换为字符串结束符 return LINE_OK; } return LINE_BAD; // 如果前一个字符不是回车符,则返回LINE_BAD } } return LINE_OPEN; // 如果没有遇到回车换行,表示行不完整,返回LINE_OPEN}// 循环读取客户端数据,直到无数据可读或对方关闭连接bool http_conn::read_once(){ if (m_read_idx >= READ_BUFFER_SIZE) // 如果读缓冲区已满,则返回false { return false; } int bytes_read = 0; // LT模式读取数据 if (0 == m_TRIGMode) { // 从socket中读取数据,存储到读缓冲区 bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0); m_read_idx += bytes_read; if (bytes_read <= 0) // 如果读取到的数据为空或发生错误 { return false; } return true; } // ET模式读取数据,需要循环读取,直到没有数据可读 else { while (true) { // 尝试读取数据 bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0); if (bytes_read == -1) // 出现错误 { // 如果错误是EAGAIN或者EWOULDBLOCK,表示数据已经全部读取完毕 if (errno == EAGAIN || errno == EWOULDBLOCK) break; return false; // 否则,发生了其他错误 } else if (bytes_read == 0) // 对方关闭了连接 { return false; } m_read_idx += bytes_read; // 更新读索引 } return true; }}// 解析HTTP请求行,获取请求方法、目标URL及HTTP版本号http_conn::HTTP_CODE http_conn::parse_request_line(char *text){ m_url = strpbrk(text, " \t"); // 查找请求行中的第一个空格或制表符,后面是URL if (!m_url) { return BAD_REQUEST; // 如果没有找到空格或制表符,说明请求行格式错误 } *m_url++ = '\0'; // 将空格或制表符替换为字符串结束符,分离出请求方法 char *method = text; // 获取请求方法 if (strcasecmp(method, "GET") == 0) // 比较请求方法是否为GET m_method = GET; else if (strcasecmp(method, "POST") == 0) // 比较请求方法是否为POST { m_method = POST; cgi = 1; // 如果是POST方法,开启CGI处理 } else return BAD_REQUEST; // 如果不是GET或POST,返回BAD_REQUEST m_url += strspn(m_url, " \t"); // 跳过URL前的空格或制表符 m_version = strpbrk(m_url, " \t"); // 查找URL后的空格或制表符,后面是HTTP版本 if (!m_version) return BAD_REQUEST; // 如果没有找到,返回BAD_REQUEST *m_version++ = '\0'; // 将空格或制表符替换为字符串结束符,分离出URL m_version += strspn(m_version, " \t"); // 跳过版本号前的空格或制表符 if (strcasecmp(m_version, "HTTP/1.1") != 0) // 检查是否为HTTP/1.1 return BAD_REQUEST; // 如果URL是以"http://"或"https://"开头,跳过协议部分 if (strncasecmp(m_url, "http://", 7) == 0) { m_url += 7; m_url = strchr(m_url, '/'); // 查找URL路径部分 } if (strncasecmp(m_url, "https://", 8) == 0) { m_url += 8; m_url = strchr(m_url, '/'); } if (!m_url || m_url[0] != '/') // 如果URL无效或不以'/'开头,返回BAD_REQUEST return BAD_REQUEST; if (strlen(m_url) == 1) // 如果URL为"/",显示默认页面 strcat(m_url, "judge.html"); m_check_state = CHECK_STATE_HEADER; // 切换状态到解析请求头 return NO_REQUEST;}// 解析HTTP请求的头部信息http_conn::HTTP_CODE http_conn::parse_headers(char *text){ if (text[0] == '\0') // 如果当前头部信息为空,表示解析完毕 { if (m_content_length != 0) // 如果有消息体,切换到解析消息体的状态 { m_check_state = CHECK_STATE_CONTENT; return NO_REQUEST; } return GET_REQUEST; // 如果没有消息体,说明请求已完整,返回GET_REQUEST } // 解析Connection头部,判断是否为长连接 else if (strncasecmp(text, "Connection:", 11) == 0) { text += 11; text += strspn(text, " \t"); if (strcasecmp(text, "keep-alive") == 0) { m_linger = true; // 如果是keep-alive,保持长连接 } } // 解析Content-Length头部,获取消息体的长度 else if (strncasecmp(text, "Content-length:", 15) == 0) { text += 15; text += strspn(text, " \t"); m_content_length = atol(text); // 将字符串转换为长整型,表示消息体长度 } // 解析Host头部,获取主机名 else if (strncasecmp(text, "Host:", 5) == 0) { text += 5; text += strspn(text, " \t"); m_host = text; // 保存主机名 } else { LOG_INFO("oop!unknow header: %s", text); // 记录未知的头部字段 } return NO_REQUEST; // 继续解析其他头部}// 解析HTTP请求的消息体http_conn::HTTP_CODE http_conn::parse_content(char *text){ if (m_read_idx >= (m_content_length + m_checked_idx)) // 检查是否完整读取了消息体 { text[m_content_length] = '\0'; // 将消息体以\0结束 m_string = text; // 将消息体存储起来,通常是POST请求的参数 return GET_REQUEST; // 消息体解析完成,返回GET_REQUEST } return NO_REQUEST; // 消息体还未解析完整,继续读取}// 主状态机处理入口,依次调用解析请求行、请求头、消息体的函数http_conn::HTTP_CODE http_conn::process_read(){ LINE_STATUS line_status = LINE_OK; // 当前行的解析状态 HTTP_CODE ret = NO_REQUEST; // HTTP请求的解析结果 char *text = 0; // 循环解析HTTP请求,直到完整解析或遇到错误 while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)) { text = get_line(); // 获取解析到的一行数据 m_start_line = m_checked_idx; // 更新已解析的起始位置 LOG_INFO("%s", text); // 记录解析到的内容 switch (m_check_state) // 根据当前解析状态,处理不同部分 { case CHECK_STATE_REQUESTLINE: // 解析请求行 { ret = parse_request_line(text); // 调用parse_request_line()函数解析 if (ret == BAD_REQUEST) // 如果解析失败,返回错误 return BAD_REQUEST; break; } case CHECK_STATE_HEADER: // 解析请求头 { ret = parse_headers(text); // 调用parse_headers()函数解析 if (ret == BAD_REQUEST) // 如果解析失败,返回错误 return BAD_REQUEST; else if (ret == GET_REQUEST) // 如果请求完整,执行do_request() { return do_request(); } break; } case CHECK_STATE_CONTENT: // 解析消息体 { ret = parse_content(text); // 调用parse_content()函数解析 if (ret == GET_REQUEST) // 如果解析成功,执行do_request() return do_request(); line_status = LINE_OPEN; // 如果消息体不完整,继续等待数据 break; } default: return INTERNAL_ERROR; // 发生未知错误,返回服务器内部错误 } } return NO_REQUEST; // 如果还未解析完成,返回NO_REQUEST}// 处理HTTP请求,生成相应的响应http_conn::HTTP_CODE http_conn::do_request(){ strcpy(m_real_file, doc_root); // 将网站根目录复制到m_real_file中 int len = strlen(doc_root); // 获取根目录路径的长度 const char *p = strrchr(m_url, '/'); // 查找请求的最后一个'/',区分不同的URL // 如果是POST请求,且URL是登录或注册请求 if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3')) { // 根据请求的类型判断是登录还是注册 char flag = m_url[1]; char *m_url_real = (char *)malloc(sizeof(char) * 200); // 动态分配内存 strcpy(m_url_real, "/"); strcat(m_url_real, m_url + 2); // 构造实际文件路径 strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1); // 拼接完整路径 free(m_url_real); // 释放动态内存 // 解析POST请求的用户名和密码 char name[100], password[100]; int i; for (i = 5; m_string[i] != '&'; ++i) name[i - 5] = m_string[i]; // 提取用户名 name[i - 5] = '\0'; int j = 0; for (i = i + 10; m_string[i] != '\0'; ++i, ++j) password[j] = m_string[i]; // 提取密码 password[j] = '\0'; // 如果是注册请求 if (*(p + 1) == '3') { // 检查数据库中是否存在同名用户 char *sql_insert = (char *)malloc(sizeof(char) * 200); strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES("); strcat(sql_insert, "'"); strcat(sql_insert, name); strcat(sql_insert, "', '"); strcat(sql_insert, password); strcat(sql_insert, "')"); if (users.find(name) == users.end()) // 如果用户不存在,插入新用户 { m_lock.lock(); // 加锁,防止并发修改 int res = mysql_query(mysql, sql_insert); // 执行插入语句 users.insert(pair<string, string>(name, password)); // 更新内存中的用户表 m_lock.unlock(); // 解锁 if (!res) strcpy(m_url, "/log.html"); // 注册成功,跳转到登录页面 else strcpy(m_url, "/registerError.html"); // 注册失败,跳转到错误页面 } else strcpy(m_url, "/registerError.html"); // 用户已存在,返回错误页面 } // 如果是登录请求 else if (*(p + 1) == '2') { // 检查用户名和密码是否匹配 if (users.find(name) != users.end() && users[name] == password) strcpy(m_url, "/welcome.html"); // 登录成功,跳转到欢迎页面 else strcpy(m_url, "/logError.html"); // 登录失败,跳转到错误页面 } } // 根据URL后缀处理不同的页面请求 if (*(p + 1) == '0') { char *m_url_real = (char *)malloc(sizeof(char) * 200); strcpy(m_url_real, "/register.html"); // 注册页面 strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '1') { char *m_url_real = (char *)malloc(sizeof(char) * 200); strcpy(m_url_real, "/log.html"); // 登录页面 strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '5') { char *m_url_real = (char *)malloc(sizeof(char) * 200); strcpy(m_url_real, "/picture.html"); // 图片页面 strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '6') { char *m_url_real = (char *)malloc(sizeof(char) * 200); strcpy(m_url_real, "/video.html"); // 视频页面 strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '7') { char *m_url_real = (char *)malloc(sizeof(char) * 200); strcpy(m_url_real, "/fans.html"); // 粉丝页面 strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1); // 其他请求,拼接实际文件路径 // 检查文件是否存在 if (stat(m_real_file, &m_file_stat) < 0) return NO_RESOURCE; // 文件不存在,返回NO_RESOURCE // 检查文件是否有读取权限 if (!(m_file_stat.st_mode & S_IROTH)) return FORBIDDEN_REQUEST; // 没有权限,返回FORBIDDEN_REQUEST // 检查是否是目录 if (S_ISDIR(m_file_stat.st_mode)) return BAD_REQUEST; // 请求的是目录,返回BAD_REQUEST // 打开文件 int fd = open(m_real_file, O_RDONLY); m_file_address = (char *)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0); // 将文件映射到内存 close(fd); // 关闭文件描述符 return FILE_REQUEST; // 返回文件请求}// 解除内存映射void http_conn::unmap(){ if (m_file_address) { munmap(m_file_address, m_file_stat.st_size); // 解除文件的内存映射 m_file_address = 0; // 重置文件地址指针 }}// 向客户端写入HTTP响应bool http_conn::write(){ int temp = 0; if (bytes_to_send == 0) // 如果要发送的字节为0,表示响应已经发送完毕 { modfd(m_epollfd, m_sockfd, EPOLLIN); // 修改epoll事件为读事件,准备处理下一次请求 init(); // 重新初始化连接 return true; } // 循环发送响应数据,直到全部发送完成或遇到错误 while (1) { temp = writev(m_sockfd, m_iv, m_iv_count); // 使用writev函数将响应数据发送给客户端 if (temp < 0) // 发送过程中遇到错误 { // 如果错误是由于非阻塞写导致的缓冲区已满 if (errno == EAGAIN) { modfd(m_epollfd, m_sockfd, EPOLLOUT); // 重新注册写事件 return true; } unmap(); // 如果遇到其他错误,取消文件映射 return false; } bytes_have_send += temp; // 更新已经发送的字节数 bytes_to_send -= temp; // 更新剩余需要发送的字节数 // 如果已经发送完响应头部 if (bytes_have_send >= m_iv[0].iov_len) { m_iv[0].iov_len = 0; // 清空头部的iov结构体长度 m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx); // 设置发送文件的起始地址 m_iv[1].iov_len = bytes_to_send; // 更新剩余需要发送的文件长度 } else { m_iv[0].iov_base = m_write_buf + bytes_have_send; // 更新响应头部的发送位置 m_iv[0].iov_len -= temp; // 更新头部剩余需要发送的长度 } if (bytes_to_send <= 0) // 如果所有数据都已发送完成 { unmap(); // 取消文件映射 modfd(m_epollfd, m_sockfd, EPOLLIN); // 重新注册读事件 if (m_linger) // 如果是长连接 { init(); // 重新初始化连接,等待处理新的请求 return true; } else { return false; // 如果不是长连接,关闭连接 } } }}// 将HTTP响应生成并写入缓冲区bool http_conn::add_response(const char *format, ...){ if (m_write_idx >= WRITE_BUFFER_SIZE) // 如果写入的响应数据超出缓冲区大小,返回false { return false; } va_list arg_list; va_start(arg_list, format); // 开始可变参数处理 int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list); // 格式化输出到缓冲区 if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx)) // 如果格式化后的数据超出缓冲区大小,返回false { va_end(arg_list); return false; } m_write_idx += len; // 更新缓冲区索引 va_end(arg_list); LOG_INFO("request:%s", m_write_buf); // 记录生成的响应 return true;}// 向响应中添加状态行bool http_conn::add_status_line(int status, const char *title){ return add_response("%s %d %s\r\n", "HTTP/1.1", status, title); // 将状态行写入响应中}// 向响应中添加头部信息bool http_conn::add_headers(int content_len){ add_content_length(content_len); // 添加Content-Length头部,指定响应内容长度 add_linger(); // 添加Connection头部,指定是否保持连接 add_blank_line(); // 添加空行,表示头部结束 return true;}// 向响应中添加Content-Length头部bool http_conn::add_content_length(int content_len){ return add_response("Content-Length:%d\r\n", content_len); // 写入Content-Length头部}// 向响应中添加Connection头部bool http_conn::add_linger(){ return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close"); // 根据长连接状态写入Connection头部}// 向响应中添加空行bool http_conn::add_blank_line(){ return add_response("%s", "\r\n"); // 写入空行}// 向响应中添加实际内容bool http_conn::add_content(const char *content){ return add_response("%s", content); // 将内容写入响应}// 处理向客户端返回的完整响应bool http_conn::process_write(HTTP_CODE ret){ switch (ret) { case INTERNAL_ERROR: // 内部错误时的响应 { add_status_line(500, error_500_title); // 添加状态行,状态码500 add_headers(strlen(error_500_form)); // 添加响应头 if (!add_content(error_500_form)) // 添加错误内容 return false; break; } case BAD_REQUEST: // 错误请求时的响应 { add_status_line(400, error_400_title); // 添加状态行,状态码400 add_headers(strlen(error_400_form)); // 添加响应头 if (!add_content(error_400_form)) // 添加错误内容 return false; break; } case NO_RESOURCE: // 资源不存在时的响应 { add_status_line(404, error_404_title); // 添加状态行,状态码404 add_headers(strlen(error_404_form)); // 添加响应头 if (!add_content(error_404_form)) // 添加错误内容 return false; break; } case FORBIDDEN_REQUEST: // 没有权限访问时的响应 { add_status_line(403, error_403_title); // 添加状态行,状态码403 add_headers(strlen(error_403_form)); // 添加响应头 if (!add_content(error_403_form)) // 添加错误内容 return false; break; } case FILE_REQUEST: // 正常的文件请求 { add_status_line(200, ok_200_title); // 添加状态行,状态码200 if (m_file_stat.st_size != 0) // 如果请求的文件不为空 { add_headers(m_file_stat.st_size); // 添加响应头,指定内容长度为文件大小 m_iv[0].iov_base = m_write_buf; // 设置第一块内存区域为响应头部 m_iv[0].iov_len = m_write_idx; m_iv[1].iov_base = m_file_address; // 设置第二块内存区域为文件内容 m_iv[1].iov_len = m_file_stat.st_size; m_iv_count = 2; bytes_to_send = m_write_idx + m_file_stat.st_size; // 更新需要发送的总字节数 return true; } else { const char *ok_string = "<html><body></body></html>"; // 如果文件为空,返回一个简单的HTML页面 add_headers(strlen(ok_string)); // 添加响应头 if (!add_content(ok_string)) // 添加空页面的内容 return false; } } default: return false; } m_iv[0].iov_base = m_write_buf; // 设置第一块内存区域为响应头部 m_iv[0].iov_len = m_write_idx; m_iv_count = 1; bytes_to_send = m_write_idx; // 更新需要发送的字节数 return true;}// 主逻辑函数,负责处理HTTP请求并生成响应void http_conn::process(){ HTTP_CODE read_ret = process_read(); // 调用process_read解析HTTP请求 if (read_ret == NO_REQUEST) // 如果请求不完整,继续监听 { modfd(m_epollfd, m_sockfd, EPOLLIN); return; } bool write_ret = process_write(read_ret); // 生成响应 if (!write_ret) { close_conn(); // 如果生成响应失败,关闭连接 } modfd(m_epollfd, m_sockfd, EPOLLOUT); // 修改epoll事件为写事件,准备发送响应}
代码详细解析版:
7.1 主从状态机
主从状态机(Master-Slave State Machine)是一种设计模式,常用于复杂协议的实现,比如HTTP服务器的请求处理。在HTTP服务器中,主从状态机的设计能够将请求解析和状态管理分离,简化代码逻辑,提高代码的可维护性和扩展性。
主从状态机的概念与作用
主状态机(Master State Machine):
作用:主状态机负责控制整个HTTP请求处理的流程和总体逻辑。它通过调用从状态机来逐步解析HTTP请求的不同部分(如请求行、请求头和消息体),并根据从状态机的返回结果决定下一步的操作。工作过程:主状态机从接收到请求开始,根据当前的解析状态(如CHECK_STATE_REQUESTLINE
、CHECK_STATE_HEADER
、CHECK_STATE_CONTENT
)依次调用从状态机的函数,进行具体的数据解析。当从状态机完成某部分的解析后,主状态机根据返回的状态决定接下来应该做什么,比如继续读取、生成响应、或关闭连接。 从状态机(Slave State Machine):
作用:从状态机负责对HTTP请求的具体部分进行解析和处理,如解析请求行、请求头部和消息体。它的主要任务是解析数据并将处理结果返回给主状态机,协助主状态机进行状态转移。工作过程:从状态机每次只解析一部分数据(如一行或一段),并通过状态更新的方式将结果传递给主状态机。每次调用从状态机的函数时,它都会返回一个处理状态,如LINE_OK
(解析一行成功)、LINE_BAD
(解析错误)或LINE_OPEN
(数据不完整,需要继续读取)。这些状态结果将指导主状态机的下一步操作。 主从状态机的设计原因
使用主从状态机的设计模式有以下几个优点:
分离关注点(Separation of Concerns):
将复杂的HTTP请求处理过程分解为多个小的、独立的状态机模块。主状态机专注于控制流程和状态转换,从状态机专注于具体的数据解析。这样分离关注点的设计方式能够简化代码的逻辑结构,使其更容易理解和维护。提高代码的可扩展性(Extensibility):
主状态机和从状态机是解耦的。从状态机只需要专注于数据解析,不需要了解主状态机如何做出决策。这样,如果需要添加新的状态或扩展现有的解析逻辑,可以独立修改从状态机的实现,而不需要大幅度更改主状态机的代码。简化错误处理:
在HTTP请求的解析过程中,可能会出现各种错误(如请求格式不正确、头部不合法等)。主从状态机的设计能够将错误的处理职责集中在一个地方(从状态机),并通过状态返回的方式通知主状态机进行相应的处理。这样就避免了错误处理代码散布在各个地方,降低了代码的复杂性。提高系统的响应性能(Responsiveness):
主从状态机的设计方式能够及时响应客户端的请求。主状态机能够在接收到从状态机的状态结果后,立即做出响应或决定是继续读取数据,这样就避免了阻塞式的等待,使系统能够快速处理多个并发请求。主从状态机在代码中的体现
根据代码片段,主从状态机的设计如下:
主状态机:
在http_conn::process_read()
函数中体现。这个函数是处理HTTP请求的主逻辑,它根据当前的解析状态(m_check_state
)决定调用哪个从状态机函数来处理请求的哪一部分。它的核心逻辑如下:
while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)){ text = get_line(); // 获取要解析的一行数据 switch (m_check_state) // 根据当前解析状态,处理不同部分 { case CHECK_STATE_REQUESTLINE: // 解析请求行 ret = parse_request_line(text); // 调用从状态机解析请求行 // 根据解析结果调整主状态机状态 break; case CHECK_STATE_HEADER: // 解析请求头 ret = parse_headers(text); // 调用从状态机解析请求头 // 根据解析结果调整主状态机状态 break; case CHECK_STATE_CONTENT: // 解析消息体 ret = parse_content(text); // 调用从状态机解析消息体 // 根据解析结果调整主状态机状态 break; default: return INTERNAL_ERROR; // 处理未知错误 }}
逻辑流程:主状态机依次检查解析状态(如请求行、请求头、消息体),并调用相应的从状态机函数来解析数据。如果从状态机返回LINE_OK
,主状态机继续解析下一个部分;如果从状态机返回BAD_REQUEST
等错误状态,主状态机则返回错误并生成相应的HTTP错误响应。 从状态机:
在parse_request_line()
、parse_headers()
、parse_content()
等函数中体现。这些函数是从状态机的具体实现,负责对HTTP请求的不同部分进行具体的解析和处理。
parse_request_line()
函数:解析请求行,如“GET /index.html HTTP/1.1”。parse_headers()
函数:解析请求头部,如“Content-Length: 1024”。parse_content()
函数:解析请求的消息体,如POST请求的数据。 为什么这样设计?
清晰的职责划分:
主状态机负责流程控制和状态管理,从状态机负责具体的数据解析,明确的职责划分使代码逻辑清晰,易于维护和扩展。
灵活应对不同的请求:
主从状态机的设计使得服务器能够灵活应对各种HTTP请求类型(如GET、POST等)和不同的错误情况,动态调整解析和响应策略。
高效的请求处理:
使用主从状态机模型,可以有效避免重复解析和不必要的等待,使得每次解析和处理尽可能快地完成,提升系统的响应速度和性能。
良好的扩展性和可维护性:
这种设计模式下,如果需要支持新的HTTP方法或者新的解析逻辑,只需对相应的从状态机函数进行扩展或修改,而不需要大幅度调整主状态机的逻辑,极大提高了代码的可维护性和扩展性。
总之,主从状态机模式在HTTP服务器中的应用,通过分离控制逻辑和数据解析逻辑,使代码结构更加清晰、模块化,易于维护和扩展,并且在应对复杂协议时显得更加高效。
这段代码中的 文件描述符 (File Descriptor, FD) 是一个整数,表示一个打开的文件或设备的句柄。在Linux或类Unix系统中,几乎所有的I/O操作(例如文件、网络套接字等)都是通过文件描述符来操作的。它不仅仅指文件,还可以代表任何类型的I/O资源,包括文件、网络连接(套接字)、管道等。
7.2 设置为非阻塞模式的意义
setnonblocking(int fd)
函数的作用是将文件描述符设置为非阻塞模式。在非阻塞模式下,I/O操作(如读取数据read
或写入数据write
)不会导致程序等待数据的到来或空间的可用性。
阻塞模式(默认模式)
在默认的 阻塞模式 下,I/O 操作会使程序进入“等待”状态,直到数据可以被读取或写入。例如:
如果你通过套接字调用read(fd, buf, len)
来读取数据,而套接字的接收缓冲区中没有数据,那么程序会被阻塞,等待数据到来。也就是说,在数据被完全接收到之前,程序的执行会暂停在这一行,无法继续执行其他操作。同样,write(fd, buf, len)
写入数据时,如果缓冲区已满,程序会阻塞,直到系统能够将数据写入缓冲区。 阻塞模式的好处是逻辑简单,程序可以假定I/O操作是同步的。但缺点是如果等待时间太长,程序可能会浪费大量时间,无法处理其他任务。
非阻塞模式
在 非阻塞模式 下,I/O 操作会立即返回,而不会让程序等待。例如:
调用read(fd, buf, len)
时,如果没有数据可以读取,它会立即返回 -1
,并设置 errno
为 EAGAIN
或 EWOULDBLOCK
,表示此时没有数据可读取。同样地,如果 write(fd, buf, len)
不能立即将数据写入缓冲区,它也会立即返回,而不是阻塞。 非阻塞模式的好处是程序不会因为等待I/O而挂起,服务器可以继续处理其他任务。例如,在高并发的网络服务器中,服务器需要同时处理多个客户端连接,非阻塞模式允许服务器同时处理多个连接,而不是等待某个连接的 I/O 完成。
为什么要设置非阻塞模式?
非阻塞模式通常用于实现高效的、响应迅速的服务器程序,特别是在需要同时处理多个连接或多个任务时。非阻塞模式的常见使用场景包括:
事件驱动模型(如epoll、select、poll等):
在服务器编程中,通常会使用某种 I/O多路复用机制(如epoll
、select
、poll
)来同时监听多个文件描述符(即多个客户端连接)。如果文件描述符处于阻塞模式,服务器在处理某个客户端连接时,可能会被长时间阻塞,无法处理其他连接。通过设置非阻塞模式,服务器可以通过轮询多个文件描述符来处理不同的I/O事件,而不必被单个文件描述符的阻塞操作所影响。 提高并发性:
现代网络服务器为了支持高并发,通常会使用非阻塞套接字结合多路复用机制,这样可以避免创建大量线程(每个线程处理一个客户端),从而提高性能。例如,在非阻塞模式下,当read
函数读取不到数据时,它会立即返回,服务器可以继续处理其他任务,而不是等待该客户端发送数据。 提高响应性:
非阻塞I/O可以让程序在I/O操作无法立即完成时不必等待,而是去做其他有意义的工作。对于高并发的应用,如Web服务器,这种模式非常重要。阻塞模式 vs 非阻塞模式
阻塞模式 | 非阻塞模式 |
---|---|
I/O 操作会等待完成才返回。 | I/O 操作立即返回,若无法完成则返回错误。 |
适合简单的、顺序执行的程序。 | 适合高并发、需要快速响应的程序。 |
在高并发场景下,容易导致程序被阻塞。 | 能够在高并发场景下避免阻塞,提高性能。 |
逻辑简单,程序执行顺序容易理解。 | 需要处理更多的错误和异常情况,逻辑较为复杂。 |
总结
将文件描述符设置为 非阻塞模式 是为了避免在处理I/O操作时程序被挂起等待,从而提高并发处理能力。阻塞模式 简单直观,但在高并发环境下效率较低;非阻塞模式 适合需要处理多个任务或多个连接的场景,特别是在服务器编程中,通过配合事件驱动的I/O多路复用机制,非阻塞I/O可以极大提高系统性能和响应速度。8. sql_connection.cpp
完整注释版:
这段代码实现了一个数据库连接池,通过管理多个 MySQL 连接,可以高效处理多个数据库请求。
#include <mysql/mysql.h> // 包含 MySQL 库,用于和 MySQL 数据库交互#include <stdio.h> // 标准输入输出库#include <string> // C++ 字符串库#include <string.h> // C 字符串处理库#include <stdlib.h> // 标准库,包含内存分配、进程控制等#include <list> // C++ STL 的 list 容器,用于存储 MySQL 连接#include <pthread.h> // 线程库,用于多线程同步#include <iostream> // C++ 标准输入输出流#include "sql_connection_pool.h" // 连接池头文件using namespace std; // 使用标准命名空间// 构造函数,初始化成员变量connection_pool::connection_pool(){m_CurConn = 0; // 当前已使用的连接数m_FreeConn = 0; // 当前空闲的连接数}// 获取连接池实例(单例模式)避免重复创建连接池对象,进而管理和复用 MySQL 连接。connection_pool *connection_pool::GetInstance(){static connection_pool connPool; // 静态实例,保证全局唯一return &connPool; // 返回连接池的指针}// 构造初始化函数,设置连接池的相关配置void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log){// 保存数据库连接的参数m_url = url; // 数据库主机地址m_Port = Port; // 端口号m_User = User; // 数据库用户名m_PassWord = PassWord; // 数据库密码m_DatabaseName = DBName; // 数据库名m_close_log = close_log; // 是否关闭日志// 创建最大连接数个 MySQL 连接并加入连接池for (int i = 0; i < MaxConn; i++){MYSQL *con = NULL;con = mysql_init(con); // 初始化 MySQL 连接if (con == NULL) // 如果初始化失败,打印错误日志并退出程序{LOG_ERROR("MySQL Error");exit(1);}// 连接到 MySQL 数据库con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);if (con == NULL) // 如果连接失败,打印错误日志并退出程序{LOG_ERROR("MySQL Error");exit(1);}// 将成功连接的 MySQL 连接加入连接池列表connList.push_back(con);++m_FreeConn; // 增加空闲连接数}// 初始化信号量,用于控制连接的获取和释放reserve = sem(m_FreeConn); m_MaxConn = m_FreeConn; // 设置最大连接数}// 从连接池中获取一个可用的 MySQL 连接MYSQL *connection_pool::GetConnection(){MYSQL *con = NULL;if (0 == connList.size()) // 如果连接池为空,返回 NULLreturn NULL;reserve.wait(); // 等待信号量,如果没有空闲连接则等待lock.lock(); // 加锁,防止多线程同时操作连接池// 从连接池中取出一个连接con = connList.front(); connList.pop_front(); // 移除最前面的连接--m_FreeConn; // 空闲连接数减少++m_CurConn; // 使用中的连接数增加lock.unlock(); // 解锁return con; // 返回获取的连接}// 释放当前使用的 MySQL 连接,将其放回连接池bool connection_pool::ReleaseConnection(MYSQL *con){if (NULL == con) // 如果连接为空,返回 falsereturn false;lock.lock(); // 加锁,防止多线程同时操作连接池// 将连接放回连接池connList.push_back(con);++m_FreeConn; // 空闲连接数增加--m_CurConn; // 使用中的连接数减少lock.unlock(); // 解锁reserve.post(); // 释放信号量,通知等待的线程有连接可用return true;}// 销毁连接池,关闭所有 MySQL 连接void connection_pool::DestroyPool(){lock.lock(); // 加锁,防止其他线程操作连接池if (connList.size() > 0){// 遍历连接池中的所有连接,逐一关闭list<MYSQL *>::iterator it;for (it = connList.begin(); it != connList.end(); ++it){MYSQL *con = *it;mysql_close(con); // 关闭 MySQL 连接}m_CurConn = 0; // 重置已使用连接数m_FreeConn = 0; // 重置空闲连接数connList.clear(); // 清空连接池列表}lock.unlock(); // 解锁}// 返回当前空闲的连接数int connection_pool::GetFreeConn(){return this->m_FreeConn; // 返回空闲连接数}// 析构函数,销毁连接池,释放所有连接connection_pool::~connection_pool(){DestroyPool(); // 调用销毁连接池的函数}// RAII 机制,自动管理 MySQL 连接的获取和释放connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool){*SQL = connPool->GetConnection(); // 获取一个数据库连接并赋值给传入的指针conRAII = *SQL; // 保存获取到的连接poolRAII = connPool; // 保存连接池的引用}// 析构函数,自动释放连接回连接池connectionRAII::~connectionRAII(){poolRAII->ReleaseConnection(conRAII); // 将连接放回连接池}
代码解释
连接池概念:
连接池 是一种资源池技术,它维护了一组连接(例如数据库连接),避免在每次需要访问数据库时重新建立连接,从而提高系统的效率。连接池的作用是管理多个数据库连接,减少重复创建和销毁连接的开销。主要类和函数:
connection_pool
:这是连接池类,用来管理 MySQL 数据库连接。init()
:用于初始化连接池,创建并维护一组 MySQL 连接。GetConnection()
:用于获取一个空闲的 MySQL 连接。ReleaseConnection()
:将使用完的连接释放回连接池。DestroyPool()
:销毁连接池,关闭所有 MySQL 连接。connectionRAII
:使用 RAII 技术自动管理 MySQL 连接的获取和释放,避免手动管理。 线程安全:
代码使用了**锁(lock
)和信号量(sem
)**来保证多线程环境下的线程安全,确保不会同时有多个线程操作连接池。 RAII 模式:
RAII (Resource Acquisition Is Initialization) 是一种资源管理的惯用手法,connectionRAII
类通过构造函数获取连接,并在析构函数中自动释放连接,简化了资源管理,防止资源泄漏。 代码详细解析版:
8.1 关于CGI
cgimysql
是一个用来在服务器端通过 CGI (Common Gateway Interface) 脚本来与 MySQL 数据库进行交互的程序或模块。
在传统的 web 开发中,CGI 是一种常用的技术,用于处理用户的 HTTP 请求,并生成动态的 web 页面。通过 CGI 脚本,服务器能够执行外部程序(如 Python、Perl、C 或 C++ 程序),将用户请求的数据传递给这些程序,然后将程序的输出返回给客户端。cgimysql
就是一个通过 CGI 脚本与 MySQL 数据库进行交互的示例或工具。
cgimysql
的使用场景
CGI 脚本的作用:CGI 脚本通常用于处理客户端的输入(如表单提交),然后根据输入执行特定的操作。与 MySQL 的结合使用,可以让 CGI 脚本连接到数据库,执行 SQL 查询(如插入、更新、删除或选择操作),并将查询结果返回给客户端。例如,用户在网页表单中提交查询条件,CGI 脚本将条件发送到 MySQL 数据库,查询后返回结果。
实现动态网页:cgimysql
可以用来创建动态网页内容。例如,展示数据库中的用户信息列表,或者处理用户登录请求。CGI 脚本读取用户输入,将其传递给 MySQL 数据库,获取查询结果,并将结果格式化为 HTML 页面返回给客户端。
cgimysql
的典型结构
一个典型的 cgimysql
脚本通常包括以下几个步骤:
解析用户输入:CGI 脚本从客户端接收到的请求中提取数据,例如从 URL 的查询参数或 HTTP POST 请求体中获取用户输入的内容。
连接 MySQL 数据库:使用一个 MySQL 客户端库(如 MySQL C API、MySQL++ 或者第三方的 MySQL 库)来建立与 MySQL 数据库的连接。
执行 SQL 查询:根据用户输入的数据,构造 SQL 语句,并通过连接执行查询(如 SELECT
、INSERT
、UPDATE
、DELETE
等操作)。
处理查询结果:将查询结果格式化为客户端可以理解的形式(例如,将结果转换为 HTML 表格),然后输出给客户端。
关闭数据库连接:在处理完成后,关闭与数据库的连接,释放资源。
示例:一个简单的 cgimysql
脚本
以下是一个简单的 CGI 脚本,用 C++ 编写,通过 MySQL C API 连接到数据库并执行查询:
#include <mysql/mysql.h>#include <stdio.h>#include <stdlib.h>int main() { // 输出 HTTP 响应头 printf("Content-type: text/html\n\n"); // 初始化 MySQL MYSQL *conn; MYSQL_RES *res; MYSQL_ROW row; conn = mysql_init(NULL); if (conn == NULL) { printf("mysql_init() failed\n"); return EXIT_FAILURE; } // 连接 MySQL 数据库 if (mysql_real_connect(conn, "localhost", "username", "password", "database_name", 0, NULL, 0) == NULL) { printf("mysql_real_connect() failed\n"); mysql_close(conn); return EXIT_FAILURE; } // 执行 SQL 查询 if (mysql_query(conn, "SELECT * FROM users")) { printf("SELECT * FROM users failed. Error: %s\n", mysql_error(conn)); mysql_close(conn); return EXIT_FAILURE; } res = mysql_store_result(conn); if (res == NULL) { printf("mysql_store_result() failed. Error: %s\n", mysql_error(conn)); mysql_close(conn); return EXIT_FAILURE; } // 处理结果并输出 HTML printf("<html><body><table border=\"1\">\n"); while ((row = mysql_fetch_row(res)) != NULL) { printf("<tr>"); for (int i = 0; i < mysql_num_fields(res); i++) { printf("<td>%s</td>", row[i] ? row[i] : "NULL"); } printf("</tr>\n"); } printf("</table></body></html>\n"); // 清理并关闭 mysql_free_result(res); mysql_close(conn); return EXIT_SUCCESS;}
cgimysql
的意义
实现动态内容生成:cgimysql
使得服务器端可以根据用户请求的动态数据生成内容,从而实现个性化和交互式的网页。与数据库交互:可以处理各种数据库操作,如用户注册、登录、数据查询等。适用于简单场景:虽然 CGI 技术比较老旧,且在性能上不如现代的 Web 框架(如 Django、Flask、Node.js),但它简单直接,适用于一些小型或简单的 Web 应用程序。 总结
cgimysql
是一个基于 CGI 技术来与 MySQL 数据库进行交互的程序或脚本,通过这种方式,可以让 Web 服务器在接收到客户端请求时,动态生成内容,响应客户端。虽然这种方法较为基础,但在理解和构建简单 Web 应用时依然有一定的学习和参考价值。
RALL机制
RAII(Resource Acquisition Is Initialization) 机制是一种资源管理的编程习惯,主要用于在对象的生命周期中自动管理资源的分配和释放。RAII的核心思想是:资源的获取与对象的生命周期绑定,资源的释放在对象销毁时自动进行。这在 C++ 中尤为常见,常用于管理动态内存、文件句柄、锁、数据库连接等资源。
RAII 的核心思想:
资源获取即初始化:资源在对象创建时分配(如文件打开、内存分配、数据库连接等)。自动释放:当对象的生命周期结束时(如超出作用域或被显式删除),资源会在对象的析构函数中自动释放,避免了资源泄露。代码解析:
connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool) { *SQL = connPool->GetConnection(); // 获取一个数据库连接并赋值给传入的指针 conRAII = *SQL; // 保存获取到的连接 poolRAII = connPool; // 保存连接池的引用}
代码解释:
构造函数 connectionRAII
:
MYSQL **SQL
:这是一个指向 MySQL 连接的指针,传递给该函数的目的是接收一个有效的数据库连接。connection_pool *connPool
:这是一个指向连接池的指针,用于从连接池中获取数据库连接。 *SQL = connPool->GetConnection();
:
connPool->GetConnection()
从连接池中获取一个 MySQL 连接,并将其赋值给 *SQL
。这样,调用者可以通过 SQL
访问这个连接。 conRAII = *SQL;
:
conRAII
,这是一个私有成员变量,目的是在对象的整个生命周期内维护这个连接。 poolRAII = connPool;
:
poolRAII
,也是一个私有成员变量,确保在对象存在期间保持对连接池的引用。 RAII 的优点在这个上下文中的应用:
自动管理资源:通过 RAII 机制,connectionRAII
对象会在构造时获取 MySQL 连接,并在对象的析构时自动释放该连接,避免了手动释放的复杂性。
防止资源泄漏:如果程序异常退出或忘记手动释放资源,RAII 会在对象销毁时自动调用析构函数,确保资源被正确释放,从而防止资源泄漏。
安全性与简化代码:代码变得更简单且易于维护,因为分配和释放都被自动处理,减少了错误的可能性。
在析构函数中的典型实现:
connectionRAII::~connectionRAII() { poolRAII->ReleaseConnection(conRAII); // 自动释放连接回连接池}
在析构函数中,ReleaseConnection()
被调用,将连接释放回连接池,这确保了每个通过 RAII 获取的连接在其对象生命周期结束时都能正确释放。 总结:
RAII 机制在这个代码中主要用于自动管理 MySQL 连接的获取和释放,确保连接的安全、有效使用并减少手动管理的复杂性和出错的风险。通过绑定资源的获取和释放到对象的生命周期,RAII 提供了一种简单而安全的资源管理方式。
相关疑问总结
1. 为什么网络编程需要套接字(Socket)
套接字(Socket)是计算机网络编程中的基础概念和工具,它的作用和必要性可以从以下几个方面理解:
1.1通信抽象
统一的接口:套接字提供了一个统一的接口,使程序员能够通过相同的方式进行网络通信,无论底层使用的是哪种协议(例如TCP、UDP)。这就像是一种抽象层,屏蔽了底层实现的复杂性。跨平台:套接字在不同操作系统上表现一致,提供了跨平台的通信能力,使开发者能够编写具有良好可移植性的网络应用程序。1.2 网络通信的基础
网络通信的端点:在网络通信中,套接字扮演的是“通信端点”的角色。任何网络通信都是在两个端点(一个客户端和一个服务器端)之间进行的。套接字就是这个端点,它代表了一个IP地址和端口的组合。支持多种协议:套接字不仅仅支持TCP(面向连接的通信),还支持UDP(无连接的通信)等协议,能够满足不同类型的网络通信需求。1.3 数据传输的机制
数据收发:套接字提供了发送(send)和接收(recv)数据的机制,通过这些函数,程序可以在网络中传输数据。这是实现网络功能的核心部分。流控制和连接管理:对于TCP套接字,套接字还提供了连接的管理(例如监听、接受连接)以及流控制等功能,使得数据能够可靠地传输。1.4 操作系统的支持
操作系统接口:在操作系统中,套接字是与操作系统网络栈交互的接口。通过套接字,应用程序可以与操作系统内核进行通信,进而通过网络适配器与外部世界通信。资源管理:套接字作为一种系统资源,由操作系统管理,能够确保资源的合理分配和回收。这避免了网络资源的浪费和冲突。1.5 总结
套接字在网络编程中是不可或缺的,因为它提供了网络通信的基础设施和统一的接口,使得复杂的网络操作变得可管理和可操作。通过套接字,开发者能够构建出跨平台、可扩展的网络应用程序。没有套接字,程序将无法直接与网络进行通信,网络编程也就无从谈起。
2 epoll是什么
epoll
是 Linux 内核提供的一种高效的 I/O 多路复用机制,用于监控多个文件描述符,以便在这些文件描述符上发生事件时通知应用程序进行相应处理。相比于传统的 select
和 poll
,epoll
在处理大量文件描述符时表现更为高效,特别是在高并发场景下。
2.1 epoll
的主要特点:
高效性:
epoll
使用的是基于事件通知的机制,只有发生事件的文件描述符才会被返回,因此在大量文件描述符中只有少数有事件发生时,epoll
的性能优势显著。epoll
在内核空间维护了一个事件表,避免了每次调用都要传递整个文件描述符集合,减少了内核与用户态之间的数据拷贝。 水平触发和边缘触发:
水平触发(Level-triggered, LT):默认模式,只要某个文件描述符上有事件发生,epoll_wait
就会返回该文件描述符,直到事件被处理。边缘触发(Edge-triggered, ET):更为高效,但要求更细致的处理。当文件描述符状态从无事件变为有事件时才会通知,适用于减少系统调用频率,提高程序效率。 对文件描述符数量的支持:
epoll
能够支持大规模的文件描述符集合,理论上上限是系统的最大文件描述符数,而 select
和 poll
通常有较小的文件描述符限制。 2.2 epoll
的工作流程:
创建 epoll
实例:
epoll_create
或 epoll_create1
函数创建一个 epoll
实例,返回一个 epoll
文件描述符。 注册事件:
使用epoll_ctl
函数将需要监控的文件描述符添加到 epoll
实例中,并指定要监听的事件类型(如可读、可写、异常等)。 等待事件发生:
使用epoll_wait
函数等待事件的发生,当某个或多个文件描述符上的事件满足条件时,epoll_wait
会返回这些文件描述符。 处理事件:
处理返回的事件,执行相应的读写操作,或根据应用程序逻辑进行其他处理。使用场景:
epoll
特别适合用于高并发的网络服务器中,比如 Web 服务器、聊天服务器等。这些应用通常需要处理大量并发连接,并且每个连接可能频繁进行 I/O 操作。epoll
能够有效地提升这些应用的性能。
总之,epoll
是在 Linux 环境下构建高性能网络服务器的重要工具,它通过高效的事件通知机制帮助开发者更好地管理大量并发 I/O 操作。
2.3 为什么在项目实现中,总是将类的定义(包括成员变量和成员函数的声明)放在头文件(.h文件)中,而将成员函数的实现放在源文件(.cpp文件)中?
在C++编程中,通常将类的定义(包括成员变量和成员函数的声明)放在头文件(.h
文件)中,而将成员函数的实现放在源文件(.cpp
文件)中。这种做法有几个重要的原因:
代码分离与清晰度:
头文件:主要用于定义类的接口(即类的定义),包括类的成员函数声明和成员变量。这使得头文件的内容比较简洁,便于其他开发者了解类的结构和使用方法,而不需要关注具体的实现细节。源文件:用于实现类的具体行为,即成员函数的实现。这种分离使得代码更容易维护和阅读,因为实现细节被隐藏在源文件中,头文件仅展示类的接口。编译时间优化:
当类的实现放在.cpp
文件中时,只有在实现发生变化时才需要重新编译 .cpp
文件。如果类的实现都放在 .h
文件中,那么每次该头文件发生变化时,所有包含该头文件的文件都需要重新编译,这可能会大幅增加编译时间。 信息隐藏与封装性:
将实现细节隐藏在.cpp
文件中可以更好地实现信息隐藏(encapsulation),这是一种面向对象编程的关键原则。外部代码只需要了解类的接口(即头文件中的内容),而不需要知道类是如何实现的。这也有助于保护类的实现不被意外修改或依赖。 防止重复定义:
如果将类的定义和实现都放在头文件中,那么在多个源文件中包含这个头文件时,可能会导致重复定义的问题。而将实现放在.cpp
文件中,每个源文件只会包含一次相应的实现,从而避免了这个问题。 3 epoll和线程池之间的关系
你提到的很有道理,主线程通过多路复用机制(如epoll
)来接收任务,而工作线程从队列中取任务并执行。要回答这个问题,我们需要理解 epoll
和 线程池 在系统中的不同作用,它们分别解决不同层次的问题。
3.1 epoll
的作用
epoll
是 Linux 提供的高效 I/O 事件通知机制,主要用于解决多路复用的问题。它让主线程能够高效地监视多个文件描述符(如 socket 连接),并在这些文件描述符的状态发生变化时(如数据可读、可写)通知应用程序。因此,epoll
的核心功能是:
在高并发的网络服务器中,epoll
极大地提升了主线程处理 I/O 请求的效率,因为它可以让主线程高效地接收并管理大量的 I/O 事件。
3.2 线程池的作用
线程池的作用是并行处理任务。当主线程(例如通过 epoll
)接收到一个 I/O 事件后,通常只是将任务放入队列,而不直接处理这个任务。工作线程池从队列中取出任务并执行,这样主线程可以继续监视其他 I/O 事件,而工作线程处理具体的业务逻辑。
线程池的优势包括:
任务的并行执行:允许多个工作线程并行处理任务,避免任务的串行执行,提升系统的吞吐量。资源复用:线程池中的线程是复用的,减少了频繁创建和销毁线程的开销。解耦主线程与工作线程:主线程负责事件的接收和任务的分发,工作线程负责任务的实际执行。3.3 epoll
和 线程池的协同工作
epoll
和线程池解决的是两个不同层面的问题:
epoll
负责事件的管理,让主线程能够高效地接收大量的 I/O 事件。线程池负责任务的处理,当主线程从 epoll
中得到事件后,往往只是将该事件或任务分发到任务队列,然后让工作线程从队列中取任务并处理。 这两者的协同工作是高效服务器架构的典型模式:
主线程通过epoll
来管理多个 socket 连接或文件描述符,监听事件的变化。一旦有事件发生,主线程将该事件(或任务)放入队列。工作线程池从队列中取出任务并处理,执行具体的 I/O 处理或业务逻辑。 两者的联系
如果你已经实现了主线程通过多路复用机制(如 epoll
)接收任务,而工作线程通过线程池取任务并执行,那 epoll
和线程池是各司其职,解决不同的问题。
epoll
用于高效管理 I/O 事件,让主线程能够同时处理大量连接。线程池 则用于并发处理任务,让多个任务可以并行执行,避免阻塞主线程。 所以,尽管主线程已经通过 epoll
实现了多路复用的任务接收机制,线程池仍然是必需的,它提供了并发处理任务的能力,使得任务的执行不会阻塞主线程,且能高效处理大量任务。
不使用 epoll
的问题
如果没有 epoll
,而是依赖传统的阻塞 I/O 模式,主线程将需要分别处理每一个连接的 I/O 操作,这会造成主线程的低效,并且无法处理大量的并发连接。
3. 复现过程中遇到的问题
3.1 解决“E: 无法定位软件包 mysql-workbench-community”问题
用这个指令:
sudo apt install mysql-workbench-community
会报错“E: 无法定位软件包 mysql-workbench-community”问题
解决方法为改用这个指令:
apt-get install mysql-workbench
成功:
分析下可能的原因:使用 mysql-workbench 是因为它在 Ubuntu 默认的软件源中,而 mysql-workbench-community 需要从 MySQL 官方仓库中获取。如果没有配置 MySQL 官方仓库,系统会找不到 mysql-workbench-community 包,导致错误信息的出现。
3.2 解决"正在设定ttf-mscorefonts-installer"
这里如果直接关了会导致后续包安装时会出现非法占用
解决方案:
按tab将光标移动到确定键上 然后回车就完事了