synth/arm/async.cc

196 lines
3.8 KiB
C++
Raw Normal View History

2022-05-17 03:56:25 +00:00
#include "async.h"
#include <array>
#include <atomic>
#include <chrono>
#include <utility>
#include "lock.h"
#include "trace.h"
2022-05-17 03:56:25 +00:00
namespace async {
namespace {
using namespace std::literals::chrono_literals;
struct Stuff {
std::coroutine_handle<> h;
std::chrono::system_clock::time_point expiration;
Stuff* next;
};
struct Notification {
bool pending; // can only be true if stuff is nullptr
Stuff* stuff;
};
std::atomic<Stuff*> work = nullptr;
std::array<Notification, static_cast<size_t>(AwaitableType::kNumTypes)>
notifications = {};
2022-05-17 03:56:25 +00:00
} // namespace
void schedule(std::coroutine_handle<> h, int ms) {
InterruptLock lock;
TRACE(tracing::TraceEvent::kAsyncSchedule);
2022-05-17 03:56:25 +00:00
std::chrono::system_clock::time_point exp =
std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
Stuff* news = new Stuff{
.h = h,
.expiration = exp,
};
Stuff* stuff = work;
if (!stuff || stuff->expiration > exp) {
news->next = stuff;
work = news;
return;
}
Stuff* s = stuff;
while (s->next && s->next->expiration <= exp) {
s = s->next;
}
news->next = s->next;
s->next = news;
}
void step() {
Stuff* stuff;
// ensure all previous side effects are visible
{
InterruptLock lock;
stuff = work;
};
if (stuff == nullptr) {
return;
}
auto now = std::chrono::system_clock::now();
auto dt = stuff->expiration - now;
if (dt > 0ms) {
return;
}
int stuffinqueue = 0;
for (Stuff* s = stuff; s; s = s->next) stuffinqueue++;
TRACE(tracing::TraceEvent::kAsyncTask);
stuff->h();
TRACE(tracing::TraceEvent::kAsyncTaskDone);
if (stuff->h.done()) {
stuff->h.destroy();
}
{
InterruptLock lock;
work = stuff->next;
}
delete stuff;
}
void reset() {
Stuff* stuff = work;
while (stuff) {
Stuff* byebye = stuff;
stuff = stuff->next;
delete byebye;
}
work = nullptr;
}
void main_loop(bool (*idle_function)()) {
2022-05-17 03:56:25 +00:00
while (1) {
if (idle_function != nullptr) {
if (idle_function()) {
reset();
break;
};
2022-05-17 03:56:25 +00:00
}
step();
2022-05-17 03:56:25 +00:00
}
}
void enqueue(std::coroutine_handle<> h, AwaitableType type) {
auto ttype = static_cast<size_t>(type);
{
InterruptLock lock;
TRACE(tracing::TraceEvent::kAsyncEnqueue);
const bool was_notified =
std::exchange(notifications[ttype].pending, false);
if (was_notified) {
TRACE(tracing::TraceEvent::kAsyncAwaitWasNotified);
schedule(h);
return;
}
Stuff* item = new Stuff{.h = h};
Stuff* stuff = notifications[ttype].stuff;
if (stuff == nullptr) {
notifications[ttype].stuff = item;
return;
}
while (stuff->next != nullptr) {
stuff = stuff->next;
}
stuff->next = item;
2022-05-17 03:56:25 +00:00
}
}
void resume(AwaitableType type) {
auto ttype = static_cast<size_t>(type);
Stuff* stuff = nullptr;
{
InterruptLock lock;
2022-05-17 03:56:25 +00:00
stuff = notifications[ttype].stuff;
if (stuff == nullptr) {
notifications[ttype].pending = true;
return;
}
notifications[ttype].stuff = stuff->next;
schedule(stuff->h);
}
2022-05-17 03:56:25 +00:00
delete stuff;
}
} // namespace async
#if 0
task<buffer> readline() {
int size = 0;
char c;
buffer buff = buffer::make(32);
// fcntl(0, F_SETFL, fcntl(0, F_GETFL) | O_NONBLOCK);
while (true) {
int n = read(0, &c, 1);
if (n < 1) {
co_await co_waitio();
continue;
}
buff.data[size++] = static_cast<std::byte>(c);
if (c == '\n') {
buff.data = buff.data.subspan(0, size);
co_return buff;
}
}
}
#endif // 0