Overhaul of thread handling to support blocking functions other than wait

This commit is contained in:
2022-04-25 17:17:41 -04:00
parent bd389c7815
commit 9aec7c5299
15 changed files with 144 additions and 88 deletions

View File

@@ -64,7 +64,7 @@ World::World(util::WorldType wt) {
LS.rawset(LuaRegistry, "tangibles", LuaNewTable);
// Create the globaldb and oncedb in the registry.
if (util::world_type_authoritative(wt)) {
if ((wt == util::WORLD_TYPE_MASTER) || (wt == util::WORLD_TYPE_STANDALONE)) {
LS.rawset(LuaRegistry, "globaldb", LuaNewTable);
LS.rawset(LuaRegistry, "oncedb", LuaNewTable);
}
@@ -291,23 +291,20 @@ eng::string World::probe_lua(int64_t actor_id, const eng::string &lua) {
// Call the closure.
int top = lua_gettop(L);
lua_pushvalue(L, closure.index());
open_lthread_state(actor_id, actor_id, false, true);
status = traceback_pcall(L, 0, LUA_MULTRET);
open_lthread_state(actor_id, actor_id, 0, false, true);
eng::string msg = traceback_pcall(L, 0, LUA_MULTRET);
// If there's an error message, print it.
// Otherwise, pretty-print the results.
std::ostream *ostream = lthread_print_stream();
if (status == LUA_OK) {
if (msg.empty()) {
for (int i = top + 1; i <= lua_gettop(L); i++) {
LuaSpecial root(i);
pprint(LS, root, true, ostream);
(*ostream) << std::endl;
}
} else {
const char *msg = lua_tostring(L, -1);
if (msg == NULL) {
msg = "(error object is not a string)";
}
assert(msg != "attempt to yield from outside a coroutine");
(*ostream) << msg << std::endl;
}
@@ -363,13 +360,14 @@ void World::update_gui(int64_t actor_id, int64_t place_id, Gui *gui) {
lua_pushvalue(L, actor.index());
lua_pushvalue(L, place.index());
Gui::store_global_pointer(L, gui);
open_lthread_state(actor_id, place_id, false, false);
int status = traceback_pcall(L, 2, 0);
open_lthread_state(actor_id, place_id, 0, false, false);
eng::string msg = traceback_pcall(L, 2, 0);
close_lthread_state();
Gui::store_global_pointer(L, nullptr);
if (status != 0) {
if (!msg.empty()) {
gui->clear(0);
std::cerr << lua_tostring(L, -1);
assert(msg != "attempt to yield from outside a coroutine");
std::cerr << msg << std::endl;
LS.result();
return;
}
@@ -503,7 +501,7 @@ void World::invoke_lua(int64_t actor_id, int64_t place_id, const eng::string &ac
LS.newtable(thinfo);
LS.rawset(thinfo, "thread", thread);
LS.rawset(thinfo, "actorid", actor_id);
LS.rawset(thinfo, "nargs", 0);
LS.rawset(thinfo, "isnew", true);
LS.rawset(thinfo, "useppool", true);
LS.rawset(thinfo, "print", true);
@@ -596,7 +594,7 @@ void World::invoke_plan(int64_t actor_id, int64_t place_id, const eng::string &a
LS.newtable(thinfo);
LS.rawset(thinfo, "thread", thread);
LS.rawset(thinfo, "actorid", actor_id);
LS.rawset(thinfo, "nargs", 3); // actor, place, invdata
LS.rawset(thinfo, "isnew", true);
LS.rawset(thinfo, "useppool", true);
LS.rawset(thinfo, "print", false);
@@ -617,7 +615,7 @@ void World::invoke_plan(int64_t actor_id, int64_t place_id, const eng::string &a
}
void World::invoke_tick(int64_t actor_id, int64_t place_id, const eng::string &action, const InvocationData &data) {
if (!util::world_type_authoritative(world_type_)) {
if (!is_authoritative()) {
return;
}
clock_ += 1;
@@ -625,7 +623,7 @@ void World::invoke_tick(int64_t actor_id, int64_t place_id, const eng::string &a
}
void World::invoke_lua_source(int64_t actor_id, int64_t place_id, const eng::string &action, const InvocationData &data) {
if (!util::world_type_authoritative(world_type_)) {
if (!is_authoritative()) {
return;
}
// We need some kind of authentication here.
@@ -642,8 +640,8 @@ void World::invoke_lua_source(int64_t actor_id, int64_t place_id, const eng::str
void World::run_scheduled_threads() {
assert(stack_is_clear());
lua_State *L = state();
LuaVar tangibles, place, mt, threads, thinfo, actorid, nargs, useppool, thread, print;
LuaStack LS(L, tangibles, place, mt, threads, thinfo, actorid, nargs, useppool, thread, print);
LuaVar tangibles, place, mt, threads, thinfo, actorid, isnew, useppool, thread, print;
LuaStack LS(L, tangibles, place, mt, threads, thinfo, actorid, isnew, useppool, thread, print);
LS.rawget(tangibles, LuaRegistry, "tangibles");
while (thread_sched_.ready(clock_)) {
@@ -668,8 +666,8 @@ void World::run_scheduled_threads() {
if (!LS.isnumber(actorid)) {
continue;
}
LS.rawget(nargs, thinfo, "nargs");
if (!LS.isnumber(nargs)) {
LS.rawget(isnew, thinfo, "isnew");
if (!LS.isboolean(isnew)) {
continue;
}
LS.rawget(useppool, thinfo, "useppool");
@@ -683,35 +681,16 @@ void World::run_scheduled_threads() {
// Resume the coroutine.
lua_State *CO = LS.ckthread(thread);
open_lthread_state(LS.ckinteger(actorid), sched.place_id(), LS.ckboolean(useppool), true);
int status = lua_resume(CO, nullptr, LS.ckint(nargs));
open_lthread_state(LS.ckinteger(actorid), sched.place_id(), sched.thread_id(), LS.ckboolean(useppool), true);
int nargs = LS.ckboolean(isnew) ? (lua_gettop(CO) - 1) : lua_gettop(CO);
int status = lua_resume(CO, nullptr, nargs);
std::ostream *ostream = lthread_print_stream();
// Three possible outcomes: finished, yielded, or errored.
if (!util::world_type_authoritative(world_type_)) {
LS.rawset(threads, sched.thread_id(), LuaNil);
} else if (status == LUA_YIELD) {
// If there's nothing on the stack, infer that tangible.nopredict yielded.
if (lua_gettop(CO) == 0) {
LS.rawset(threads, sched.thread_id(), LuaNil);
}
// If there's a single number on the stack, infer that 'wait' yielded.
else if ((lua_gettop(CO) == 1) && (lua_isnumber(CO, 1))) {
lua_Number delay = lua_tonumber(CO, 1);
lua_settop(CO, 0);
LS.rawset(thinfo, "nargs", 0);
LS.rawset(thinfo, "useppool", false);
thread_sched_.add(clock_ + int64_t(delay), sched.thread_id(), sched.place_id());
}
// In any other case, generate an error and kill the coroutine.
else {
std::cerr << "Thread yielded incorrectly. Killing it." << std::endl;
LS.rawset(threads, sched.thread_id(), LuaNil);
}
} else if (status == LUA_OK) {
if (status == LUA_OK) {
// Successfully ran to completion. Print any return values.
// Remove from thread table.
LS.rawget(print, thinfo, "print");
LS.rawset(threads, sched.thread_id(), LuaNil);
LuaStack LSCO(CO);
if (LS.ckboolean(print)) {
for (int i = 1; i <= lua_gettop(CO); i++) {
@@ -719,12 +698,21 @@ void World::run_scheduled_threads() {
(*ostream) << std::endl;
}
}
LS.rawset(threads, sched.thread_id(), LuaNil);
} else if (status == LUA_YIELD) {
if (is_authoritative()) {
LS.rawset(thinfo, "isnew", false);
LS.rawset(thinfo, "useppool", false);
} else {
// In a nonauth model, a yield is converted to a 'nopredict'.
LS.rawset(threads, sched.thread_id(), LuaNil);
}
} else {
// Generated an error. Add a traceback, print, and kill the coroutine.
// Currently, the error is sent to the actor. That seems... not right in the long run.
traceback_coroutine(CO);
(*ostream) << lua_tostring(CO, -1);
if (is_authoritative()) {
traceback_coroutine(CO);
(*ostream) << lua_tostring(CO, -1);
}
LS.rawset(threads, sched.thread_id(), LuaNil);
}
close_lthread_state();
@@ -757,12 +745,14 @@ void World::clear_lthread_state() {
lthread_prints_.reset();
lthread_actor_id_ = 0;
lthread_place_id_ = 0;
lthread_thread_id_ = 0;
lthread_use_ppool_ = false;
}
void World::open_lthread_state(int64_t actor, int64_t place, bool ppool, bool prints) {
void World::open_lthread_state(int64_t actor, int64_t place, int64_t thread, bool ppool, bool prints) {
lthread_actor_id_ = actor;
lthread_place_id_ = place;
lthread_thread_id_ = thread;
lthread_use_ppool_ = ppool;
if (prints) {
lthread_prints_.reset(new eng::ostringstream);
@@ -780,8 +770,7 @@ void World::close_lthread_state() {
const eng::string &output = lthread_prints_->str();
Tangible *actor = tangible_get(lthread_actor_id_);
if (actor != nullptr) {
bool auth = util::world_type_authoritative(world_type_);
actor->print_buffer_.add_string(output, auth);
actor->print_buffer_.add_string(output, is_authoritative());
}
}
// Now clean up everything.