Added class Schedule for threads

This commit is contained in:
2021-02-18 17:21:25 -05:00
parent d931191901
commit 36ff4eacae
5 changed files with 113 additions and 64 deletions

27
luprex/core/cpp/sched.cpp Normal file
View File

@@ -0,0 +1,27 @@
#include "sched.hpp"
bool SchedEntry::operator < (const SchedEntry &other) const {
if (clock_ < other.clock_) return true;
if (clock_ > other.clock_) return false;
if (thread_id_ < other.thread_id_) return true;
if (thread_id_ > other.thread_id_) return false;
if (place_id_ < other.place_id_) return true;
if (place_id_ > other.place_id_) return false;
return false;
}
void Schedule::add(int64_t clk, int64_t thid, int64_t plid) {
schedule_.insert(SchedEntry(clk, thid, plid));
}
bool Schedule::ready(int64_t clk) const {
return (!schedule_.empty()) && (schedule_.begin()->clock() <= clk);
}
SchedEntry Schedule::pop() {
SchedEntry result = *schedule_.begin();
schedule_.erase(schedule_.begin());
return result;
}

39
luprex/core/cpp/sched.hpp Normal file
View File

@@ -0,0 +1,39 @@
#ifndef SCHED_HPP
#define SCHED_HPP
#include <cstdint>
#include <set>
class SchedEntry {
private:
friend class Schedule;
int64_t clock_;
int64_t thread_id_;
int64_t place_id_;
public:
int64_t clock() const { return clock_; }
int64_t thread_id() const { return thread_id_; }
int64_t place_id() const { return place_id_; }
SchedEntry(int64_t clk, int64_t thid, int64_t plid) {
clock_ = clk;
thread_id_ = thid;
place_id_ = plid;
}
bool operator < (const SchedEntry &other) const;
};
class Schedule {
private:
std::set<SchedEntry> schedule_;
public:
void add(int64_t clk, int64_t thid, int64_t plid);
bool ready(int64_t clk) const;
SchedEntry pop();
};
#endif // SCHED_HPP

View File

@@ -261,73 +261,60 @@ void World::invoke_plan(int64_t actor_id, int64_t place_id, const std::string &a
// Push the thread's ID into the runnable thread queue,
// then run the thread queue.
enqueue_thread(tid, place_id);
run_thread_queue();
thread_sched_.add(0, tid, place_id);
run_scheduled_threads(0);
}
void World::enqueue_thread(int64_t tid, int64_t place_id) {
thread_queue_.insert(std::make_pair(tid, place_id));
}
void World::run_thread(int64_t tid, int64_t place_id) {
void World::run_scheduled_threads(int64_t clk) {
lua_State *L = state();
LuaVar tangibles, place, mt, threads, thread;
LuaStack LS(L, tangibles, place, mt, threads, thread);
LS.rawget(tangibles, LuaRegistry, "tangibles");
LS.rawget(place, tangibles, place_id);
if (!LS.istable(place)) {
LS.result();
return;
}
LS.getmetatable(mt, place);
if (!LS.istable(mt)) {
LS.result();
return;
}
LS.rawget(threads, mt, "threads");
if (!LS.istable(threads)) {
LS.result();
return;
}
LS.rawget(thread, threads, tid);
if (!LS.isthread(thread)) {
LS.result();
return;
}
// Resume the coroutine.
lua_State *CO = LS.ckthread(thread);
int status = lua_resume(CO, 3);
while (thread_sched_.ready(clk)) {
SchedEntry sched = thread_sched_.pop();
LS.rawget(place, tangibles, sched.place_id());
if (!LS.istable(place)) {
continue;
}
LS.getmetatable(mt, place);
if (!LS.istable(mt)) {
continue;
}
LS.rawget(threads, mt, "threads");
if (!LS.istable(threads)) {
continue;
}
LS.rawget(thread, threads, sched.thread_id());
if (!LS.isthread(thread)) {
continue;
}
// Three possible outcomes: finished, yielded, or errored.
if (status == LUA_YIELD) {
// When the wait statement yields, it yields the desired timestamp.
std::cerr << "Thread yield top = " << lua_gettop(CO) << std::endl;
LS.rawset(threads, tid, LuaNil);
} else if (status == 0) {
// Successfully ran to completion. Remove from thread table.
std::cerr << "Thread ran to completion." << std::endl;
LS.rawset(threads, tid, LuaNil);
} else {
// Transfer the error message from CO to L, and add a traceback.
LS.rawset(threads, tid, LuaNil);
traceback_coroutine(L, CO);
std::cerr << lua_tostring(L, -1);
// Resume the coroutine.
lua_State *CO = LS.ckthread(thread);
int status = lua_resume(CO, 3);
// Three possible outcomes: finished, yielded, or errored.
if (status == LUA_YIELD) {
// When the wait statement yields, it yields the desired timestamp.
std::cerr << "Thread yield top = " << lua_gettop(CO) << std::endl;
// TODO: Insert the thread back into thread_sched_.
LS.rawset(threads, sched.thread_id(), LuaNil);
} else if (status == 0) {
// Successfully ran to completion. Remove from thread table.
std::cerr << "Thread ran to completion." << std::endl;
LS.rawset(threads, sched.thread_id(), LuaNil);
} else {
// Transfer the error message from CO to L, and add a traceback.
LS.rawset(threads, sched.thread_id(), LuaNil);
traceback_coroutine(L, CO);
std::cerr << lua_tostring(L, -1);
}
}
LS.result();
}
void World::run_thread_queue() {
while (!thread_queue_.empty()) {
auto iter = thread_queue_.begin();
int64_t tid = iter->first;
int64_t place_id = iter->second;
run_thread(tid, place_id);
thread_queue_.erase(iter);
}
}
LuaDefine(tangible_get, "c") {
LuaArg id;
LuaRet database;

View File

@@ -6,6 +6,7 @@
#include "planemap.hpp"
#include "idalloc.hpp"
#include "animqueue.hpp"
#include "sched.hpp"
#include "source.hpp"
#include "gui.hpp"
#include "luasnap.hpp"
@@ -46,10 +47,6 @@ public:
class World {
public:
// A thread ID appended to a place ID.
//
using ThreadPair = std::pair<int64_t, int64_t>;
// A lua intepreter with snapshot function.
//
LuaSnap lua_snap_;
@@ -64,14 +61,12 @@ public:
PlaneMap plane_map_;
std::unordered_map<int64_t, Tangible> tangibles_;
// Thread queue.
// Thread schedule: should include every thread, except
// for the one currently-executing thread.
//
std::set<ThreadPair> thread_queue_;
void enqueue_thread(int64_t tid, int64_t place_id);
void run_thread_queue();
void run_thread(int64_t tid, int64_t place_id);
Schedule thread_sched_;
void run_scheduled_threads(int64_t clk);
public:
// Constructor.
//