厨方网

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 168|回复: 2

async_simple 源码分析(下)

[复制链接]

1

主题

2

帖子

4

积分

新手上路

Rank: 1

积分
4
发表于 2023-4-19 18:11:51 | 显示全部楼层 |阅读模式
前言

书接上回,这篇文章介绍一下async_simple无栈协程部分各组件的实现。


Mutex

源码在这里,协程意义上的锁,在看源码之前可以脑补一下,当多个协程对同一个mutex上锁时,只有一个协程可以成功,其他协程会挂起,并将自己的coroutine_handle挂载在mutex上,当上锁成功的协程执行解锁操作时,会判断当前锁上是否有挂载的协程,如果有则取出其中的一个让其成功上锁,并resume该协程。
需要注意的是,由于调度器可能是多线程的,因此上述操作应该是线程安全的;另外作为一个基础库而言,我们希望加锁解锁等操作导致的协程切换的开销尽量小,因此async_simple采用无锁编程的方式实现了Mutex。
首先看Mutex的两个数据成员:
    // This contains either:
    // - this    => Not locked
    // - nullptr => Locked, no newly queued waiters (ie. empty list of waiters)
    // - other   => Pointer to first LockAwaiter* in a linked-list of newly
    //              queued awaiters in LIFO order.
    std::atomic<void*> _state;

    // Linked-list of waiters in FIFO order.
    // Only the current lock holder is allowed to access this member.
    LockAwaiter* _waiters;
_state是一个原子变量,这里根据取值的不同表示了Mutex的不同状态,_waiters存储着当前挂载在Mutex上的协程对应的awaiter,LockAwaiter中存储着coroutine_handle(因此Mutex可以resume挂载的协程)和next指针(因此挂载的协程可以组织为单链表,实际上_waiters指向这个单链表的表头)。
理解了成员变量的含义,再看其他接口的实现就很简单了:
    bool tryLock() noexcept {
        void* oldValue = unlockedState();
        return _state.compare_exchange_strong(oldValue, nullptr,
                                              std::memory_order_acquire,
                                              std::memory_order_relaxed);
    }
如果当前Mutex处于未锁定状态,则尝试加锁,如果加锁失败则返回false,不挂起。
inline Mutex::LockAwaiter Mutex::coLock() noexcept {
    return LockAwaiter(*this);
}

    class LockAwaiter {
    public:
        explicit LockAwaiter(Mutex& mutex) noexcept : _mutex(mutex) {}

        bool await_ready() noexcept { return _mutex.tryLock(); }

        bool await_suspend(std::coroutine_handle<> awaitingCoroutine) noexcept {
            _awaitingCoroutine = awaitingCoroutine;
            return _mutex.lockAsyncImpl(this);
        }

        void await_resume() noexcept {}

        // FIXME: LockAwaiter should implement coAwait to avoid to fall in
        // ViaAsyncAwaiter.

    protected:
        Mutex& _mutex;

    private:
        friend Mutex;

        std::coroutine_handle<> _awaitingCoroutine;
        LockAwaiter* _next;
    };
下面是真正的加锁实现:
    bool lockAsyncImpl(LockAwaiter* awaiter) {
        void* oldValue = _state.load(std::memory_order_relaxed);
        while (true) {
            if (oldValue == unlockedState()) {
                // It looks like the mutex is currently unlocked.
                // Try to acquire it synchronously.
                void* newValue = nullptr;
                if (_state.compare_exchange_weak(oldValue, newValue,
                                                 std::memory_order_acquire,
                                                 std::memory_order_relaxed)) {
                    // Acquired synchronously, don't suspend.
                    return false;
                }
            } else {
                // It looks like the mutex is currently locked.
                // Try to queue this waiter to the list of waiters.
                void* newValue = awaiter;
                awaiter->_next = static_cast<LockAwaiter*>(oldValue);
                if (_state.compare_exchange_weak(oldValue, newValue,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed)) {
                    // Queued waiter successfully. Awaiting coroutine should
                    // suspend.
                    return true;
                }
            }
        }
    }
典型的无锁编程,读取原子变量的旧值,然后通过cas操作判断状态并更新,由于可能多个线程在同时修改,因此cas操作可能失败,固将其放在while(true)中不断尝试。
当Mutex处于加锁状态时,将调用协程对应的awaiter挂载在链表头,并更新_state。
下面是解锁的实现:
    void unlock() noexcept {
        assert(_state.load(std::memory_order_relaxed) != unlockedState());
        auto* waitersHead = _waiters;
        if (waitersHead == nullptr) {
            void* currentState = _state.load(std::memory_order_relaxed);
            if (currentState == nullptr) {
                // Looks like there are no waiters waiting to acquire the lock.
                // Try to unlock it - use a compare-exchange to decide the race
                // between unlocking the mutex and another thread enqueueing
                // another waiter.
                const bool releasedLock = _state.compare_exchange_strong(
                    currentState, unlockedState(), std::memory_order_release,
                    std::memory_order_relaxed);
                if (releasedLock) {
                    return;
                }
            }
            // There are some awaiters that have been newly queued.
            // Dequeue them and reverse their order from LIFO to FIFO.
            currentState = _state.exchange(nullptr, std::memory_order_acquire);
            assert(currentState != unlockedState());
            assert(currentState != nullptr);
            auto* waiter = static_cast<LockAwaiter*>(currentState);
            do {
                auto* temp = waiter->_next;
                waiter->_next = waitersHead;
                waitersHead = waiter;
                waiter = temp;
            } while (waiter != nullptr);
        }
        assert(waitersHead != nullptr);
        _waiters = waitersHead->_next;
        waitersHead->_awaitingCoroutine.resume();
    }
判断_state的状态,如果没有挂载的协程,尝试将其恢复为未加锁状态并返回,注意这里可能会失败,因为其他线程可能在读取_state之后、对_state进行cas操作之前修改了_state的值(新挂载了一个协程),不过这种情况可以合并在本身就有挂载的协程的情况中:从链表头取出一个协程并resume它。
ConditionVariable

源码在这里,实现的手法和技巧和Mutex基本上一样,只不过替换成了ConditionVariable的语义而已,本文不赘述了。
SpinLock

源码在这里,和普通的自旋锁相比,async_simple提供了协程版的加锁接口:如果重试一定次数还没有加锁成功,就让出cpu使自己重新参加调度。
SyncAwait

源码在这里,同步阻塞的等待协程执行完毕,一般用在非协程上下文中等待协程执行完毕,实现很简单,基于异步非阻塞的start接口,加上线程版本的条件变量即可。(本地对条件变量执行wait操作,协程的异步回调中对条件变量执行notify操作)
CountingSemaphore

协程版本的std::counting_semaphore,基于MutexConditionVariable实现,EZ:
template <std::size_t LeastMaxValue>
Lazy<void> CountingSemaphore<LeastMaxValue>::acquire() noexcept {
    auto lock = co_await lock_.coScopedLock();
    co_await cv_.wait(lock_, [this] { return count_ > 0; });
    --count_;
}

template <std::size_t LeastMaxValue>
Lazy<void> CountingSemaphore<LeastMaxValue>::release(
    std::size_t update) noexcept {
    // update should be less than LeastMaxValue and greater than 0
    assert(update <= LeastMaxValue && update != 0);
    auto lock = co_await lock_.coScopedLock();
    // internal counter should be less than LeastMaxValue
    assert(count_ <= LeastMaxValue - update);
    count_ += update;
    if (update > 1) {
        // When update is greater than 1, wake up all coroutines.
        // When the counter is decremented to 0, resuspend the coroutine.
        cv_.notifyAll();
    } else {
        cv_.notifyOne();
    }
}
collectAny,collectAll,collectAllPara,collectAllWindowed,collectAllWindowedPara

源码在这里,用来在协程上下文中等待一组协程中的所有/任何一个协程调度完成,基于异步非阻塞接口start完成。脑补一下,依次为一组协程设置异步回调接口并启动运行,所有回调接口都持有一些共享变量,这些变量用来统计完成情况、持有调用协程的coroutine_handle。在回调接口中判断是不是所有的协程都完成(All)/是不是第一个完成的协程(Any),如果满足条件则resume调用协程,以collectAll为例看下代码:
    inline void await_suspend(std::coroutine_handle<> continuation) {
        auto promise_type =
            std::coroutine_handle<LazyPromiseBase>::from_address(
                continuation.address())
                .promise();
        auto executor = promise_type._executor;
        for (size_t i = 0; i < _input.size(); ++i) {
            auto& exec = _input._coro.promise()._executor;
            if (exec == nullptr) {
                exec = executor;
            }
            auto&& func = [this, i]() {
                _input.start([this, i](Try<ValueType>&& result) {
                    _output = std::move(result);
                    auto awaitingCoro = _event.down();
                    if (awaitingCoro) {
                        awaitingCoro.resume();
                    }
                });
            };
            if (Para == true && _input.size() > 1) {
                if (exec != nullptr)
                    AS_LIKELY {
                        exec->schedule(func);
                        continue;
                    }
            }
            func();
        }
        _event.setAwaitingCoro(continuation);
        auto awaitingCoro = _event.down();
        if (awaitingCoro) {
            awaitingCoro.resume();
        }
    }
和我们设想的基本一致,只要你理解了start接口,这些接口就很好理解了。
Generator

源码在这里,std::generator的自定义实现,这个组件在c++23中提供,提案在这里,如果编译期不支持c++23则使用这里的自定义版本。
总的来说 std::generator是一个协程,我们可以在协程中不断co_yield 来产生值。std::generator的工作是将这样的协程封装为满足viewandinput_range 的对象,这样就可以和range库中的其他组件很好的组合使用了,同时它是支持递归调用的。(仔细想一下这不是简单实现的,因为用户持有的是最顶层的协程句柄,而支持递归调用的情况下,resume的应该是最里层的被调协程,这里的实现也很有意思)
  总结

作为上一篇的补充,这篇内容不是很多,因为上层组件的概念都很成熟,实现也是建立在底层组件的基础上,比较简洁,诸位自己去翻源码吧。有空的话会补充Generator的源码解析,其他的应该没有必要。
写技术文章真是挺累的,溜了溜了。

回复

使用道具 举报

0

主题

1

帖子

0

积分

新手上路

Rank: 1

积分
0
发表于 2023-4-19 18:11:58 | 显示全部楼层
谢谢分享
回复

使用道具 举报

0

主题

5

帖子

10

积分

新手上路

Rank: 1

积分
10
发表于 2025-3-21 08:16:58 | 显示全部楼层
我了个去,顶了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|厨方网

GMT+8, 2025-4-7 16:00 , Processed in 0.462505 second(s), 23 queries .

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表