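Preface
Picking up where the previous post left off, this article walks through how the components in the stackless coroutine part of async_simple are implemented.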

Mutex
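The source is in the async_simple repository. Mutex is a lock in the coroutine sense. Before reading the source, it helps to picture how it should behave: when several coroutines try to lock the same mutex, only one succeeds; the others suspend and attach their coroutine_handle to the mutex. When the owner unlocks, it checks whether any coroutines are attached to the lock; if so, it takes one of them, lets it acquire the lock, and resumes it.
Note that the scheduler may be multi-threaded, so these operations must be thread-safe. Also, for a base library we want the cost of the coroutine switches caused by locking and unlocking to be as small as possible, so async_simple implements Mutex with lock-free programming.
Start with the two data members of Mutex: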
// This contains either:
// - this => Not locked
// - nullptr => Locked, no newly queued waiters (ie. empty list of waiters)
// - other => Pointer to first LockAwaiter* in a linked-list of newly
// queued awaiters in LIFO order.
std::atomic<void*> _state;
// Linked-list of waiters in FIFO order.
// Only the current lock holder is allowed to access this member.
LockAwaiter* _waiters;
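_state is an atomic variable whose value encodes the state of the Mutex, and it also serves as the head of the list of newly queued waiters (LIFO order). _waiters holds the awaiters that the lock holder has already moved out of _state, in FIFO order. Each LockAwaiter stores a coroutine_handle (so the Mutex can resume the attached coroutine) and a next pointer (so the attached coroutines form a singly linked list; _waiters points to the head of that list).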
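To make the three-way encoding of _state concrete, here is a minimal sketch. StateDemo, Waiter, describe and stateDemoTrace are hypothetical names made up for illustration, not part of async_simple; only the encoding itself (this / nullptr / pointer to a waiter) comes from the comments quoted above.

```cpp
#include <atomic>
#include <cassert>
#include <string>

// Hypothetical stand-in for LockAwaiter: only the intrusive link matters here.
struct Waiter {
    Waiter* next = nullptr;
};

// Hypothetical toy type that only models the state word, not a real mutex.
struct StateDemo {
    // this    => unlocked
    // nullptr => locked, no newly queued waiters
    // other   => locked, points to the most recently queued waiter
    std::atomic<void*> state;

    StateDemo() : state(this) {}

    std::string describe() const {
        void* s = state.load(std::memory_order_relaxed);
        if (s == static_cast<const void*>(this)) return "unlocked";
        if (s == nullptr) return "locked, no new waiters";
        return "locked, waiters queued";
    }
};

// Walks the state word through all three encodings and records each one.
std::string stateDemoTrace() {
    StateDemo m;
    std::string trace = m.describe();
    m.state.store(nullptr);  // what a successful tryLock would write
    trace += " -> " + m.describe();
    Waiter w;
    m.state.store(&w);       // what enqueueing a waiter would write
    trace += " -> " + m.describe();
    return trace;
}
```

Using the address of the mutex itself as the "unlocked" sentinel works because no waiter can ever live at that address, so a single pointer-sized atomic can carry both the lock flag and the list head.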
Once the meaning of the members is clear, the rest of the interface is easy to read:
bool tryLock() noexcept {
    void* oldValue = unlockedState();
    return _state.compare_exchange_strong(oldValue, nullptr,
                                          std::memory_order_acquire,
                                          std::memory_order_relaxed);
}
If the Mutex is currently unlocked, tryLock attempts to lock it; if that fails, it returns false without suspending.
inline Mutex::LockAwaiter Mutex::coLock() noexcept {
    return LockAwaiter(*this);
}
class LockAwaiter {
public:
    explicit LockAwaiter(Mutex& mutex) noexcept : _mutex(mutex) {}
    bool await_ready() noexcept { return _mutex.tryLock(); }
    bool await_suspend(std::coroutine_handle<> awaitingCoroutine) noexcept {
        _awaitingCoroutine = awaitingCoroutine;
        return _mutex.lockAsyncImpl(this);
    }
    void await_resume() noexcept {}
    // FIXME: LockAwaiter should implement coAwait to avoid to fall in
    // ViaAsyncAwaiter.
protected:
    Mutex& _mutex;
private:
    friend Mutex;
    std::coroutine_handle<> _awaitingCoroutine;
    LockAwaiter* _next;
};
The actual locking logic is below:
bool lockAsyncImpl(LockAwaiter* awaiter) {
    void* oldValue = _state.load(std::memory_order_relaxed);
    while (true) {
        if (oldValue == unlockedState()) {
            // It looks like the mutex is currently unlocked.
            // Try to acquire it synchronously.
            void* newValue = nullptr;
            if (_state.compare_exchange_weak(oldValue, newValue,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed)) {
                // Acquired synchronously, don't suspend.
                return false;
            }
        } else {
            // It looks like the mutex is currently locked.
            // Try to queue this waiter to the list of waiters.
            void* newValue = awaiter;
            awaiter->_next = static_cast<LockAwaiter*>(oldValue);
            if (_state.compare_exchange_weak(oldValue, newValue,
                                             std::memory_order_release,
                                             std::memory_order_relaxed)) {
                // Queued waiter successfully. Awaiting coroutine should
                // suspend.
                return true;
            }
        }
    }
}
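This is typical lock-free programming: read the old value of the atomic variable, then use a CAS operation to check the state and update it. Because several threads may be modifying it at the same time, the CAS can fail, so it sits inside a while (true) loop and is retried. On failure, compare_exchange_weak writes the current value back into oldValue, so each retry works with fresh state.
When the Mutex is locked, the awaiter of the calling coroutine is pushed onto the head of the list and _state is updated to point at it.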
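The enqueue branch is the classic lock-free push onto an intrusive singly linked list. The sketch below isolates just that step; Node, pushFront and pushOrderDemo are hypothetical names for illustration, and the demo is single-threaded so that the resulting order is deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical node type standing in for LockAwaiter.
struct Node {
    int id;
    Node* next = nullptr;
};

// Push `node` onto the list headed by `head` with a CAS retry loop.
void pushFront(std::atomic<Node*>& head, Node* node) {
    Node* oldHead = head.load(std::memory_order_relaxed);
    do {
        // Link to the head we last observed. If the CAS below fails,
        // oldHead has been refreshed and the link is rewritten.
        node->next = oldHead;
    } while (!head.compare_exchange_weak(oldHead, node,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
}

// Pushes 1, 2, 3 and returns the ids in list order.
std::vector<int> pushOrderDemo() {
    std::atomic<Node*> head{nullptr};
    Node a{1}, b{2}, c{3};
    pushFront(head, &a);
    pushFront(head, &b);
    pushFront(head, &c);

    std::vector<int> seen;
    for (Node* n = head.load(std::memory_order_acquire); n != nullptr;
         n = n->next) {
        seen.push_back(n->id);
    }
    return seen;
}
```

The list comes back as 3, 2, 1: the most recent arrival is at the head. This is why the comment on _state says newly queued awaiters are in LIFO order, and why unlock has to reverse them before handing out the lock.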
Unlocking is implemented as follows:
void unlock() noexcept {
    assert(_state.load(std::memory_order_relaxed) != unlockedState());
    auto* waitersHead = _waiters;
    if (waitersHead == nullptr) {
        void* currentState = _state.load(std::memory_order_relaxed);
        if (currentState == nullptr) {
            // Looks like there are no waiters waiting to acquire the lock.
            // Try to unlock it - use a compare-exchange to decide the race
            // between unlocking the mutex and another thread enqueueing
            // another waiter.
            const bool releasedLock = _state.compare_exchange_strong(
                currentState, unlockedState(), std::memory_order_release,
                std::memory_order_relaxed);
            if (releasedLock) {
                return;
            }
        }
        // There are some awaiters that have been newly queued.
        // Dequeue them and reverse their order from LIFO to FIFO.
        currentState = _state.exchange(nullptr, std::memory_order_acquire);
        assert(currentState != unlockedState());
        assert(currentState != nullptr);
        auto* waiter = static_cast<LockAwaiter*>(currentState);
        do {
            auto* temp = waiter->_next;
            waiter->_next = waitersHead;
            waitersHead = waiter;
            waiter = temp;
        } while (waiter != nullptr);
    }
    assert(waitersHead != nullptr);
    _waiters = waitersHead->_next;
    waitersHead->_awaitingCoroutine.resume();
}
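unlock first looks at _waiters and _state. If no coroutine is attached, it tries to restore the unlocked state and return. This attempt can fail, because another thread may modify _state (by attaching a new coroutine) after _state has been read and before the CAS runs. That case simply merges with the one where coroutines were already attached: take one coroutine from the head of the list and resume it. Note that the lock is handed over directly in this path: _state never goes back to the unlocked value, so the resumed coroutine already owns the Mutex.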
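The whole lock/unlock protocol can be exercised without coroutines by letting a plain callback stand in for the coroutine_handle. The following is a minimal sketch under that assumption; ToyMutex, lockOrEnqueue and handOffDemo are hypothetical names, and the driver is single-threaded so the hand-off order can be checked deterministically.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical toy mutex mirroring the algorithm described above.
class ToyMutex {
public:
    struct Waiter {
        std::function<void()> resume;  // stand-in for coroutine_handle::resume
        Waiter* next = nullptr;
    };

    ToyMutex() : state_(this) {}

    // Returns false if the lock was acquired immediately,
    // true if the waiter was queued and must wait to be resumed.
    bool lockOrEnqueue(Waiter* w) {
        void* old = state_.load(std::memory_order_relaxed);
        while (true) {
            if (old == this) {
                if (state_.compare_exchange_weak(old, nullptr,
                                                 std::memory_order_acquire,
                                                 std::memory_order_relaxed))
                    return false;
            } else {
                w->next = static_cast<Waiter*>(old);
                if (state_.compare_exchange_weak(old, w,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed))
                    return true;
            }
        }
    }

    void unlock() {
        Waiter* head = waiters_;
        if (head == nullptr) {
            void* cur = state_.load(std::memory_order_relaxed);
            if (cur == nullptr &&
                state_.compare_exchange_strong(cur, this,
                                               std::memory_order_release,
                                               std::memory_order_relaxed))
                return;  // nobody waiting: really unlocked
            // Grab all newly queued waiters and reverse LIFO -> FIFO.
            cur = state_.exchange(nullptr, std::memory_order_acquire);
            auto* w = static_cast<Waiter*>(cur);
            while (w != nullptr) {
                Waiter* next = w->next;
                w->next = head;
                head = w;
                w = next;
            }
        }
        assert(head != nullptr);
        waiters_ = head->next;
        head->resume();  // ownership passes directly to this waiter
    }

private:
    std::atomic<void*> state_;
    Waiter* waiters_ = nullptr;  // FIFO list, touched only by the lock holder
};

// One owner plus three queued waiters; returns the order they were resumed in.
std::vector<int> handOffDemo() {
    ToyMutex m;
    std::vector<int> order;
    ToyMutex::Waiter owner;
    bool queued = m.lockOrEnqueue(&owner);  // first caller gets the lock
    assert(!queued);
    (void)queued;

    ToyMutex::Waiter w1{[&] { order.push_back(1); }};
    ToyMutex::Waiter w2{[&] { order.push_back(2); }};
    ToyMutex::Waiter w3{[&] { order.push_back(3); }};
    m.lockOrEnqueue(&w1);
    m.lockOrEnqueue(&w2);
    m.lockOrEnqueue(&w3);

    m.unlock();  // owner releases, w1 is resumed
    m.unlock();  // w1 releases, w2 is resumed
    m.unlock();  // w2 releases, w3 is resumed
    m.unlock();  // w3 releases, mutex becomes unlocked
    return order;
}
```

Although the waiters were pushed in LIFO order, the reversal in unlock means they are served in arrival order. Only the first unlock pays for the reversal; the next two just pop from the private FIFO list without touching the atomic at all.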
ConditionVariable
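The source is in the async_simple repository. The techniques are essentially the same as in Mutex, only with ConditionVariable semantics, so this article does not go through them again.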
SpinLock
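The source is in the async_simple repository. Compared with an ordinary spin lock, async_simple also provides a coroutine version of the lock interface: if the lock still has not been acquired after a certain number of retries, the coroutine yields the CPU and goes back into the scheduler's queue.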
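The "spin a bounded number of times, then yield" idea can be sketched as follows. This is not async_simple's SpinLock: ToySpinLock and spinLockDemo are hypothetical, and the yield step is passed in as a callable where the real coroutine version would co_await a yield to the executor. In the demo the callable simulates "the lock holder got scheduled and released the lock".

```cpp
#include <atomic>
#include <cassert>

// Hypothetical toy spin lock with a bounded spin budget.
class ToySpinLock {
public:
    explicit ToySpinLock(int spinBudget) : spinBudget_(spinBudget) {}

    bool tryLock() {
        // exchange returns the previous value: false means we took the lock.
        return !locked_.exchange(true, std::memory_order_acquire);
    }

    void unlock() { locked_.store(false, std::memory_order_release); }

    // Spins up to spinBudget_ times, then calls `yield` and starts over.
    // Returns how many times it had to yield before acquiring the lock.
    template <typename Yield>
    int lock(Yield&& yield) {
        int yields = 0;
        int spins = 0;
        while (!tryLock()) {
            if (++spins == spinBudget_) {
                yield();  // give up the CPU instead of burning it
                ++yields;
                spins = 0;
            }
        }
        return yields;
    }

private:
    std::atomic<bool> locked_{false};
    int spinBudget_;
};

int spinLockDemo() {
    ToySpinLock lock(128);
    bool first = lock.tryLock();  // a "holder" takes the lock
    assert(first);
    (void)first;
    // The second locker exhausts its budget once; yielding lets the
    // holder run, which is simulated here by releasing the lock.
    return lock.lock([&] { lock.unlock(); });
}
```

The budget trades latency for fairness: short critical sections are usually resolved within the spin phase, while a long wait costs one reschedule rather than an unbounded busy loop on a scheduler thread.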
SyncAwait
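The source is in the async_simple repository. SyncAwait blocks synchronously until a coroutine finishes, and is generally used from a non-coroutine context. The implementation is simple: it builds on the asynchronous, non-blocking start interface plus a thread-level condition variable. (The caller waits on the condition variable, and the coroutine's asynchronous callback notifies it.)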
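The pattern of turning a callback-based start into a blocking call looks roughly like this. It is a sketch, not async_simple's code: syncAwait here takes a hypothetical `start` callable that accepts a completion callback, and the demo completes the work on a separate std::thread in place of an executor.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Blocks the calling thread until the callback handed to `start` has fired.
template <typename Start>
int syncAwait(Start&& start) {
    struct State {
        std::mutex m;
        std::condition_variable cv;
        bool done = false;
        int value = 0;
    };
    // Shared ownership keeps the state alive until the callback is finished,
    // even if the waiting side returns first.
    auto state = std::make_shared<State>();

    start([state](int v) {
        {
            std::lock_guard<std::mutex> guard(state->m);
            state->value = v;
            state->done = true;
        }
        state->cv.notify_one();  // wake the blocked caller
    });

    std::unique_lock<std::mutex> lock(state->m);
    state->cv.wait(lock, [&] { return state->done; });
    return state->value;
}

int syncAwaitDemo() {
    std::thread worker;
    int v = syncAwait([&](std::function<void(int)> cb) {
        // Hypothetical async operation: finish on another thread.
        worker = std::thread([cb = std::move(cb)] { cb(42); });
    });
    worker.join();
    return v;
}
```

The predicate form of wait matters: if the callback runs before the caller reaches wait (or even inline inside start), done is already true and the caller does not block.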
CountingSemaphore
A coroutine version of std::counting_semaphore, built on Mutex and ConditionVariable. Easy:
template <std::size_t LeastMaxValue>
Lazy<void> CountingSemaphore<LeastMaxValue>::acquire() noexcept {
    auto lock = co_await lock_.coScopedLock();
    co_await cv_.wait(lock_, [this] { return count_ > 0; });
    --count_;
}
template <std::size_t LeastMaxValue>
Lazy<void> CountingSemaphore<LeastMaxValue>::release(
    std::size_t update) noexcept {
    // update should be less than LeastMaxValue and greater than 0
    assert(update <= LeastMaxValue && update != 0);
    auto lock = co_await lock_.coScopedLock();
    // internal counter should be less than LeastMaxValue
    assert(count_ <= LeastMaxValue - update);
    count_ += update;
    if (update > 1) {
        // When update is greater than 1, wake up all coroutines.
        // When the counter is decremented to 0, resuspend the coroutine.
        cv_.notifyAll();
    } else {
        cv_.notifyOne();
    }
}
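For readers more familiar with the thread world, the same structure can be written with std::mutex and std::condition_variable. This is a thread-based analogue, not the coroutine implementation: ToySemaphore and semaphoreDemo are hypothetical names, and blocking a thread replaces suspending a coroutine.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical thread-based counterpart of the coroutine semaphore above.
class ToySemaphore {
public:
    explicit ToySemaphore(std::size_t initial) : count_(initial) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Woken threads re-check the predicate; those that find the
        // counter back at 0 go to sleep again.
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    void release(std::size_t update = 1) {
        std::lock_guard<std::mutex> lock(mutex_);
        count_ += update;
        if (update > 1) {
            cv_.notify_all();  // several permits: let all waiters compete
        } else {
            cv_.notify_one();
        }
    }

    std::size_t available() {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t count_;
};

// Three waiters, three permits released in two calls; returns what is left.
std::size_t semaphoreDemo() {
    ToySemaphore sem(0);
    std::vector<std::thread> waiters;
    for (int i = 0; i < 3; ++i) {
        waiters.emplace_back([&] { sem.acquire(); });
    }
    sem.release(2);
    sem.release(1);
    for (auto& t : waiters) t.join();
    return sem.available();
}
```

Waking everyone when update > 1 is deliberately simple: the predicate in acquire guarantees that at most `update` waiters get through, and the rest wait again, which matches the comment in the release code above.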
collectAny, collectAll, collectAllPara, collectAllWindowed, collectAllWindowedPara
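The source is in the async_simple repository. These functions are used in a coroutine context to wait for all, or any one, of a group of coroutines to complete, and they are built on the asynchronous, non-blocking start interface. Picture it like this: set an asynchronous callback on each coroutine in the group and start them one by one. All callbacks share some variables, which track completion and hold the coroutine_handle of the calling coroutine. Each callback checks whether all coroutines have finished (All) or whether it is the first to finish (Any); if the condition holds, it resumes the calling coroutine. Taking collectAll as an example: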
inline void await_suspend(std::coroutine_handle<> continuation) {
    auto promise_type =
        std::coroutine_handle<LazyPromiseBase>::from_address(
            continuation.address())
            .promise();
    auto executor = promise_type._executor;
    for (size_t i = 0; i < _input.size(); ++i) {
        auto& exec = _input[i]._coro.promise()._executor;
        if (exec == nullptr) {
            exec = executor;
        }
        auto&& func = [this, i]() {
            _input[i].start([this, i](Try<ValueType>&& result) {
                _output[i] = std::move(result);
                auto awaitingCoro = _event.down();
                if (awaitingCoro) {
                    awaitingCoro.resume();
                }
            });
        };
        if (Para == true && _input.size() > 1) {
            if (exec != nullptr)
                AS_LIKELY {
                    exec->schedule(func);
                    continue;
                }
        }
        func();
    }
    _event.setAwaitingCoro(continuation);
    auto awaitingCoro = _event.down();
    if (awaitingCoro) {
        awaitingCoro.resume();
    }
}
This is essentially what we pictured. Once you understand the start interface, these interfaces are easy to follow.
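The role of _event in the code above can be sketched as a countdown that hands the continuation to whoever performs the last decrement. ToyCountEvent and collectAllDemo below are hypothetical; the assumption that the counter starts at "number of tasks + 1" is inferred from the extra down() call at the end of await_suspend, and a std::function stands in for the coroutine_handle.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical countdown event: the caller of the last down() gets the
// continuation and is responsible for running it.
class ToyCountEvent {
public:
    // One extra count is owned by the code that registers the continuation.
    explicit ToyCountEvent(std::size_t tasks) : count_(tasks + 1) {}

    void setContinuation(std::function<void()> c) {
        continuation_ = std::move(c);
    }

    std::function<void()> down() {
        if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            return continuation_;  // we were the last one
        }
        return nullptr;
    }

private:
    std::atomic<std::size_t> count_;
    std::function<void()> continuation_;
};

// Runs three tasks inline and returns what the continuation observed.
std::vector<int> collectAllDemo() {
    std::vector<std::function<int()>> tasks = {
        [] { return 1; }, [] { return 2; }, [] { return 3; }};
    std::vector<int> output(tasks.size());
    std::vector<int> seenByContinuation;
    ToyCountEvent event(tasks.size());

    for (std::size_t i = 0; i < tasks.size(); ++i) {
        output[i] = tasks[i]();  // each task writes only its own slot
        if (auto resume = event.down()) resume();
    }

    // Register the continuation, then drop the extra count.
    event.setContinuation([&] { seenByContinuation = output; });
    if (auto resume = event.down()) resume();
    return seenByContinuation;
}
```

In this demo every task finishes before the continuation is even registered, yet nothing is resumed early: the extra count keeps the event above zero until the final down(), at which point the continuation sees all three results.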
Generator
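The source is in the async_simple repository. It is a custom implementation of std::generator. This component is provided in C++23 and described in its standard proposal; if the compiler does not support C++23, the custom version here is used instead.
Broadly speaking, a std::generator is a coroutine in which we can co_yield repeatedly to produce values. The job of std::generator is to wrap such a coroutine into an object that models view and input_range, so that it composes well with the other components of the ranges library. It also supports recursive calls. (On reflection this is not trivial to implement: the user holds the handle of the outermost coroutine, but with recursion the coroutine that must be resumed is the innermost callee. The implementation here is quite interesting.)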
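Summary
As a supplement to the previous post, this one is fairly short: the concepts behind the upper-level components are mature, and their implementations build on the lower-level components and are quite concise, so readers can go through the source themselves. If time permits I will add a source walkthrough of Generator; the others probably do not need one.
Writing technical articles is really tiring. That's all for now.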