168 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| #include "async.h"
 | |
| 
 | |
| #include <array>
 | |
| #include <atomic>
 | |
| #include <chrono>
 | |
| #include <utility>
 | |
| 
 | |
| #include "lock.h"
 | |
| #include "trace.h"
 | |
| 
 | |
| namespace async {
 | |
| namespace {
 | |
| 
 | |
| using namespace std::literals::chrono_literals;
 | |
| 
 | |
| struct Stuff {
 | |
|     std::coroutine_handle<> h;
 | |
|     std::chrono::system_clock::time_point expiration;
 | |
| 
 | |
|     Stuff* next;
 | |
| };
 | |
| 
 | |
| struct Notification {
 | |
|     bool pending;  // can only be true if stuff is nullptr
 | |
|     Stuff* stuff;
 | |
| };
 | |
| 
 | |
| std::atomic<Stuff*> work = nullptr;
 | |
| std::array<Notification, static_cast<size_t>(AwaitableType::kNumTypes)>
 | |
|     notifications = {};
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| void schedule(std::coroutine_handle<> h, int ms) {
 | |
|     InterruptLock lock;
 | |
|     TRACE(tracing::TraceEvent::kAsyncSchedule);
 | |
|     std::chrono::system_clock::time_point exp =
 | |
|         std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
 | |
|     Stuff* news = new Stuff{
 | |
|         .h = h,
 | |
|         .expiration = exp,
 | |
|     };
 | |
| 
 | |
|     Stuff* stuff = work;
 | |
| 
 | |
|     if (!stuff || stuff->expiration > exp) {
 | |
|         news->next = stuff;
 | |
|         work = news;
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     Stuff* s = stuff;
 | |
|     while (s->next && s->next->expiration <= exp) {
 | |
|         s = s->next;
 | |
|     }
 | |
| 
 | |
|     news->next = s->next;
 | |
|     s->next = news;
 | |
| }
 | |
| 
 | |
| void step() {
 | |
|     Stuff* stuff;
 | |
|     // ensure all previous side effects are visible
 | |
|     {
 | |
|         InterruptLock lock;
 | |
|         stuff = work;
 | |
|     };
 | |
| 
 | |
|     if (stuff == nullptr) {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     auto now = std::chrono::system_clock::now();
 | |
|     auto dt = stuff->expiration - now;
 | |
| 
 | |
|     if (dt > 0ms) {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     int stuffinqueue = 0;
 | |
|     for (Stuff* s = stuff; s; s = s->next) stuffinqueue++;
 | |
| 
 | |
|     TRACE(tracing::TraceEvent::kAsyncTask);
 | |
|     stuff->h();
 | |
|     TRACE(tracing::TraceEvent::kAsyncTaskDone);
 | |
| 
 | |
|     if (stuff->h.done()) {
 | |
|         stuff->h.destroy();
 | |
|     }
 | |
| 
 | |
|     {
 | |
|         InterruptLock lock;
 | |
|         work = stuff->next;
 | |
|     }
 | |
|     delete stuff;
 | |
| }
 | |
| 
 | |
| void reset() {
 | |
|     Stuff* stuff = work;
 | |
|     while (stuff) {
 | |
|         Stuff* byebye = stuff;
 | |
|         stuff = stuff->next;
 | |
| 
 | |
|         delete byebye;
 | |
|     }
 | |
|     work = nullptr;
 | |
| }
 | |
| 
 | |
| void main_loop(bool (*idle_function)()) {
 | |
|     while (1) {
 | |
|         if (idle_function != nullptr) {
 | |
|             if (idle_function()) {
 | |
|                 reset();
 | |
|                 break;
 | |
|             };
 | |
|         }
 | |
| 
 | |
|         step();
 | |
|     }
 | |
| }
 | |
| 
 | |
| void enqueue(std::coroutine_handle<> h, AwaitableType type) {
 | |
|     auto ttype = static_cast<size_t>(type);
 | |
| 
 | |
|     {
 | |
|         InterruptLock lock;
 | |
|         TRACE(tracing::TraceEvent::kAsyncEnqueue);
 | |
| 
 | |
|         const bool was_notified =
 | |
|             std::exchange(notifications[ttype].pending, false);
 | |
|         if (was_notified) {
 | |
|             TRACE(tracing::TraceEvent::kAsyncAwaitWasNotified);
 | |
|             schedule(h);
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         Stuff* item = new Stuff{.h = h};
 | |
|         Stuff* stuff = notifications[ttype].stuff;
 | |
|         if (stuff == nullptr) {
 | |
|             notifications[ttype].stuff = item;
 | |
|             return;
 | |
|         }
 | |
|         while (stuff->next != nullptr) {
 | |
|             stuff = stuff->next;
 | |
|         }
 | |
|         stuff->next = item;
 | |
|     }
 | |
| }
 | |
| 
 | |
| void resume(AwaitableType type) {
 | |
|     auto ttype = static_cast<size_t>(type);
 | |
|     Stuff* stuff = nullptr;
 | |
|     {
 | |
|         InterruptLock lock;
 | |
| 
 | |
|         stuff = notifications[ttype].stuff;
 | |
|         if (stuff == nullptr) {
 | |
|             notifications[ttype].pending = true;
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         notifications[ttype].stuff = stuff->next;
 | |
|         schedule(stuff->h);
 | |
|     }
 | |
|     delete stuff;
 | |
| }
 | |
| 
 | |
| }  // namespace async
 |