当前位置:首页 » 《随便一记》 » 正文

zlMediaKit 1 task模块--怎么用异步做到同步,怎么基于任务而非基于线程管理

5 人参与  2022年11月07日 15:05  分类 : 《随便一记》  评论

点击全文阅读


TaskExecutor.h

Task | TaskIn

一个是函数对象,一个是可调用的类对象

using TaskIn = std::function<void()>;

using Task = TaskCancelableImp<void()>;

TaskCancelableImp

一个重载了operator()的类,返回值和参数是对应的模板参数,函数可取消

取消通过一强一弱智能指针实现

同时定义了取消后的默认返回值

template<class R, class... ArgTypes>class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {public:    using Ptr = std::shared_ptr<TaskCancelableImp>;    using func_type = std::function<R(ArgTypes...)>;    ~TaskCancelableImp() = default;    template<typename FUNC>    TaskCancelableImp(FUNC &&task) {        _strongTask = std::make_shared<func_type>(std::forward<FUNC>(task));        _weakTask = _strongTask;    }    void cancel() override {        _strongTask = nullptr;    }    operator bool() {        return _strongTask && *_strongTask;    }    void operator=(std::nullptr_t) {        _strongTask = nullptr;    }    R operator()(ArgTypes ...args) const {        auto strongTask = _weakTask.lock();        if (strongTask && *strongTask) {            return (*strongTask)(std::forward<ArgTypes>(args)...);        }        return defaultValue<R>();    }    template<typename T>    static typename std::enable_if<std::is_void<T>::value, void>::type    defaultValue() {}    template<typename T>    static typename std::enable_if<std::is_pointer<T>::value, T>::type    defaultValue() {        return nullptr;    }    template<typename T>    static typename std::enable_if<std::is_integral<T>::value, T>::type    defaultValue() {        return 0;    }protected:    std::weak_ptr<func_type> _weakTask;    std::shared_ptr<func_type> _strongTask;};

TaskExecutorGetterImp

UML

在这里插入图片描述

TaskExecutorGetter

virtual TaskExecutor::Ptr getExecutor() = 0;virtual size_t getExecutorSize() const = 0;

结构

std::vector<TaskExecutor::Ptr> _threads; //线程池,多个任务执行器

因为装了多个TaskExecutor,所以对应的获取执行器,统计负载,等功能都有。

addPoller

根据核数添加对应的poller线程

size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) {    auto cpus = thread::hardware_concurrency();    size = size > 0 ? size : cpus;    for (size_t i = 0; i < size; ++i) {        EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority));        poller->runLoop(false, register_thread);        auto full_name = name + " " + to_string(i);        poller->async([i, cpus, full_name]() {            setThreadName(full_name.data());            setThreadAffinity(i % cpus);        });        _threads.emplace_back(std::move(poller));    }    return size;}

getExecutorDelay 给每一个thread异步加入一个定时器,统计执行的时间。finished里析构是调用回调函数。

void TaskExecutorGetterImp::getExecutorDelay(const function<void(const vector<int> &)> &callback) {    std::shared_ptr<vector<int> > delay_vec = std::make_shared<vector<int>>(_threads.size());    shared_ptr<void> finished(nullptr, [callback, delay_vec](void *) {        //此析构回调触发时,说明已执行完毕所有async任务        callback((*delay_vec));    });    int index = 0;    for (auto &th : _threads) {        std::shared_ptr<Ticker> delay_ticker = std::make_shared<Ticker>();        th->async([finished, delay_vec, index, delay_ticker]() {            (*delay_vec)[index] = (int) delay_ticker->elapsedTime();        }, false);        ++index;    }}

getExecutor

根据线程负载情况,获取最空闲的任务执行器

TaskExecutor::Ptr TaskExecutorGetterImp::getExecutor() {    auto thread_pos = _thread_pos;    if (thread_pos >= _threads.size()) {        thread_pos = 0;    }    TaskExecutor::Ptr executor_min_load = _threads[thread_pos];    auto min_load = executor_min_load->load();    for (size_t i = 0; i < _threads.size(); ++i, ++thread_pos) {        if (thread_pos >= _threads.size()) {            thread_pos = 0;        }        auto th = _threads[thread_pos];        auto load = th->load();        if (load < min_load) {            min_load = load;            executor_min_load = th;        }        if (min_load == 0) {            break;        }    }    _thread_pos = thread_pos;    return executor_min_load;}

TaskExecutor

一个任务执行器的最重要的是实现继承自TaskExecutorInterface异步执行的接口

UML

在这里插入图片描述

ThreadLoadCounter 计算load

统计线程的cpu使用率

|sleep…|run…|sleep…|run…|交替执行,在load里统计run/run+sleep的百分比

!TaskExecutorInterface

继承的子类需要实现一个异步执行的接口!

class TaskExecutorInterface {void sync(const TaskIn &task);           //同步执行就是引用    void sync_first(const TaskIn &task);                             virtual Task::Ptr async_first(TaskIn task, bool may_sync = true); //异步执行涉及到拷贝了    virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;}

最终都是调用async,子类来实现async

如何化同步为异步,子类async的返回值可以作为判断是否已经执行任务的根据,如果ret && *ret那么说明任务被放到异步队列中了,sem.wait()等待任务执行时 sem.post()唤醒,否则说明已经执行完毕

EventPoller::async_l的参数说明: 可取消的任务本体,如果已经同步执行,则返回nullptr

void TaskExecutorInterface::sync(const TaskIn &task) {    semaphore sem;    auto ret = async([&]() {        onceToken token(nullptr, [&]() {            //通过RAII原理防止抛异常导致不执行这句代码            sem.post(); //在异步代码中让同步继续进行下去        });        task();    });    if (ret && *ret) {//任务需要被异步执行        sem.wait();    }}

总结

怎么用异步实现同步模板中怎么定义函数对象,模板中怎么用enable_if定义默认的返回值接口类的使用

点击全文阅读


本文链接:http://zhangshiyu.com/post/47274.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1