当前位置:首页 » 《随便一记》 » 正文

zlMediaKit 2 event-poller模块--reactor+管道回调执行异步任务队列+红黑树执行定时任务

20 人参与  2022年11月07日 14:33  分类 : 《随便一记》  评论

点击全文阅读


EventPoller.h

WorkThreadPool.h

EventPoller

UML

在这里插入图片描述

继承自TaskExecutor类,最重要的是实现async方法

结构

    std::thread::id _loop_thread_id //执行事件循环的线程id//async相关//内部事件管道    PipeWrap _pipe;    //从其他线程切换过来的任务    std::mutex _mtx_task;    List<Task::Ptr> _list_task;//epoll相关    int _epoll_fd = -1;    unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;//定时器相关    std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;

async/async_l

同步任务&&本对象的轮询线程调用 直接执行

异步任务 加入poller管理的任务队列,写入管道通知主线程

Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) {    TimeTicker();    if (may_sync && isCurrentThread()) {        task();        return nullptr;    }    auto ret = std::make_shared<Task>(std::move(task));    {        lock_guard<mutex> lck(_mtx_task);        if (first) {            _list_task.emplace_front(ret);        } else {            _list_task.emplace_back(ret);        }    }    //写数据到管道,唤醒主线程    _pipe.write("", 1);    return ret;}

构造函数

自己维护一个红黑树,初始化时就加入对**管道事件(执行异步任务)**的监听

_loop_thread_id,后面会看到构造函数只会被TaskExecutorGetterImp::addPoller调用,且此时绑定的线程id并非执行epoll_wait的线程,在构造之后,会在在执行runLoop时分配一个新线程再次绑定_loop_thread_id

EventPoller::EventPoller(ThreadPool::Priority priority) {    _priority = priority;    SockUtil::setNoBlocked(_pipe.readFD());    SockUtil::setNoBlocked(_pipe.writeFD());#if defined(HAS_EPOLL)    _epoll_fd = epoll_create(EPOLL_SIZE);    if (_epoll_fd == -1) {        throw runtime_error(StrPrinter << "创建epoll文件描述符失败:" << get_uv_errmsg());    }    SockUtil::setCloExec(_epoll_fd);#endif //HAS_EPOLL    _logger = Logger::Instance().shared_from_this();    _loop_thread_id = this_thread::get_id();    //添加内部管道事件    if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {        throw std::runtime_error("epoll添加管道失败");    }}

onPipeEvent

没错,管道事件回调即把异步任务队列中的任务执行个编

inline void EventPoller::onPipeEvent() {    char buf[1024];    int err = 0;    do {        if (_pipe.read(buf, sizeof(buf)) > 0) {            continue;        }        err = get_uv_error(true);    } while (err != UV_EAGAIN);    decltype(_list_task) _list_swap;    {        lock_guard<mutex> lck(_mtx_task);        _list_swap.swap(_list_task);    }    _list_swap.for_each([&](const Task::Ptr &task) {        try {            (*task)();        } catch (ExitException &) {            _exit_flag = true;        } catch (std::exception &ex) {            ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what();        }    });}

epoll_ctl

socket中可以看到,对应的回调函数都有一个socket实例的弱指针,回调中可以操作socket的所有

EPOLL_CTL_ADD

typedef union epoll_data{  void *ptr;  int fd;  uint32_t u32;  uint64_t u64;} epoll_data_t;struct epoll_event{  uint32_t events;/* Epoll events */  epoll_data_t data;/* User data variable */} __EPOLL_PACKED;

并没有使用epoll_event.data.ptr来存放回调的数据,而是_event_map保存对应的fd对应的回调

int EventPoller::addEvent(int fd, int event, PollEventCB cb) {    TimeTicker();    if (!cb) {        WarnL << "PollEventCB 为空!";        return -1;    }    if (isCurrentThread()) {#if defined(HAS_EPOLL)        struct epoll_event ev = {0};        ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;        ev.data.fd = fd;        int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);        if (ret == 0) {            _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));        }        return ret;#endif //HAS_EPOLL    }    async([this, fd, event, cb]() {        addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));    });    return 0;}

看到这里回顾之前的socket部分,socket连接之后进入工作状态的回调onRead|onWrite|emitErr,其实也是同理,事件来了,来了做什么,socket对象自己是知道的。实际上socket对象真正对应的读写等事件的cb,是我们根据session具体类型指定的。

这就是大多数网络框架留出来的让用户自定义的onXXX回调部分,即处理业务逻辑的部分

bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) {    weak_ptr<Socket> weak_self = shared_from_this();    weak_ptr<SockFD> weak_sock = sock;    _enable_recv = true;    _read_buffer = _poller->getSharedBuffer();    int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) {        auto strong_self = weak_self.lock();        auto strong_sock = weak_sock.lock();        if (!strong_self || !strong_sock) {            return;        }        if (event & EventPoller::Event_Read) {            strong_self->onRead(strong_sock, is_udp);        }        if (event & EventPoller::Event_Write) {            strong_self->onWriteAble(strong_sock);        }        if (event & EventPoller::Event_Error) {            strong_self->emitErr(getSockErr(strong_sock));        }    });    return -1 != result;}

EPOLL_CTL_DEL/EPOLL_CTL_MOD

和epoll_ctl_add相同,只不过没有del的回调没有存,没必要。mod对应的也只是修改event类型,没有对_event_map进行操作

runLoop

跑不了reactor模型的范式,在就绪事件里调用对应的回调(回调放在了_event_map中,没有给内核event.data.ptr传指针,自己管理自己维护)

特别的是,blocked设置调用是否阻塞,master线程调用不能阻塞,则会else中新开一个线程,开好之后,通过信号量通知进度

void EventPoller::runLoop(bool blocked, bool ref_self) {    if (blocked) {        ThreadPool::setPriority(_priority);        lock_guard<mutex> lck(_mtx_running);        _loop_thread_id = this_thread::get_id();        if (ref_self) {            s_current_poller = shared_from_this();        }        _sem_run_started.post();        _exit_flag = false;        uint64_t minDelay;#if defined(HAS_EPOLL)        struct epoll_event events[EPOLL_SIZE];        while (!_exit_flag) {            minDelay = getMinDelay();            startSleep();//用于统计当前线程负载情况            int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);            sleepWakeUp();//用于统计当前线程负载情况            if (ret <= 0) {                //超时或被打断                continue;            }            for (int i = 0; i < ret; ++i) {  //reactor范式                struct epoll_event &ev = events[i];                int fd = ev.data.fd;                auto it = _event_map.find(fd);                if (it == _event_map.end()) {                    epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);                    continue;                }                auto cb = it->second;                try {                    (*cb)(toPoller(ev.events));                } catch (std::exception &ex) {                    ErrorL << "EventPoller执行事件回调捕获到异常:" << ex.what();                }            }        }    } else {        _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);        _sem_run_started.wait();    }}

static thread_local std::weak_ptr<EventPoller> s_current_poller; ref_self是否引用自己,给线程中执行的开了一个口子,可以操作当前的线程的poller.

thread_local 关键字修饰的变量具有线程(thread)周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例。

C++ 11 关键字:thread_local

EventPoller::Ptr EventPoller::getCurrentPoller() {    return s_current_poller.lock();}

getMinDelay 执行定时任务/设定epoll的超时等待时间

flushDelayTask

执行到期的的定时任务,如果需要循环执行,再次加入一个定时任务

距离下一个定时任务还有多久设计为epoll_wait的超时等待时间

uint64_t EventPoller::flushDelayTask(uint64_t now_time) {    decltype(_delay_task_map) task_copy;    task_copy.swap(_delay_task_map);    for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) {        //已到期的任务        try {            auto next_delay = (*(it->second))();            if (next_delay) {                //可重复任务,更新时间截止线                _delay_task_map.emplace(next_delay + now_time, std::move(it->second));            }        } catch (std::exception &ex) {            ErrorL << "EventPoller执行延时任务捕获到异常:" << ex.what();        }    }    task_copy.insert(_delay_task_map.begin(), _delay_task_map.end());    task_copy.swap(_delay_task_map);    auto it = _delay_task_map.begin();    if (it == _delay_task_map.end()) {        //没有剩余的定时器了        return 0;    }    //最近一个定时器的执行延时    return it->first - now_time;}

添加定时任务,会异步执行,引起管道事件,即往定时列表中添加定时任务。poller一直在runloop怎么添加的,所以异步执行时,会判断是否是当前poller线程,不是就写管道通知来活了

EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function<uint64_t()> task) {    DelayTask::Ptr ret = std::make_shared<DelayTask>(std::move(task));    auto time_line = getCurrentMillisecond() + delay_ms;    async_first([time_line, ret, this]() {        //异步执行的目的是刷新select或epoll的休眠时间        _delay_task_map.emplace(time_line, ret);    });    return ret;}

Any/AnyStorage

任意对象void*类型的问题在于,当delete时,能不能获得对应的类型,正确的执行析构。shared_ptr的第二个参数允许我们传入deletor,解决这个问题

//可以保存任意的对象class Any{public:    using Ptr = std::shared_ptr<Any>;    Any() = default;    ~Any() = default;    template <typename C,typename ...ArgsType>    void set(ArgsType &&...args){        _data.reset(new C(std::forward<ArgsType>(args)...),[](void *ptr){            delete (C*) ptr;        });    }    template <typename C>    C& get(){        if(!_data){            throw std::invalid_argument("Any is empty");        }        C *ptr = (C *)_data.get();        return *ptr;    }    operator bool() {        return _data.operator bool ();         //std::shared_ptr<T>::operator bool  true if *this stores a pointer, false otherwise.    }    bool empty(){        return !bool();    }private:    std::shared_ptr<void> _data;};//用于保存一些外加属性class AnyStorage : public std::unordered_map<std::string,Any>{public:    AnyStorage() = default;    ~AnyStorage() = default;    using Ptr = std::shared_ptr<AnyStorage>;};

std::shared_ptr的工作原理

shared_ptr<void>确实会记录传入的类型,因此析构时能够正确执行。

但是如果传入的是父类指针,就体现了析构函数虚函数的重要性

#include <memory>#include <iostream>#include <vector>class test {public:  test() {    std::cout << "Test created" << std::endl;  }  virtual ~test() {    std::cout << "Test destroyed" << std::endl;  }};class test1: public test{public:test1(){std::cout << "Test1 created" << std::endl;}~test1(){std::cout<< "Test1 destroyed"<< std::endl;}};int main() {  std::cout << "At begin of main.\ncreating std::vector<std::shared_ptr<void>>"             << std::endl;  std::vector<std::shared_ptr<void>> v;  {    std::cout << "Creating test" << std::endl;    v.push_back( std::shared_ptr<test>( (test*)new test1() ) );//传入的是test类型的指针,如果没有virtual不会执行test1的析构    v.push_back( std::shared_ptr<test>( new test1() ) ); //传入的是test1类型的指针,即使没有virtual也能正确调用    std::cout << "Leaving scope" << std::endl;  }  std::cout << "Leaving main" << std::endl;  return 0;}

EventPollerPool

UML

在这里插入图片描述

INSTANCE_IMP(EventPollerPool)    #define INSTANCE_IMP(class_name, ...) \class_name &class_name::Instance() { \    static std::shared_ptr<class_name> s_instance(new class_name(__VA_ARGS__)); \    static class_name &s_instance_ref = *s_instance; \    return s_instance_ref; \}// 声明一个单例,智能指针管理生命周期EventPollerPool &EventPollerPool::Instance() {     static std::shared_ptr<EventPollerPool> s_instance(new EventPollerPool());     static EventPollerPool &s_instance_ref = *s_instance;     return s_instance_ref; }EventPollerPool::EventPollerPool() {    auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);    InfoL << "创建EventPoller个数:" << size;}

在池单例的构造函数中,调用TaskExecutorGetter的addPoller方法

EventPoller::Ptr EventPollerPool::getFirstPoller() {    return dynamic_pointer_cast<EventPoller>(_threads.front());}//根据负载情况获取轻负载的实例 如果优先返回当前线程,那么会返回当前线程 返回当前线程的目的是为了提高线程安全性EventPoller::Ptr EventPollerPool::getPoller(bool prefer_current_thread) {    auto poller = EventPoller::getCurrentPoller();    if (prefer_current_thread && _prefer_current_thread && poller) {        return poller;    }    return dynamic_pointer_cast<EventPoller>(getExecutor());}/** * 设置 getPoller() 是否优先返回当前线程 * 在批量创建Socket对象时,如果优先返回当前线程, * 那么将导致负载不够均衡,所以可以暂时关闭然后再开启 * @param flag 是否优先返回当前线程 */void EventPollerPool::preferCurrentThread(bool flag) {    _prefer_current_thread = flag;}

WorkThreadPool

长得和EventPoller一样…唯一不同在于线程优先级不同 PRIORITY_LOWEST ,EventPollerPool对应的线程优先级PRIORITY_HIGHEST

总结

如何实现一个可以存储任意类型Any结构,对应的资源释放怎么管理,shared_ptr, 析构函数虚函数的重要性

thread_local 具有线程周期,这些变量在线程开始的时候被生成,在线程结束的时候被销毁,并且每一个线程都拥有一个独立的变量实例

对于异步任务执行的实现,pipe事件监听+带锁的异步任务队列

如果添加定时任务,如何在epoll_wait的循环中不错过定时任务,如何处理定时任务

多个poller线程的负载均衡怎么做的 (没有什么高大上,记录wait和run的时间,选择空闲比最小的)


点击全文阅读


本文链接:http://zhangshiyu.com/post/47264.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1