#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "drivenengine.hpp"
#include "world.hpp"
#include "invocation.hpp"
#include "util.hpp"
#include "printbuffer.hpp"
// NOTE(review): the header name on this line was lost in the source as
// received. <memory> is what std::unique_ptr below requires -- confirm against
// version control.
#include <memory>

// Client-side engine. It runs either standalone, against a local master world
// with no channel, or connected to a server, against a predictive world that
// is reconciled with the server's acks and difference transmissions.
//
// actor_id_, rescan_lua_source(), set_have_prints() and
// unexpose_world_to_driver() are not declared in this class; presumably they
// are inherited from DrivenEngine -- verify against drivenengine.hpp.
class LpxClient : public DrivenEngine {
public:
    // Driver wrapper the world model is exposed to / unexposed from.
    EngineWrapper *wrapper_;
    // The current world model; replaced whenever the initial state is reset.
    std::unique_ptr<World> world_;
    // Invocations sent to the server that have not been acknowledged yet.
    InvocationQueue unack_;
    // Connection to the server; null when running standalone.
    SharedChannel channel_;
    PrintChanneler print_channeler_;
    // Invocations queued by event_access(), flushed by event_update().
    eng::vector<Invocation> delayed_invocations_;
    // Separate lua state, used only by VALIDATE_LUA_EXPR to load source text.
    lua_State *lua_syntax_checker_;

public:
    // Creates the syntax-checker lua state and starts in standalone mode.
    LpxClient(EngineWrapper *w) : wrapper_(w) {
        lua_syntax_checker_ = LuaCoreStack::newstate(eng::l_alloc);
        set_initial_state_standalone();
    }

    ~LpxClient() {
        unexpose_world_to_driver(wrapper_);
        lua_close(lua_syntax_checker_);
    }

    // Discards the current world and starts a fresh predictive world connected
    // to the server named by hostspec.
    void set_initial_state_connect(const eng::string &hostspec) {
        // Create the world model.
        unexpose_world_to_driver(wrapper_);
        world_.reset(new World(WORLD_TYPE_PREDICTIVE));
        world_->expose_world_to_driver(wrapper_);

        // Create the communication channel.
        channel_ = new_outgoing_channel(hostspec);

        // This is a temporary actor that will be used only until the server
        // sends us the first difference transmission. We do this only to
        // establish the invariant that there's always an actor. When the first
        // difference transmission arrives, this actor may be deleted, or it
        // may just be ignored, at the server's discretion.
        actor_id_ = world_->create_login_actor();

        // Do not trigger lua source loading.
        reset_session_state(false);
    }

    // Discards the current world and any server channel, and starts a fresh
    // master world.
    void set_initial_state_standalone() {
        // Create the world model.
        unexpose_world_to_driver(wrapper_);
        world_.reset(new World(WORLD_TYPE_MASTER));
        world_->expose_world_to_driver(wrapper_);

        // Make sure the channel is empty.
        channel_.reset();

        // Create the standalone actor.
        actor_id_ = world_->create_login_actor();

        // Trigger lua source loading.
        reset_session_state(true);
    }

    // When the world is in synchronous mode, there's no
    // snapshot, and the commands in the unack queue are not
    // reflected in the world. In asynchronous mode, there is
    // a snapshot that allows return to the synchronous mode,
    // and the unack commands are in the world model.

    // Rolls the world back to its snapshot, if it has one.
    void world_to_synchronous() {
        if (!world_->snapshot_empty()) {
            world_->rollback();
        }
    }

    // For a non-authoritative world with no snapshot: takes a snapshot and
    // re-applies every unacknowledged invocation on top of it. Otherwise does
    // nothing.
    void world_to_asynchronous() {
        if (world_->is_authoritative()) return;
        if (!world_->snapshot_empty()) return;
        world_->snapshot();
        for (const Invocation &inv : unack_) {
            world_->invoke(inv);
        }
    }

    // Drops the server connection, if there is one, by resetting to the
    // standalone initial state.
    void abandon_server() {
        if (channel_) {
            set_initial_state_standalone();
        }
    }

    // Applies inv to the local world. When connected, the invocation is also
    // appended to the unack queue and serialized to the server as MSG_INVOKE.
    void send_invocation(const Invocation &inv) {
        if (channel_ == nullptr) {
            world_->invoke(inv);
            return;
        }
        world_to_asynchronous();
        world_->invoke(inv);
        unack_.push_back(inv);
        StreamBuffer *sb = channel_->out();
        sb->write_uint8(util::MSG_INVOKE);
        inv.serialize(sb);
    }

    // Currently only logs the command. The connect call below is disabled.
    void slash_command(std::string_view command) {
        util::dprint("Slash Command: ", command);
        //
        // set_initial_state_connect(util::ss("nocert:", hostname, ":8085"));
    }

    // Switches to a different actor and resets the print channeler.
    void change_actor_id(int64_t actor_id) {
        util::dprint("Actor ID changing: ", actor_id);
        print_channeler_.reset();
        actor_id_ = actor_id;
    }

    // Handles MSG_ACK: applies the oldest unacknowledged invocation to the
    // synchronous world and removes it from the queue. sb is not read.
    void receive_ack_from_server([[maybe_unused]] StreamBuffer *sb) {
        // An ack is just a single byte, so there's nothing left to read.
        if (unack_.empty()) {
            // Invalid acknowledgement when there's nothing in the unack queue.
            abandon_server();
            return;
        }
        world_to_synchronous();
        world_->invoke(unack_.front());
        unack_.pop_front();
    }

    // Handles MSG_DIFF: patches the synchronous world from sb and switches
    // actor if the patch reports a different actor id. A StreamException
    // while patching abandons the server.
    void receive_diff_from_server(StreamBuffer *sb) {
        world_to_synchronous();
        try {
            DebugCollector dbc("");
            int64_t nactor = world_->patch(sb, &dbc);
            if (nactor != actor_id_) change_actor_id(nactor);
            // dbc.dump(...);
        } catch (const StreamException &) {
            abandon_server();
            return;
        }
    }

    // Tries to consume one complete message from sb.
    //
    // Returns false when no complete message is buffered yet (nothing is
    // consumed), and also when the message was invalid and the server was
    // abandoned. Returns true when a message was dispatched; note that the
    // handler may itself have abandoned the server, so callers must re-check
    // channel_ before using it again.
    bool receive_message_from_server(StreamBuffer *sb) {
        // Header: one type byte plus a 32-bit body length.
        if (sb->fill() < 5) return false;
        int64_t tr_before = sb->total_reads();
        uint8_t message_type = sb->read_uint8();
        uint32_t message_body_len = sb->read_uint32();
        if (sb->fill() < message_body_len) {
            // Body not fully received yet: put the header back and wait.
            sb->unread_to(tr_before);
            return false;
        }

        const char *message_body = sb->read_bytes(message_body_len);
        StreamBuffer body(std::string_view(message_body, message_body_len));
        if (message_type == util::MSG_ACK) {
            receive_ack_from_server(&body);
        } else if (message_type == util::MSG_DIFF) {
            receive_diff_from_server(&body);
        } else {
            abandon_server();
            return false;
        }

        // A handler that leaves part of the body unread means a malformed
        // message.
        if (!body.empty()) {
            abandon_server();
            return false;
        }
        return true;
    }

    // Driver entry point. A place_id of 0 is replaced with the current actor.
    // INVOKE_* kinds are queued until the next event_update(); the other
    // kinds are handled immediately.
    virtual void event_access(AccessKind kind, int64_t place_id, std::string_view datapk, StreamBuffer *retpk) override {
        if (place_id == 0) place_id = actor_id_;
        switch (kind) {
        case AccessKind::INVOKE_LUA_CALL:
        case AccessKind::INVOKE_LUA_EXPR:
        case AccessKind::INVOKE_FLUSH_PRINTS:
        case AccessKind::INVOKE_TICK:
        case AccessKind::INVOKE_LUA_SOURCE: {
            delayed_invocations_.emplace_back(kind, actor_id_, place_id, datapk);
            break;
        }
        case AccessKind::PROBE_LUA_CALL: {
            world_to_asynchronous();
            world_->probe_lua_call(actor_id_, place_id, datapk, retpk);
            break;
        }
        case AccessKind::CONNECT_TO_SERVER: {
            set_initial_state_connect(util::ss("nocert:", datapk, ":8085"));
            break;
        }
        case AccessKind::VALIDATE_LUA_EXPR: {
            // Load (but do not run) the text in the syntax-checker state and
            // hand the resulting message back to the driver.
            LuaVar closure;
            LuaExtStack LS(lua_syntax_checker_, closure);
            eng::string errmsg = LS.load(closure, datapk, "stdin");
            retpk->write_bytes(errmsg);
            break;
        }
        case AccessKind::CHANNEL_PRINTS: {
            // If there's nothing new in the printbuffer, this is very fast.
            world_to_asynchronous();
            if (print_channeler_.channel(world_->get_printbuffer(actor_id_), retpk)) {
                send_invocation(print_channeler_.invocation(actor_id_));
            }
            set_have_prints(false);
            break;
        }
        case AccessKind::SLASH_COMMAND: {
            slash_command(datapk);
            break;
        }
        default: {
            util::dprint("Invalid event_access: ", int(kind));
        }
        }
    }

    // Driver entry point. Flushes queued invocations, processes everything
    // the server has sent, and refreshes the have-prints flag.
    virtual void event_update() override {
        // Send invocations. We execute these using predictive execution.
        for (const Invocation &inv : delayed_invocations_) {
            send_invocation(inv);
        }
        delayed_invocations_.clear();

        // Check for communication from server.
        if (channel_ != nullptr) {
            if (channel_->closed()) {
                util::dprint("server closed connection: ", channel_->error());
                abandon_server();
            } else {
                // Drain complete messages. Stop as soon as one cannot be
                // consumed or a handler has abandoned the server (channel_
                // becomes null).
                while (receive_message_from_server(channel_->in()) && channel_ != nullptr) {
                }
                world_to_asynchronous();
            }
        }

        set_have_prints(print_channeler_.have_prints(world_->get_printbuffer(actor_id_)));
    }

private:
    // Shared tail of the set_initial_state_* functions: clears the unack
    // queue, the print channeler and any saved invocations, and rescans lua
    // source with the given flag (true triggers lua source loading).
    void reset_session_state(bool trigger_lua_load) {
        // Clear the unack command queue.
        unack_.clear();

        // Reset the print channeler.
        print_channeler_.reset();

        rescan_lua_source(trigger_lua_load);

        // Clear any saved invocations.
        delayed_invocations_.clear();
    }
};

DrivenEngineDefine("lpxclient", LpxClient);