arm: async changes

- more tracing
- pending resume() notifications
- idle function returns bool (useful for testing)

may not build (more stuff coming in later commits)
This commit is contained in:
2022-06-19 09:32:09 +02:00
parent 77bc82d294
commit 15f2657254
5 changed files with 245 additions and 70 deletions

View File

@@ -3,6 +3,23 @@
#include <array>
#include <atomic>
#include <chrono>
#include <utility>
#include "trace.h"
#ifdef __x86_64__
#include <mutex>
struct InterruptLock {
static std::mutex m;
InterruptLock() { m.lock(); }
~InterruptLock() { m.unlock(); }
};
std::mutex InterruptLock::m;
#else // __x86_64__
#include "lock.h"
using tracing::trace;
#endif // __x86_64__
namespace async {
namespace {
@@ -16,13 +33,19 @@ struct Stuff {
Stuff* next;
};
struct Notification {
bool pending; // can only be true if stuff is nullptr
Stuff* stuff;
};
std::atomic<Stuff*> work;
std::array<std::atomic<Stuff*>, static_cast<size_t>(AwaitableType::kNumTypes)>
queues;
std::array<Notification, static_cast<size_t>(AwaitableType::kNumTypes)>
notifications = {};
} // namespace
void schedule(std::coroutine_handle<> h, int ms) {
TRACE(tracing::TraceEvent::kAsyncSchedule);
std::chrono::system_clock::time_point exp =
std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
Stuff* news = new Stuff{
@@ -47,10 +70,12 @@ void schedule(std::coroutine_handle<> h, int ms) {
s->next = news;
}
void main_loop(void (*idle_function)()) {
void main_loop(bool (*idle_function)()) {
while (1) {
if (idle_function != nullptr) {
idle_function();
if (idle_function()) {
break;
};
}
Stuff* stuff = work;
if (stuff == nullptr) {
@@ -64,38 +89,65 @@ void main_loop(void (*idle_function)()) {
continue;
}
TRACE(tracing::TraceEvent::kAsyncTask);
stuff->h();
TRACE(tracing::TraceEvent::kAsyncTaskDone);
Stuff* oldstuff = stuff;
work = stuff->next;
if (stuff->h.done()) {
stuff->h.destroy();
}
{
InterruptLock lock;
delete oldstuff;
work = stuff->next;
}
delete stuff;
}
}
void enqueue(std::coroutine_handle<> h, AwaitableType type) {
Stuff* item = new Stuff{.h = h};
auto ttype = static_cast<size_t>(type);
Stuff* stuff = queues[ttype];
if (stuff == nullptr) {
queues[ttype] = item;
return;
{
InterruptLock lock;
TRACE(tracing::TraceEvent::kAsyncEnqueue);
const bool was_notified =
std::exchange(notifications[ttype].pending, false);
if (was_notified) {
TRACE(tracing::TraceEvent::kAsyncAwaitWasNotified);
schedule(h);
return;
}
Stuff* item = new Stuff{.h = h};
Stuff* stuff = notifications[ttype].stuff;
if (stuff == nullptr) {
notifications[ttype].stuff = item;
return;
}
while (stuff->next != nullptr) {
stuff = stuff->next;
}
stuff->next = item;
}
while (stuff->next != nullptr) {
stuff = stuff->next;
}
stuff->next = item;
}
void resume(AwaitableType type) {
auto ttype = static_cast<size_t>(type);
Stuff* stuff = queues[ttype];
if (stuff == nullptr) {
return;
}
Stuff* stuff = nullptr;
{
InterruptLock lock;
queues[ttype] = stuff->next;
schedule(stuff->h);
stuff = notifications[ttype].stuff;
if (stuff == nullptr) {
notifications[ttype].pending = true;
return;
}
notifications[ttype].stuff = stuff->next;
schedule(stuff->h);
}
delete stuff;
}