From ac383c616f575165eefb6aa62256fa515c5cdcef Mon Sep 17 00:00:00 2001 From: jyelon Date: Mon, 10 Apr 2023 16:00:47 -0400 Subject: [PATCH] Finished code for globals, but something's not working with client/server --- luprex/cpp/core/invocation.cpp | 2 +- luprex/cpp/core/invocation.hpp | 2 +- luprex/cpp/core/lpxclient.cpp | 2 + luprex/cpp/core/lpxserver.cpp | 1 + luprex/cpp/core/world-accessor.cpp | 51 +++---------------- luprex/cpp/core/world-core.cpp | 50 ++++++++++++++++++- luprex/cpp/core/world-diffxmit.cpp | 78 +++++++++++++++++++++++++++++- luprex/cpp/core/world-testing.cpp | 58 ++++++++++++++++++++++ luprex/cpp/core/world.hpp | 41 +++++++++++++--- 9 files changed, 230 insertions(+), 55 deletions(-) diff --git a/luprex/cpp/core/invocation.cpp b/luprex/cpp/core/invocation.cpp index f2311152..d22ddec3 100644 --- a/luprex/cpp/core/invocation.cpp +++ b/luprex/cpp/core/invocation.cpp @@ -59,7 +59,7 @@ void Invocation::deserialize(StreamBuffer *sb) { data_.deserialize(sb); } -eng::string Invocation::debug_string() { +eng::string Invocation::debug_string() const { eng::ostringstream oss; oss << "inv["; switch (kind_) { diff --git a/luprex/cpp/core/invocation.hpp b/luprex/cpp/core/invocation.hpp index 49737950..e8c59af5 100644 --- a/luprex/cpp/core/invocation.hpp +++ b/luprex/cpp/core/invocation.hpp @@ -48,7 +48,7 @@ public: void serialize(StreamBuffer *sb) const; void deserialize(StreamBuffer *sb); - eng::string debug_string(); + eng::string debug_string() const; }; class InvocationQueue : public eng::deque { diff --git a/luprex/cpp/core/lpxclient.cpp b/luprex/cpp/core/lpxclient.cpp index 02d1d345..076023ea 100644 --- a/luprex/cpp/core/lpxclient.cpp +++ b/luprex/cpp/core/lpxclient.cpp @@ -95,6 +95,7 @@ public: StreamBuffer *sb = channel_->out(); sb->write_uint8(util::MSG_INVOKE); inv.serialize(sb); + util::dprint("Sent ", inv.debug_string()); } void send_lua_source(const util::LuaSourceVec &sv) { @@ -227,6 +228,7 @@ public: receive_ack_from_server(&body); } else if (message_type == util::MSG_DIFF) { receive_diff_from_server(&body); + util::dprintf("Executed diff from server"); } else { abandon_server(); return false; diff --git a/luprex/cpp/core/lpxserver.cpp b/luprex/cpp/core/lpxserver.cpp index 9bdec15b..c9f018de 100644 --- a/luprex/cpp/core/lpxserver.cpp +++ b/luprex/cpp/core/lpxserver.cpp @@ -150,6 +150,7 @@ public: delete_client(client); return false; } + util::dprint("Received ", inv.debug_string()); if (inv.actor() != client->actor_id_) { stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl; return true; diff --git a/luprex/cpp/core/world-accessor.cpp b/luprex/cpp/core/world-accessor.cpp index 6430d3f3..6ca4444a 100644 --- a/luprex/cpp/core/world-accessor.cpp +++ b/luprex/cpp/core/world-accessor.cpp @@ -1,7 +1,6 @@ #include "world.hpp" #include "pprint.hpp" -#include "serializelua.hpp" #include static void tangible_getall(LuaCoreStack &LS0, LuaSlot list, const util::IdVector &idv) { @@ -854,47 +853,6 @@ LuaDefine(http_post, "request", return lfn_http_request(L, "POST"); } -void global_set(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value) { - lua_State *L = LS0.state(); - World *w = World::fetch_global_pointer(L); - LuaVar globaldb, copy; - LuaOldStack LS(L, globaldb, copy); - - // Serialize then deserialize the data, to produce a copy. - StreamBuffer sb; - eng::string error = serialize_lua(LS, value, &sb); - if (!error.empty()) { - luaL_error(L, "%s", error.c_str()); - return; - } - eng::string serialized(sb.view()); - error = deserialize_lua(LS, copy, &sb); - if (!error.empty()) { - luaL_error(L, "%s", error.c_str()); - return; - } - - // Store the copy in the globalDB. - LS.rawget(globaldb, LuaRegistry, "globaldb"); - LS.rawset(globaldb, gvar, copy); - - // Store the serialized blob in the master model. - w->gvar_to_serial_.emplace(gvar, serialized); - - // In an authoritative model only, update the assignment counters. - if (w->is_authoritative()) { - int64_t newassign = w->next_gvar_assign_++; - auto oldassigniter = w->gvar_to_assign_.find(gvar); - if (oldassigniter != w->gvar_to_assign_.end()) { - int64_t oldassign = oldassigniter->second; - w->assign_to_gvar_.erase(oldassign); - } - w->assign_to_gvar_.emplace(newassign, gvar); - w->gvar_to_assign_[gvar] = newassign; - } - LS.result(); -} - LuaDefine(global_set, "varname, value", "|Store data in the global data table." "|" @@ -943,7 +901,8 @@ LuaDefine(global_set, "varname, value", return LS.result(); } - global_set(LS, gvar, value); + World *w = World::fetch_global_pointer(L); + w->set_global(LS, gvar, value); return LS.result(); } @@ -994,7 +953,8 @@ LuaDefine(global_once, "varname", } LS.set(result, true); - global_set(LS, gvar, result); + World *w = World::fetch_global_pointer(L); + w->set_global(LS, gvar, result); return LS.result(); } @@ -1018,6 +978,7 @@ LuaDefine(global_clearonce, "varname", gvar += ":once"; LS.set(null, LuaNil); - global_set(LS, gvar, null); + World *w = World::fetch_global_pointer(L); + w->set_global(LS, gvar, null); return LS.result(); } diff --git a/luprex/cpp/core/world-core.cpp b/luprex/cpp/core/world-core.cpp index 2b467dfa..fb1f2543 100644 --- a/luprex/cpp/core/world-core.cpp +++ b/luprex/cpp/core/world-core.cpp @@ -6,6 +6,7 @@ #include "traceback.hpp" #include "pprint.hpp" #include "util.hpp" +#include "serializelua.hpp" #include @@ -75,7 +76,7 @@ World::World(WorldType wt) { clock_ = 0; // Initialize global variable state. - next_gvar_assign_ = 1; + assign_seqno_ = 1; // There shouldn't be any lua errors in the sourceDB at this // point, since there's no lua code in the sourceDB. @@ -959,6 +960,53 @@ std::ostream *World::lthread_print_stream() const { } } +void World::set_global(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value) { + lua_State *L = LS0.state(); + LuaVar globaldb, copy; + LuaExtStack LS(L, globaldb, copy); + + // Serialize then deserialize the data, to produce a copy. + StreamBuffer sb; + eng::string error = serialize_lua(LS, value, &sb); + if (!error.empty()) { + luaL_error(L, "%s", error.c_str()); + return; + } + eng::string serialized(sb.view()); + error = deserialize_lua(LS, copy, &sb); + if (!error.empty()) { + luaL_error(L, "%s", error.c_str()); + return; + } + + // Store the copy in the globalDB. + LS.rawget(globaldb, LuaRegistry, "globaldb"); + LS.rawset(globaldb, gvar, copy); + + // Store the serialized blob. + gvname_to_serial_[gvar] = serialized; + + // Implement the tracking so that we can rapidly determine which global + // variables need to be difference transmitted. + // + // In the master model, we generate a sequence number for the assignment. + // We store the mapping from global variable name to that sequence number + // and vice versa. + // + // On the client side, we just record the global variable in a list + // of recently modified globals. + // + if (is_authoritative()) { + int64_t &seqno = gvname_to_seqno_[gvar]; + seqno_to_gvname_.erase(seqno); + seqno = assign_seqno_++; + seqno_to_gvname_[seqno] = gvar; + } else { + gvname_modified_.insert(gvar); + } +} + + void World::serialize(StreamBuffer *sb) { assert(stack_is_clear()); assert(redirects_.empty()); diff --git a/luprex/cpp/core/world-diffxmit.cpp b/luprex/cpp/core/world-diffxmit.cpp index ce2067d4..cee9ceba 100644 --- a/luprex/cpp/core/world-diffxmit.cpp +++ b/luprex/cpp/core/world-diffxmit.cpp @@ -1,5 +1,6 @@ #include "world.hpp" +#include "serializelua.hpp" util::IdVector World::get_visible_union(int64_t actor_id, World *master) { return util::sort_union_id_vectors( @@ -99,7 +100,7 @@ void World::diff_visible(const util::IdVector &visible, World *master, // For each tangible that exists in the master, but not // in the synchronous model, send a create message. tsb.write_int32(0); - int count_pos = tsb.total_writes(); + int64_t count_pos = tsb.total_writes(); int count = 0; for (int i = 0; i < int(svis.size()); i++) { const Tangible *mt = mvis[i]; @@ -305,6 +306,79 @@ void World::diff_source(World *master, StreamBuffer *sb) { assert(tsb.empty()); } +const eng::string &World::get_gvname_serial(const eng::string &gvar) { + static eng::string empty; + auto iter = gvname_to_serial_.find(gvar); + if (iter == gvname_to_serial_.end()) { + return empty; + } else { + return iter->second; + } +} + +void World::patch_globals(StreamBuffer *sb, DebugCollector *dbc) { + DebugBlock dbb(dbc, "patch_globals"); + int64_t seqno = sb->read_int64(); + int32_t total = sb->read_int32(); + if (total > 0) { + lua_State *L = state(); + LuaVar globaldb, copy; + LuaExtStack LS(L, globaldb, copy); + LS.rawget(globaldb, LuaRegistry, "globaldb"); + for (int i = 0; i < total; i++) { + eng::string gvar = sb->read_string(); + eng::string serial = sb->read_string(); + gvname_to_serial_[gvar] = serial; + StreamBuffer sb(serial); + eng::string error = deserialize_lua(LS, copy, &sb); + if (error.empty()) { + LS.rawset(globaldb, gvar, copy); + } else { + DebugLine(dbc) << "Invalid global serialized data: " << gvar << ":" << error; + } + } + } + assign_seqno_ = seqno; + gvname_to_seqno_.clear(); + seqno_to_gvname_.clear(); + gvname_modified_.clear(); +} + +void World::diff_globals(World *master, StreamBuffer *sb) { + StreamBuffer tsb; + tsb.write_int64(master->assign_seqno_); + tsb.write_int32(0); + int64_t count_pos = tsb.total_writes(); + int32_t total_mods = 0; + for (const eng::string &gvar : gvname_modified_) { + const eng::string &mval = master->get_gvname_serial(gvar); + const eng::string &sval = get_gvname_serial(gvar); + if (mval != sval) { + total_mods += 1; + tsb.write_string(gvar); + tsb.write_string(mval); + } + } + auto iter = master->seqno_to_gvname_.lower_bound(assign_seqno_); + while (iter != master->seqno_to_gvname_.end()) { + const auto &gvar = iter->second; + if (gvname_modified_.find(gvar) == gvname_modified_.end()) { + const eng::string &mval = master->get_gvname_serial(gvar); + const eng::string &sval = get_gvname_serial(gvar); + if (mval != sval) { + total_mods += 1; + tsb.write_string(gvar); + tsb.write_string(mval); + } + } + iter++; + } + tsb.overwrite_int32(count_pos, total_mods); + tsb.copy_into(sb); + patch_globals(&tsb, nullptr); + assert(tsb.empty()); +} + int64_t World::patch_everything(StreamBuffer *sb, DebugCollector *dbc) { DebugBlock dbb(dbc, "patch_everything"); int64_t actor_id = patch_actor(sb, dbc); @@ -312,6 +386,7 @@ int64_t World::patch_everything(StreamBuffer *sb, DebugCollector *dbc) { patch_luatabs(sb, dbc); patch_tanclass(sb, dbc); patch_source(sb, dbc); + patch_globals(sb, dbc); return actor_id; } @@ -322,4 +397,5 @@ void World::diff_everything(int64_t actor_id, World *master, StreamBuffer *sb) { diff_luatabs(actor_id, master, sb); diff_tanclass(actor_id, master, sb); diff_source(master, sb); + diff_globals(master, sb); } diff --git a/luprex/cpp/core/world-testing.cpp b/luprex/cpp/core/world-testing.cpp index 3560aac7..3587a557 100644 --- a/luprex/cpp/core/world-testing.cpp +++ b/luprex/cpp/core/world-testing.cpp @@ -1,5 +1,6 @@ #include "world.hpp" #include "pprint.hpp" +#include "json.hpp" #include void World::tangible_walkto(int64_t id, int64_t animid, float x, float y) { @@ -219,6 +220,32 @@ void World::tangible_set_class(int64_t id, const eng::string &c) const { LS.result(); } +void World::set_global_json(const eng::string &gvar, const eng::string &json) { + LuaVar decoded; + LuaExtStack LS(state(), decoded); + bool ok = json::decode(LS, decoded, json); + if (!ok) { + luaL_error(state(), "invalid json"); + return; + } + set_global(LS, gvar, decoded); +} + +eng::string World::get_global_json(const eng::string &gvar) { + LuaVar value, globaldb; + LuaExtStack LS(state(), globaldb, value); + LS.rawget(globaldb, LuaRegistry, "globaldb"); + LS.rawget(value, globaldb, gvar); + eng::string out; + eng::string error = json::encode(LS, value, out, false, 10000); + if (!error.empty()) { + luaL_error(state(), "%s", error.c_str()); + return ""; + } + return out; +} + + eng::string World::tangible_get_class(int64_t id) const { LuaVar tangibles, tan, meta, sclass; LuaOldStack LS(state(), tangibles, tan, meta, sclass); @@ -232,6 +259,8 @@ eng::string World::tangible_get_class(int64_t id) const { return result; } + + static bool worlds_identical(const UniqueWorld &w1, const UniqueWorld &w2) { StreamBuffer sbw1, sbw2; w1->serialize(&sbw1); @@ -450,3 +479,32 @@ LuaDefine(unittests_world4difftanclass, "", "some unit tests") { return 0; } + +LuaDefine(unittests_world5diffglobals, "", "some unit tests") { + UniqueWorld m(new World(WORLD_TYPE_MASTER)); + UniqueWorld ss(new World(WORLD_TYPE_PREDICTIVE)); + UniqueWorld cs(new World(WORLD_TYPE_PREDICTIVE)); + StreamBuffer sb; + + m->set_global_json("x", "3"); + ss->diff_globals(m.get(), &sb); + cs->patch_globals(&sb, nullptr); + LuaAssertStrEq(L, cs->get_global_json("x"), "3"); + + m->set_global_json("x", "4"); + m->set_global_json("x", "5"); + m->set_global_json("y", "6"); + ss->diff_globals(m.get(), &sb); + cs->patch_globals(&sb, nullptr); + LuaAssertStrEq(L, cs->get_global_json("x"), "5"); + LuaAssertStrEq(L, cs->get_global_json("y"), "6"); + + cs->set_global_json("x", "2"); + ss->set_global_json("x", "2"); + ss->diff_globals(m.get(), &sb); + cs->patch_globals(&sb, nullptr); + LuaAssertStrEq(L, cs->get_global_json("x"), "5"); + + return 0; +} + diff --git a/luprex/cpp/core/world.hpp b/luprex/cpp/core/world.hpp index 894c895e..67744725 100644 --- a/luprex/cpp/core/world.hpp +++ b/luprex/cpp/core/world.hpp @@ -287,6 +287,18 @@ public: std::ostream *lthread_print_stream() const; + // Set a global variable. + // + // This will throw lua errors if there's a problem. + // + void set_global(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value); + + // Get the serialized value of a global variable. + // + // This accessor is used during difference transmission. + // + const eng::string &get_gvname_serial(const eng::string &gvar); + // Allocate a single ID. // // The rules are as follows: @@ -397,7 +409,14 @@ public: // eng::string tangible_get_class(int64_t id) const; + // Store json in a global variable. + // + void set_global_json(const eng::string &gvar, const eng::string &json); + // Get a global variable as json. + // + eng::string get_global_json(const eng::string &gvar); + public: /////////////////////////////////////////////////////////// // @@ -442,6 +461,9 @@ public: void patch_source(StreamBuffer *sb, DebugCollector *dbc); void diff_source(World *master, StreamBuffer *sb); + void patch_globals(StreamBuffer *sb, DebugCollector *dbc); + void diff_globals(World *master, StreamBuffer *sb); + // This is the main entry point for difference transmission. // int64_t patch_everything(StreamBuffer *sb, DebugCollector *dbc); @@ -518,13 +540,20 @@ private: SourceDB source_db_; PlaneMap plane_map_; - // Lua Globals + // Lua Global Variables // - int64_t next_gvar_assign_; - eng::map gvar_to_assign_; - eng::map gvar_to_serial_; - eng::map assign_to_gvar_; - + // assign_seqno: sequence number generator for global variable assignments (master and client) + // gvname_to_serial: global variable name to serialized data. (master and client) + // gvname_to_seqno: global variable name to sequence number. (master only) + // seqno_to_gvname: sequence number to global variable name. (master only) + // gvname_modified: set of global variables recently locally modified. (client only) + // + int64_t assign_seqno_; + eng::map gvname_to_serial_; + eng::map gvname_to_seqno_; + eng::map seqno_to_gvname_; + eng::set gvname_modified_; + // Tangibles table. // eng::unordered_map tangibles_;