#include "async.h"

#include <array>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <utility>

#include "trace.h"

#ifdef __x86_64__
#include <mutex>

// Scoped guard used on x86-64 builds: holds one process-wide mutex for the
// lifetime of the object. The mutex is not recursive, so a scope that already
// holds an InterruptLock must not construct a second one.
struct InterruptLock {
  static std::mutex m;
  InterruptLock() { m.lock(); }
  ~InterruptLock() { m.unlock(); }
  InterruptLock(const InterruptLock&) = delete;
  InterruptLock& operator=(const InterruptLock&) = delete;
};
std::mutex InterruptLock::m;
#else  // __x86_64__
// NOTE(review): lock.h is expected to provide InterruptLock for other targets
// (presumably an interrupt-masking guard) -- confirm.
#include "lock.h"
using tracing::trace;
#endif  // __x86_64__

namespace async {
namespace {

using namespace std::literals::chrono_literals;

// Node of an intrusive singly linked list. Used both for the time-ordered
// work list and for the per-type lists of waiting coroutines.
struct Stuff {
  std::coroutine_handle<> h;                          // coroutine to resume
  std::chrono::system_clock::time_point expiration;   // earliest time to run
  Stuff* next;                                        // next node, or nullptr
};

// Per-AwaitableType state: either a list of waiting coroutines, or a flag
// recording that resume() arrived while nobody was waiting.
struct Notification {
  bool pending;  // can only be true if stuff is nullptr
  Stuff* stuff;
};

// Head of the work list, kept sorted by ascending expiration.
std::atomic<Stuff*> work;

std::array<Notification, static_cast<std::size_t>(AwaitableType::kNumTypes)>
    notifications = {};

// Removes `node` from the work list wherever it currently sits. Does nothing
// if the node is not linked. The caller must hold InterruptLock.
void unlink_work(Stuff* node) {
  Stuff* head = work;
  if (head == node) {
    work = node->next;
    return;
  }
  for (Stuff* s = head; s != nullptr; s = s->next) {
    if (s->next == node) {
      s->next = node->next;
      return;
    }
  }
}

}  // namespace

// Queues coroutine `h` to be resumed by main_loop() no earlier than `ms`
// milliseconds from now. Entries with equal expiration keep insertion order.
//
// Takes InterruptLock itself while touching the work list, so it must not be
// called from a scope that already holds the lock.
void schedule(std::coroutine_handle<> h, int ms) {
  TRACE(tracing::TraceEvent::kAsyncSchedule);
  const std::chrono::system_clock::time_point exp =
      std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
  // Build the node before entering the critical section to keep it short.
  Stuff* news = new Stuff{
      .h = h,
      .expiration = exp,
      .next = nullptr,
  };

  InterruptLock lock;
  Stuff* head = work;
  if (head == nullptr || head->expiration > exp) {
    // Empty list, or the new entry is due before the current head.
    news->next = head;
    work = news;
    return;
  }
  // Advance to the last node whose expiration is not later than ours.
  Stuff* s = head;
  while (s->next != nullptr && s->next->expiration <= exp) {
    s = s->next;
  }
  news->next = s->next;
  s->next = news;
}

// Runs scheduled coroutines until `idle_function` returns true.
//
// Each iteration calls `idle_function` (when non-null), then resumes the head
// of the work list if its expiration has passed. The loop spins without
// sleeping when there is nothing due. A coroutine that reports done() after
// being resumed is destroyed here.
void main_loop(bool (*idle_function)()) {
  while (true) {
    if (idle_function != nullptr && idle_function()) {
      break;
    }
    Stuff* stuff = work;
    if (stuff == nullptr) {
      continue;  // busyloop
    }
    if (stuff->expiration > std::chrono::system_clock::now()) {
      continue;  // head is not due yet
    }
    TRACE(tracing::TraceEvent::kAsyncTask);
    stuff->h();
    TRACE(tracing::TraceEvent::kAsyncTaskDone);
    if (stuff->h.done()) {
      stuff->h.destroy();
    }
    {
      InterruptLock lock;
      // The node may no longer be the head if something was scheduled in
      // front of it while the coroutine ran, so unlink it by identity instead
      // of overwriting the head.
      unlink_work(stuff);
    }
    delete stuff;
  }
}

// Registers coroutine `h` as waiting for a notification of `type`.
//
// If resume(type) was already called with no waiter present, the pending flag
// is consumed and `h` is scheduled right away; otherwise `h` is appended to
// the waiter list for `type`.
void enqueue(std::coroutine_handle<> h, AwaitableType type) {
  const auto ttype = static_cast<std::size_t>(type);
  {
    InterruptLock lock;
    TRACE(tracing::TraceEvent::kAsyncEnqueue);
    const bool was_notified =
        std::exchange(notifications[ttype].pending, false);
    if (!was_notified) {
      Stuff* item = new Stuff{.h = h};
      // Walk to the terminating null link and hang the new waiter there.
      Stuff** tail = &notifications[ttype].stuff;
      while (*tail != nullptr) {
        tail = &(*tail)->next;
      }
      *tail = item;
      return;
    }
    TRACE(tracing::TraceEvent::kAsyncAwaitWasNotified);
  }
  // schedule() takes the lock itself, so call it only after releasing ours.
  schedule(h);
}

// Signals a notification of `type`.
//
// Schedules the oldest waiting coroutine for `type`, or, if none is waiting,
// sets the pending flag so that the next enqueue() schedules immediately.
void resume(AwaitableType type) {
  const auto ttype = static_cast<std::size_t>(type);
  Stuff* waiter = nullptr;
  {
    InterruptLock lock;
    waiter = notifications[ttype].stuff;
    if (waiter == nullptr) {
      notifications[ttype].pending = true;
      return;
    }
    notifications[ttype].stuff = waiter->next;
  }
  // The waiter node is now owned exclusively by this call. schedule() takes
  // the lock itself, so it runs after our critical section has ended.
  schedule(waiter->h);
  delete waiter;
}

}  // namespace async

#if 0
// Disabled sketch of a coroutine that reads one line from fd 0.
// NOTE(review): template arguments appear to be missing here (e.g. the task
// result type and the static_cast target) -- restore them before enabling.
task readline() {
  int size = 0;
  char c;
  buffer buff = buffer::make(32);
  // fcntl(0, F_SETFL, fcntl(0, F_GETFL) | O_NONBLOCK);
  while (true) {
    int n = read(0, &c, 1);
    if (n < 1) {
      co_await co_waitio();
      continue;
    }
    buff.data[size++] = static_cast(c);
    if (c == '\n') {
      buff.data = buff.data.subspan(0, size);
      co_return buff;
    }
  }
}
#endif  // 0