Machinery for blocking http requests in place, except actual networking

This commit is contained in:
2022-05-03 00:36:11 -04:00
parent d28c588468
commit c41d0522df
9 changed files with 381 additions and 80 deletions

View File

@@ -393,6 +393,66 @@ void World::update_source(const util::LuaSourceVec &source) {
assert(stack_is_clear());
}
void World::http_response(int64_t request_id, const HttpInResponse &response) {
// Find the request.
auto iter = http_requests_.find(request_id);
if (iter == http_requests_.end()) {
return;
}
HttpOutRequest request = iter->second;
http_requests_.erase(iter);
// Get the place and thread as lua objects.
LuaVar tangibles, place, mt, threads, thinfo, thread;
LuaStack LS(state(), tangibles, place, mt, threads, thinfo, thread);
LS.rawget(tangibles, LuaRegistry, "tangibles");
LS.rawget(place, tangibles, request.place_id());
if (!LS.istable(place)) {
return;
}
LS.getmetatable(mt, place);
if (!LS.istable(mt)) {
return;
}
LS.rawget(threads, mt, "threads");
if (!LS.istable(threads)) {
return;
}
LS.rawget(thinfo, threads, request.thread_id());
if (!LS.istable(thinfo)) {
return;
}
LS.rawget(thread, thinfo, "thread");
if (!LS.isthread(thread)) {
return;
}
lua_State *CO = LS.ckthread(thread);
// Push the response onto the awakening thread.
LuaRet responsetable;
LuaStack LSCO(CO, responsetable);
response.store(LSCO, responsetable);
// Clean up lua stacks.
LSCO.result();
LS.result();
assert(stack_is_clear());
// Awaken the thread, with its new return value.
schedule(0, request.thread_id(), request.place_id());
run_scheduled_threads();
}
void World::abort_all_http_requests(int status_code, std::string_view error) {
HttpInResponse abortresponse;
abortresponse.fail(status_code, error);
while (true) {
auto iter = http_requests_.begin();
if (iter == http_requests_.end()) break;
http_response(iter->second.request_id(), abortresponse);
}
}
void World::run_unittests() {
assert(stack_is_clear());
source_db_.run_unittests();
@@ -635,6 +695,30 @@ void World::invoke_lua_source(int64_t actor_id, int64_t place_id, const eng::str
}
}
void World::guard_blockable(lua_State *L, const char *fn) {
if (lthread_thread_id_ == 0) {
// in a probe, http.get throws an error.
luaL_error(L, "cannot %s in a probe", fn);
assert(false);
}
if (!is_authoritative()) {
// in a nonauth model, http.get is converted to nopredict.
lua_yield(L, 0);
luaL_error(L, "unexplained nopredict failure in %s", fn);
assert(false);
}
}
void World::guard_nopredict(lua_State *L, const char *fn) {
if (lthread_thread_id_ == 0) {
return;
}
if (!is_authoritative()) {
lua_yield(L, 0);
luaL_error(L, "unexplained nopredict failure in %s", fn);
}
}
void World::schedule(int64_t clk, int64_t thid, int64_t plid) {
assert(is_authoritative());
thread_sched_.add(clk, thid, plid);
@@ -796,6 +880,7 @@ void World::serialize(StreamBuffer *sb) {
id_global_pool_.serialize(sb);
sb->write_int64(clock_);
thread_sched_.serialize(sb);
http_requests_.serialize(sb);
sb->write_uint32(tangibles_.size());
for (const auto &p : tangibles_) {
sb->write_int64(p.first);
@@ -813,6 +898,7 @@ void World::deserialize(StreamBuffer *sb) {
id_global_pool_.deserialize(sb);
clock_ = sb->read_int64();
thread_sched_.deserialize(sb);
http_requests_.deserialize(sb);
// Mark all tangibles for deletion by setting ID to zero.
for (const auto &p : tangibles_) {
p.second->plane_item_.set_id(0);
@@ -837,6 +923,8 @@ void World::deserialize(StreamBuffer *sb) {
++iter;
}
}
// After a save and load, http requests no longer should exist
abort_all_http_requests(425, "http requests aborted by loading a save game");
assert(stack_is_clear());
}