From 051e6bc8bf3028c31db430933cc7f64d09da0771 Mon Sep 17 00:00:00 2001 From: Josh Yelon Date: Tue, 9 Nov 2021 16:27:39 -0500 Subject: [PATCH] Initial implementation of lpxclient --- luprex/core/cpp/lpxclient.cpp | 208 +++++++++++++++++++++++++++-- luprex/core/cpp/streambuffer.hpp | 17 ++- luprex/core/cpp/textgame.cpp | 12 +- luprex/core/cpp/util.hpp | 7 + luprex/core/cpp/world-core.cpp | 4 +- luprex/core/cpp/world-diffxmit.cpp | 5 +- luprex/core/cpp/world.hpp | 5 +- 7 files changed, 229 insertions(+), 29 deletions(-) diff --git a/luprex/core/cpp/lpxclient.cpp b/luprex/core/cpp/lpxclient.cpp index 7a981b6d..ece1f075 100644 --- a/luprex/core/cpp/lpxclient.cpp +++ b/luprex/core/cpp/lpxclient.cpp @@ -19,16 +19,57 @@ public: int64_t gui_place_; public: - virtual void event_init(int argc, char *argv[]) { + void set_initial_state() { // Create the world model. world_.reset(new World(util::WORLD_TYPE_C_SYNC)); + // Snapshot the initial state. + world_->snapshot(); + + // Clear the unack command queue. + unack_.clear(); + // This is a temporary actor that will be used only until the server sends // us the first difference transmission. We do this only to establish // the invariant that there's always an actor. When the first difference // transmission arrives, this actor may be deleted, or it may just be // ignored, at the server's discretion. actor_id_ = world_->create_login_actor(); + } + + + // When the world is in synchronous mode, there's no + // snapshot, and the commands in the unack queue are not + // reflected in the world. In asynchronous mode, there is + // a snapshot that allows return to the synchronous mode, + // and the unack commands are in the world model. + + void world_to_synchronous() { + if (!world_->snapshot_empty()) { + world_->rollback(); + } + } + + void world_to_asynchronous() { + if (world_->snapshot_empty()) { + world_->snapshot(); + for (const Invocation &inv : unack_) { + world_->invoke(inv); + } + } + } + + void abandon_server() { + // Put the world model back into a known-good state. + set_initial_state(); + + // Disconnect from the server. + channel_.reset(); + } + + virtual void event_init(int argc, char *argv[]) { + // Put the world into the starting state. + set_initial_state(); // Establish a connection to the server. channel_ = new_outgoing_channel("localhost:8085"); @@ -37,15 +78,164 @@ public: get_stdio_channel()->set_prompt(console_.get_prompt()); } - void do_command(const util::StringVec &words) { - if (words.empty()) return; - stdostream() << "Command: "; - for (const std::string &word : words) { - stdostream() << word << " "; + void send_invocation(const Invocation &inv) { + if (channel_ == nullptr) { + stdostream() << "Cannot invoke any actions, not connected." << std::endl; + return; + } + world_to_asynchronous(); + world_->invoke(inv); + unack_.push_back(inv); + StreamBuffer *sb = channel_->out(); + sb->write_uint8(util::MSG_INVOKE); + inv.serialize(sb); + } + + void do_lua_command(const StringVec &words) { + if (words.size() != 2) { + stdostream() << "lua command (lua) takes a single string" << std::endl; + return; + } + const std::string &exp = words[1]; + InvocationData dummyresult; + Invocation inv(Invocation::KIND_LUA, actor_id_, actor_id_, exp, dummyresult); + send_invocation(inv); + } + + void do_syntax_command(const StringVec &words) { + stdostream() << "Syntax Error: "; + for (int i = 1; i < int(words.size()); i++) { + stdostream() << words[i] << " "; } stdostream() << std::endl; } + void do_view_command(const StringVec &cmd) { + if (cmd.size() != 1) { + stdostream() << "view command takes no arguments" << std::endl; + return; + } + for (int64_t id : world_->get_near(actor_id_, 100, true)) { + const Tangible *tan = world_->tangible_get(id); + const AnimStep &aqback = tan->anim_queue_.back(); + stdostream() << id << ": " << aqback.graphic() << " " << aqback.plane() << " " << aqback.xyz().debug_string() << std::endl; + } + } + + void do_menu_command(const StringVec &cmd) { + int64_t id; + if (cmd.size() == 1) { + id = actor_id_; + } else if (cmd.size() == 2) { + id = util::strtoint(cmd[1], -1); + } else { + stdostream() << "menu command expects a tangible ID or defaults to actor_id" << std::endl; + return; + } + world_to_asynchronous(); + gui_place_ = id; + world_->update_gui(actor_id_, id, &gui_); + int index = 0; + for (const GuiElt &elt : gui_.elts()) { + stdostream() << index << " " << elt.label() << std::endl; + index += 1; + } + } + + void do_choose_command(const StringVec &cmd) { + int64_t index; + if (cmd.size() == 1) { + index = util::strtoint(cmd[0], -1); + } else { + stdostream() << "choose command consists of a single menu line number" << std::endl; + return; + } + const Gui::EltVec &elts = gui_.elts(); + if ((index < 0) || (index >= int(elts.size()))) { + stdostream() << "No menu item #" << index << std::endl; + return; + } + std::string action = elts[index].action(); + stdostream() << "Invoking plan: " << action << std::endl; + InvocationData dummyresult; + dummyresult["flavor"] = "chocolate"; + dummyresult["color"] = "blue"; + Invocation inv(Invocation::KIND_PLAN, actor_id_, gui_place_, action, dummyresult); + send_invocation(inv); + } + + void do_quit_command(const util::StringVec &words) { + if (words.size() != 1) { + stdostream() << "quit command takes no arguments" << std::endl; + return; + } + abandon_server(); + stop_driver(); + } + + void do_command(const util::StringVec &words) { + if (words.empty()) return; + else if (words[0] == "lua") do_lua_command(words); + else if (words[0] == "syntax") do_syntax_command(words); + else if (words[0] == "view") do_view_command(words); + else if (words[0] == "menu") do_menu_command(words); + else if (words[0] == "quit") do_quit_command(words); + else if (util::validinteger(words[0])) do_choose_command(words); + else { + stdostream() << "Unknown command: " << words[0] << std::endl; + } + } + + void receive_ack_from_server(StreamBuffer *sb) { + // An ack is just a single byte, so there's nothing left to read. + if (unack_.empty()) { + // Invalid acknowledgement when theres' nothing in the unack queue. + abandon_server(); + return; + } + world_to_synchronous(); + world_->invoke(unack_.front()); + unack_.pop_front(); + } + + void receive_diff_from_server(StreamBuffer *sb) { + world_to_synchronous(); + try { + actor_id_ = world_->patch_everything(sb); + } catch (const StreamEof &seof) { + abandon_server(); + return; + } catch (const StreamCorruption &scorr) { + abandon_server(); + return; + } + } + + bool receive_message_from_server(StreamBuffer *sb) { + if (sb->fill() < 5) return false; + int64_t tr_before = sb->total_reads(); + uint8_t message_type = sb->read_uint8(); + uint32_t message_body_len = sb->read_uint32(); + if (sb->fill() < message_body_len) { + sb->unread_to(tr_before); + return false; + } + StreamBuffer body(sb->read_bytes(message_body_len), message_body_len); + if (message_type == util::MSG_ACK) { + receive_ack_from_server(&body); + } else if (message_type == util::MSG_DIFF) { + receive_diff_from_server(&body); + } else { + abandon_server(); + return false; + } + if (!body.empty()) { + abandon_server(); + return false; + } + return true; + } + virtual void event_update() { // Check for keyboard input on stdin. while (true) { @@ -60,10 +250,10 @@ public: if (channel_ != nullptr) { if (channel_->closed()) { stdostream() << "Server closed connection " << channel_->error() << std::endl; - channel_.reset(); - // stop_driver(); + abandon_server(); } else { - // Implement reception of messages. + while (receive_message_from_server(channel_->in())); + world_to_asynchronous(); } } } diff --git a/luprex/core/cpp/streambuffer.hpp b/luprex/core/cpp/streambuffer.hpp index c92d13fc..6b647f2a 100644 --- a/luprex/core/cpp/streambuffer.hpp +++ b/luprex/core/cpp/streambuffer.hpp @@ -364,6 +364,14 @@ public: // Verify that the buffer is empty, if not, throw StreamCorruption. void verify_empty(); + // Make sure the specified number of bytes are available to read. + void check_available(int64_t bytes) { + int64_t avail = write_cursor_ - read_cursor_; + if (avail < bytes) { + throw StreamEof(); + } + } + // Rewind the read cursor to a previous position. void unread_to(int64_t total_reads); @@ -429,14 +437,6 @@ private: void wrote_space(int64_t bytes); - // Make sure the specified number of bytes are available to read. - void check_available(int64_t bytes) { - int64_t avail = write_cursor_ - read_cursor_; - if (avail < bytes) { - throw StreamEof(); - } - } - // Implementation for the overwrite_int functions. char *get_overwrite(int64_t size, int64_t write_count_after); @@ -446,5 +446,4 @@ private: friend int lfn_unittests_streambuffer(lua_State *L); }; - #endif // STREAMBUFFER_HPP diff --git a/luprex/core/cpp/textgame.cpp b/luprex/core/cpp/textgame.cpp index c8841d86..d9f65044 100644 --- a/luprex/core/cpp/textgame.cpp +++ b/luprex/core/cpp/textgame.cpp @@ -161,12 +161,12 @@ void TextGame::do_command(const StringVec &words) { if (words.empty()) return; else if (words[0] == "lua") do_lua_command(words); else if (words[0] == "syntax") do_syntax_command(words); - else if (words[0] == "v") do_view_command(words); - else if (words[0] == "m") do_menu_command(words); - else if (words[0] == "q") do_quit_command(words); - else if (words[0] == "s") do_snapshot_command(words); - else if (words[0] == "r") do_rollback_command(words); - else if (words[0] == "t") do_tick_command(words); + else if (words[0] == "view") do_view_command(words); + else if (words[0] == "menu") do_menu_command(words); + else if (words[0] == "quit") do_quit_command(words); + else if (words[0] == "snap") do_snapshot_command(words); + else if (words[0] == "roll") do_rollback_command(words); + else if (words[0] == "tick") do_tick_command(words); else if (util::validinteger(words[0])) do_choose_command(words); else { stdostream() << "Unknown command: " << words[0] << std::endl; diff --git a/luprex/core/cpp/util.hpp b/luprex/core/cpp/util.hpp index 3648ff93..92846b9e 100644 --- a/luprex/core/cpp/util.hpp +++ b/luprex/core/cpp/util.hpp @@ -22,6 +22,13 @@ enum WorldType { WORLD_TYPE_MASTER, }; +enum MessageType { + MSG_NULL, + MSG_DIFF, + MSG_ACK, + MSG_INVOKE, +}; + using StringVec = std::vector; using StringPair = std::pair; using LuaSourceVec = std::vector; diff --git a/luprex/core/cpp/world-core.cpp b/luprex/core/cpp/world-core.cpp index f4c39792..b4c910a7 100644 --- a/luprex/core/cpp/world-core.cpp +++ b/luprex/core/cpp/world-core.cpp @@ -732,12 +732,14 @@ void World::deserialize(StreamBuffer *sb) { } void World::snapshot() { - snapshot_.clear(); + assert(snapshot_.empty()); serialize(&snapshot_); + assert(!snapshot_.empty()); } void World::rollback() { assert(!snapshot_.empty()); deserialize(&snapshot_); + assert(snapshot_.empty()); } diff --git a/luprex/core/cpp/world-diffxmit.cpp b/luprex/core/cpp/world-diffxmit.cpp index e1011dec..db14bb08 100644 --- a/luprex/core/cpp/world-diffxmit.cpp +++ b/luprex/core/cpp/world-diffxmit.cpp @@ -277,12 +277,13 @@ void World::diff_source(World *master, StreamBuffer *sb) { source_db_.diff(master->source_db_, sb); } -void World::patch_everything(StreamBuffer *sb) { - patch_actor(sb); +int64_t World::patch_everything(StreamBuffer *sb) { + int64_t actor_id = patch_actor(sb); patch_visible(sb); patch_luatabs(sb); patch_tanclass(sb); patch_source(sb); + return actor_id; } void World::diff_everything(int64_t actor_id, World *master, StreamBuffer *sb) { diff --git a/luprex/core/cpp/world.hpp b/luprex/core/cpp/world.hpp index 62ecb997..c635e118 100644 --- a/luprex/core/cpp/world.hpp +++ b/luprex/core/cpp/world.hpp @@ -212,7 +212,8 @@ public: // void snapshot(); void rollback(); - + bool snapshot_empty() { return snapshot_.empty(); } + // Run any threads which according to the scheduler queue are ready. // void run_scheduled_threads(int64_t clk); @@ -366,7 +367,7 @@ public: // This is the main entry point for difference transmission. // - void patch_everything(StreamBuffer *sb); + int64_t patch_everything(StreamBuffer *sb); void diff_everything(int64_t actor, World *master, StreamBuffer *sb); public: