#include "async.h" #include #include #include #include #include "lock.h" #include "trace.h" namespace async { namespace { using namespace std::literals::chrono_literals; struct Stuff { std::coroutine_handle<> h; std::chrono::system_clock::time_point expiration; Stuff* next; }; struct Notification { bool pending; // can only be true if stuff is nullptr Stuff* stuff; }; std::atomic work = nullptr; std::array(AwaitableType::kNumTypes)> notifications = {}; } // namespace void schedule(std::coroutine_handle<> h, int ms) { InterruptLock lock; TRACE(tracing::TraceEvent::kAsyncSchedule); std::chrono::system_clock::time_point exp = std::chrono::system_clock::now() + std::chrono::milliseconds(ms); Stuff* news = new Stuff{ .h = h, .expiration = exp, }; Stuff* stuff = work; if (!stuff || stuff->expiration > exp) { news->next = stuff; work = news; return; } Stuff* s = stuff; while (s->next && s->next->expiration <= exp) { s = s->next; } news->next = s->next; s->next = news; } void step() { Stuff* stuff; // ensure all previous side effects are visible { InterruptLock lock; stuff = work; }; if (stuff == nullptr) { return; } auto now = std::chrono::system_clock::now(); auto dt = stuff->expiration - now; if (dt > 0ms) { return; } int stuffinqueue = 0; for (Stuff* s = stuff; s; s = s->next) stuffinqueue++; TRACE(tracing::TraceEvent::kAsyncTask); stuff->h(); TRACE(tracing::TraceEvent::kAsyncTaskDone); if (stuff->h.done()) { stuff->h.destroy(); } { InterruptLock lock; work = stuff->next; } delete stuff; } void reset() { Stuff* stuff = work; while (stuff) { Stuff* byebye = stuff; stuff = stuff->next; delete byebye; } work = nullptr; } void main_loop(bool (*idle_function)()) { while (1) { if (idle_function != nullptr) { if (idle_function()) { reset(); break; }; } step(); } } void enqueue(std::coroutine_handle<> h, AwaitableType type) { auto ttype = static_cast(type); { InterruptLock lock; TRACE(tracing::TraceEvent::kAsyncEnqueue); const bool was_notified = std::exchange(notifications[ttype].pending, false); if (was_notified) { TRACE(tracing::TraceEvent::kAsyncAwaitWasNotified); schedule(h); return; } Stuff* item = new Stuff{.h = h}; Stuff* stuff = notifications[ttype].stuff; if (stuff == nullptr) { notifications[ttype].stuff = item; return; } while (stuff->next != nullptr) { stuff = stuff->next; } stuff->next = item; } } void resume(AwaitableType type) { auto ttype = static_cast(type); Stuff* stuff = nullptr; { InterruptLock lock; stuff = notifications[ttype].stuff; if (stuff == nullptr) { notifications[ttype].pending = true; return; } notifications[ttype].stuff = stuff->next; schedule(stuff->h); } delete stuff; } } // namespace async