Finished code for globals, but something's not working with client/server

This commit is contained in:
2023-04-10 16:00:47 -04:00
parent ef7d0ec365
commit ac383c616f
9 changed files with 230 additions and 55 deletions

View File

@@ -59,7 +59,7 @@ void Invocation::deserialize(StreamBuffer *sb) {
data_.deserialize(sb); data_.deserialize(sb);
} }
eng::string Invocation::debug_string() { eng::string Invocation::debug_string() const {
eng::ostringstream oss; eng::ostringstream oss;
oss << "inv["; oss << "inv[";
switch (kind_) { switch (kind_) {

View File

@@ -48,7 +48,7 @@ public:
void serialize(StreamBuffer *sb) const; void serialize(StreamBuffer *sb) const;
void deserialize(StreamBuffer *sb); void deserialize(StreamBuffer *sb);
eng::string debug_string(); eng::string debug_string() const;
}; };
class InvocationQueue : public eng::deque<Invocation> { class InvocationQueue : public eng::deque<Invocation> {

View File

@@ -95,6 +95,7 @@ public:
StreamBuffer *sb = channel_->out(); StreamBuffer *sb = channel_->out();
sb->write_uint8(util::MSG_INVOKE); sb->write_uint8(util::MSG_INVOKE);
inv.serialize(sb); inv.serialize(sb);
util::dprint("Sent ", inv.debug_string());
} }
void send_lua_source(const util::LuaSourceVec &sv) { void send_lua_source(const util::LuaSourceVec &sv) {
@@ -227,6 +228,7 @@ public:
receive_ack_from_server(&body); receive_ack_from_server(&body);
} else if (message_type == util::MSG_DIFF) { } else if (message_type == util::MSG_DIFF) {
receive_diff_from_server(&body); receive_diff_from_server(&body);
util::dprintf("Executed diff from server");
} else { } else {
abandon_server(); abandon_server();
return false; return false;

View File

@@ -150,6 +150,7 @@ public:
delete_client(client); delete_client(client);
return false; return false;
} }
util::dprint("Received ", inv.debug_string());
if (inv.actor() != client->actor_id_) { if (inv.actor() != client->actor_id_) {
stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl; stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl;
return true; return true;

View File

@@ -1,7 +1,6 @@
#include "world.hpp" #include "world.hpp"
#include "pprint.hpp" #include "pprint.hpp"
#include "serializelua.hpp"
#include <cmath> #include <cmath>
static void tangible_getall(LuaCoreStack &LS0, LuaSlot list, const util::IdVector &idv) { static void tangible_getall(LuaCoreStack &LS0, LuaSlot list, const util::IdVector &idv) {
@@ -854,47 +853,6 @@ LuaDefine(http_post, "request",
return lfn_http_request(L, "POST"); return lfn_http_request(L, "POST");
} }
void global_set(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value) {
lua_State *L = LS0.state();
World *w = World::fetch_global_pointer(L);
LuaVar globaldb, copy;
LuaOldStack LS(L, globaldb, copy);
// Serialize then deserialize the data, to produce a copy.
StreamBuffer sb;
eng::string error = serialize_lua(LS, value, &sb);
if (!error.empty()) {
luaL_error(L, "%s", error.c_str());
return;
}
eng::string serialized(sb.view());
error = deserialize_lua(LS, copy, &sb);
if (!error.empty()) {
luaL_error(L, "%s", error.c_str());
return;
}
// Store the copy in the globalDB.
LS.rawget(globaldb, LuaRegistry, "globaldb");
LS.rawset(globaldb, gvar, copy);
// Store the serialized blob in the master model.
w->gvar_to_serial_.emplace(gvar, serialized);
// In an authoritative model only, update the assignment counters.
if (w->is_authoritative()) {
int64_t newassign = w->next_gvar_assign_++;
auto oldassigniter = w->gvar_to_assign_.find(gvar);
if (oldassigniter != w->gvar_to_assign_.end()) {
int64_t oldassign = oldassigniter->second;
w->assign_to_gvar_.erase(oldassign);
}
w->assign_to_gvar_.emplace(newassign, gvar);
w->gvar_to_assign_[gvar] = newassign;
}
LS.result();
}
LuaDefine(global_set, "varname, value", LuaDefine(global_set, "varname, value",
"|Store data in the global data table." "|Store data in the global data table."
"|" "|"
@@ -943,7 +901,8 @@ LuaDefine(global_set, "varname, value",
return LS.result(); return LS.result();
} }
global_set(LS, gvar, value); World *w = World::fetch_global_pointer(L);
w->set_global(LS, gvar, value);
return LS.result(); return LS.result();
} }
@@ -994,7 +953,8 @@ LuaDefine(global_once, "varname",
} }
LS.set(result, true); LS.set(result, true);
global_set(LS, gvar, result); World *w = World::fetch_global_pointer(L);
w->set_global(LS, gvar, result);
return LS.result(); return LS.result();
} }
@@ -1018,6 +978,7 @@ LuaDefine(global_clearonce, "varname",
gvar += ":once"; gvar += ":once";
LS.set(null, LuaNil); LS.set(null, LuaNil);
global_set(LS, gvar, null); World *w = World::fetch_global_pointer(L);
w->set_global(LS, gvar, null);
return LS.result(); return LS.result();
} }

View File

@@ -6,6 +6,7 @@
#include "traceback.hpp" #include "traceback.hpp"
#include "pprint.hpp" #include "pprint.hpp"
#include "util.hpp" #include "util.hpp"
#include "serializelua.hpp"
#include <iostream> #include <iostream>
@@ -75,7 +76,7 @@ World::World(WorldType wt) {
clock_ = 0; clock_ = 0;
// Initialize global variable state. // Initialize global variable state.
next_gvar_assign_ = 1; assign_seqno_ = 1;
// There shouldn't be any lua errors in the sourceDB at this // There shouldn't be any lua errors in the sourceDB at this
// point, since there's no lua code in the sourceDB. // point, since there's no lua code in the sourceDB.
@@ -959,6 +960,53 @@ std::ostream *World::lthread_print_stream() const {
} }
} }
void World::set_global(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value) {
lua_State *L = LS0.state();
LuaVar globaldb, copy;
LuaExtStack LS(L, globaldb, copy);
// Serialize then deserialize the data, to produce a copy.
StreamBuffer sb;
eng::string error = serialize_lua(LS, value, &sb);
if (!error.empty()) {
luaL_error(L, "%s", error.c_str());
return;
}
eng::string serialized(sb.view());
error = deserialize_lua(LS, copy, &sb);
if (!error.empty()) {
luaL_error(L, "%s", error.c_str());
return;
}
// Store the copy in the globalDB.
LS.rawget(globaldb, LuaRegistry, "globaldb");
LS.rawset(globaldb, gvar, copy);
// Store the serialized blob.
gvname_to_serial_[gvar] = serialized;
// Implement the tracking so that we can rapidly determine which global
// variables need to be difference transmitted.
//
// In the master model, we generate a sequence number for the assignment.
// We store the mapping from global variable name to that sequence number
// and vice versa.
//
// On the client side, we just record the global variable in a list
// of recently modified globals.
//
if (is_authoritative()) {
int64_t &seqno = gvname_to_seqno_[gvar];
seqno_to_gvname_.erase(seqno);
seqno = assign_seqno_++;
seqno_to_gvname_[seqno] = gvar;
} else {
gvname_modified_.insert(gvar);
}
}
void World::serialize(StreamBuffer *sb) { void World::serialize(StreamBuffer *sb) {
assert(stack_is_clear()); assert(stack_is_clear());
assert(redirects_.empty()); assert(redirects_.empty());

View File

@@ -1,5 +1,6 @@
#include "world.hpp" #include "world.hpp"
#include "serializelua.hpp"
util::IdVector World::get_visible_union(int64_t actor_id, World *master) { util::IdVector World::get_visible_union(int64_t actor_id, World *master) {
return util::sort_union_id_vectors( return util::sort_union_id_vectors(
@@ -99,7 +100,7 @@ void World::diff_visible(const util::IdVector &visible, World *master,
// For each tangible that exists in the master, but not // For each tangible that exists in the master, but not
// in the synchronous model, send a create message. // in the synchronous model, send a create message.
tsb.write_int32(0); tsb.write_int32(0);
int count_pos = tsb.total_writes(); int64_t count_pos = tsb.total_writes();
int count = 0; int count = 0;
for (int i = 0; i < int(svis.size()); i++) { for (int i = 0; i < int(svis.size()); i++) {
const Tangible *mt = mvis[i]; const Tangible *mt = mvis[i];
@@ -305,6 +306,79 @@ void World::diff_source(World *master, StreamBuffer *sb) {
assert(tsb.empty()); assert(tsb.empty());
} }
const eng::string &World::get_gvname_serial(const eng::string &gvar) {
static eng::string empty;
auto iter = gvname_to_serial_.find(gvar);
if (iter == gvname_to_serial_.end()) {
return empty;
} else {
return iter->second;
}
}
void World::patch_globals(StreamBuffer *sb, DebugCollector *dbc) {
DebugBlock dbb(dbc, "patch_globals");
int64_t seqno = sb->read_int64();
int32_t total = sb->read_int32();
if (total > 0) {
lua_State *L = state();
LuaVar globaldb, copy;
LuaExtStack LS(L, globaldb, copy);
LS.rawget(globaldb, LuaRegistry, "globaldb");
for (int i = 0; i < total; i++) {
eng::string gvar = sb->read_string();
eng::string serial = sb->read_string();
gvname_to_serial_[gvar] = serial;
StreamBuffer sb(serial);
eng::string error = deserialize_lua(LS, copy, &sb);
if (error.empty()) {
LS.rawset(globaldb, gvar, copy);
} else {
DebugLine(dbc) << "Invalid global serialized data: " << gvar << ":" << error;
}
}
}
assign_seqno_ = seqno;
gvname_to_seqno_.clear();
seqno_to_gvname_.clear();
gvname_modified_.clear();
}
void World::diff_globals(World *master, StreamBuffer *sb) {
StreamBuffer tsb;
tsb.write_int64(master->assign_seqno_);
tsb.write_int32(0);
int64_t count_pos = tsb.total_writes();
int32_t total_mods = 0;
for (const eng::string &gvar : gvname_modified_) {
const eng::string &mval = master->get_gvname_serial(gvar);
const eng::string &sval = get_gvname_serial(gvar);
if (mval != sval) {
total_mods += 1;
tsb.write_string(gvar);
tsb.write_string(mval);
}
}
auto iter = master->seqno_to_gvname_.lower_bound(assign_seqno_);
while (iter != master->seqno_to_gvname_.end()) {
const auto &gvar = iter->second;
if (gvname_modified_.find(gvar) == gvname_modified_.end()) {
const eng::string &mval = master->get_gvname_serial(gvar);
const eng::string &sval = get_gvname_serial(gvar);
if (mval != sval) {
total_mods += 1;
tsb.write_string(gvar);
tsb.write_string(mval);
}
}
iter++;
}
tsb.overwrite_int32(count_pos, total_mods);
tsb.copy_into(sb);
patch_globals(&tsb, nullptr);
assert(tsb.empty());
}
int64_t World::patch_everything(StreamBuffer *sb, DebugCollector *dbc) { int64_t World::patch_everything(StreamBuffer *sb, DebugCollector *dbc) {
DebugBlock dbb(dbc, "patch_everything"); DebugBlock dbb(dbc, "patch_everything");
int64_t actor_id = patch_actor(sb, dbc); int64_t actor_id = patch_actor(sb, dbc);
@@ -312,6 +386,7 @@ int64_t World::patch_everything(StreamBuffer *sb, DebugCollector *dbc) {
patch_luatabs(sb, dbc); patch_luatabs(sb, dbc);
patch_tanclass(sb, dbc); patch_tanclass(sb, dbc);
patch_source(sb, dbc); patch_source(sb, dbc);
patch_globals(sb, dbc);
return actor_id; return actor_id;
} }
@@ -322,4 +397,5 @@ void World::diff_everything(int64_t actor_id, World *master, StreamBuffer *sb) {
diff_luatabs(actor_id, master, sb); diff_luatabs(actor_id, master, sb);
diff_tanclass(actor_id, master, sb); diff_tanclass(actor_id, master, sb);
diff_source(master, sb); diff_source(master, sb);
diff_globals(master, sb);
} }

View File

@@ -1,5 +1,6 @@
#include "world.hpp" #include "world.hpp"
#include "pprint.hpp" #include "pprint.hpp"
#include "json.hpp"
#include <cassert> #include <cassert>
void World::tangible_walkto(int64_t id, int64_t animid, float x, float y) { void World::tangible_walkto(int64_t id, int64_t animid, float x, float y) {
@@ -219,6 +220,32 @@ void World::tangible_set_class(int64_t id, const eng::string &c) const {
LS.result(); LS.result();
} }
void World::set_global_json(const eng::string &gvar, const eng::string &json) {
LuaVar decoded;
LuaExtStack LS(state(), decoded);
bool ok = json::decode(LS, decoded, json);
if (!ok) {
luaL_error(state(), "invalid json");
return;
}
set_global(LS, gvar, decoded);
}
eng::string World::get_global_json(const eng::string &gvar) {
LuaVar value, globaldb;
LuaExtStack LS(state(), globaldb, value);
LS.rawget(globaldb, LuaRegistry, "globaldb");
LS.rawget(value, globaldb, gvar);
eng::string out;
eng::string error = json::encode(LS, value, out, false, 10000);
if (!error.empty()) {
luaL_error(state(), "%s", error.c_str());
return "";
}
return out;
}
eng::string World::tangible_get_class(int64_t id) const { eng::string World::tangible_get_class(int64_t id) const {
LuaVar tangibles, tan, meta, sclass; LuaVar tangibles, tan, meta, sclass;
LuaOldStack LS(state(), tangibles, tan, meta, sclass); LuaOldStack LS(state(), tangibles, tan, meta, sclass);
@@ -232,6 +259,8 @@ eng::string World::tangible_get_class(int64_t id) const {
return result; return result;
} }
static bool worlds_identical(const UniqueWorld &w1, const UniqueWorld &w2) { static bool worlds_identical(const UniqueWorld &w1, const UniqueWorld &w2) {
StreamBuffer sbw1, sbw2; StreamBuffer sbw1, sbw2;
w1->serialize(&sbw1); w1->serialize(&sbw1);
@@ -450,3 +479,32 @@ LuaDefine(unittests_world4difftanclass, "", "some unit tests") {
return 0; return 0;
} }
LuaDefine(unittests_world5diffglobals, "", "some unit tests") {
UniqueWorld m(new World(WORLD_TYPE_MASTER));
UniqueWorld ss(new World(WORLD_TYPE_PREDICTIVE));
UniqueWorld cs(new World(WORLD_TYPE_PREDICTIVE));
StreamBuffer sb;
m->set_global_json("x", "3");
ss->diff_globals(m.get(), &sb);
cs->patch_globals(&sb, nullptr);
LuaAssertStrEq(L, cs->get_global_json("x"), "3");
m->set_global_json("x", "4");
m->set_global_json("x", "5");
m->set_global_json("y", "6");
ss->diff_globals(m.get(), &sb);
cs->patch_globals(&sb, nullptr);
LuaAssertStrEq(L, cs->get_global_json("x"), "5");
LuaAssertStrEq(L, cs->get_global_json("y"), "6");
cs->set_global_json("x", "2");
ss->set_global_json("x", "2");
ss->diff_globals(m.get(), &sb);
cs->patch_globals(&sb, nullptr);
LuaAssertStrEq(L, cs->get_global_json("x"), "5");
return 0;
}

View File

@@ -287,6 +287,18 @@ public:
std::ostream *lthread_print_stream() const; std::ostream *lthread_print_stream() const;
// Set a global variable.
//
// This will throw lua errors if there's a problem.
//
void set_global(LuaCoreStack &LS0, const eng::string &gvar, LuaSlot value);
// Get the serialized value of a global variable.
//
// This accessor is used during difference transmission.
//
const eng::string &get_gvname_serial(const eng::string &gvar);
// Allocate a single ID. // Allocate a single ID.
// //
// The rules are as follows: // The rules are as follows:
@@ -397,7 +409,14 @@ public:
// //
eng::string tangible_get_class(int64_t id) const; eng::string tangible_get_class(int64_t id) const;
// Store json in a global variable.
//
void set_global_json(const eng::string &gvar, const eng::string &json);
// Get a global variable as json.
//
eng::string get_global_json(const eng::string &gvar);
public: public:
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// //
@@ -442,6 +461,9 @@ public:
void patch_source(StreamBuffer *sb, DebugCollector *dbc); void patch_source(StreamBuffer *sb, DebugCollector *dbc);
void diff_source(World *master, StreamBuffer *sb); void diff_source(World *master, StreamBuffer *sb);
void patch_globals(StreamBuffer *sb, DebugCollector *dbc);
void diff_globals(World *master, StreamBuffer *sb);
// This is the main entry point for difference transmission. // This is the main entry point for difference transmission.
// //
int64_t patch_everything(StreamBuffer *sb, DebugCollector *dbc); int64_t patch_everything(StreamBuffer *sb, DebugCollector *dbc);
@@ -518,13 +540,20 @@ private:
SourceDB source_db_; SourceDB source_db_;
PlaneMap plane_map_; PlaneMap plane_map_;
// Lua Globals // Lua Global Variables
// //
int64_t next_gvar_assign_; // assign_seqno: sequence number generator for global variable assignments (master and client)
eng::map<eng::string, int64_t> gvar_to_assign_; // gvname_to_serial: global variable name to serialized data. (master and client)
eng::map<eng::string, eng::string> gvar_to_serial_; // gvname_to_seqno: global variable name to sequence number. (master only)
eng::map<int64_t, eng::string> assign_to_gvar_; // seqno_to_gvname: sequence number to global variable name. (master only)
// gvname_modified: set of global variables recently locally modified. (client only)
//
int64_t assign_seqno_;
eng::map<eng::string, eng::string> gvname_to_serial_;
eng::map<eng::string, int64_t> gvname_to_seqno_;
eng::map<int64_t, eng::string> seqno_to_gvname_;
eng::set<eng::string> gvname_modified_;
// Tangibles table. // Tangibles table.
// //
eng::unordered_map<int64_t, UniqueTangible> tangibles_; eng::unordered_map<int64_t, UniqueTangible> tangibles_;