#include "wrap-string.hpp" #include "wrap-vector.hpp" #include "drivenengine.hpp" #include "world.hpp" #include "luaconsole.hpp" #include "invocation.hpp" #include "util.hpp" #include "printbuffer.hpp" #include class LpxClient : public DrivenEngine, public CommonCommands { public: using StringVec = LuaConsole::StringVec; UniqueWorld world_; int64_t actor_id_; InvocationQueue unack_; SharedChannel channel_; LuaConsole console_; PrintChanneler print_channeler_; public: void set_initial_state_connect(const eng::string &hostspec) { // Create the world model. world_.reset(new World(WORLD_TYPE_PREDICTIVE)); // Create the communication channel. channel_ = new_outgoing_channel(hostspec); // This is a temporary actor that will be used only until the server sends // us the first difference transmission. We do this only to establish // the invariant that there's always an actor. When the first difference // transmission arrives, this actor may be deleted, or it may just be // ignored, at the server's discretion. actor_id_ = world_->create_login_actor(); // Clear the unack command queue. unack_.clear(); // Export stuff to the graphics engine. set_visible_world_and_actor(world_.get(), actor_id_); // Reset the print channeler print_channeler_.reset(); } void set_initial_state_standalone(std::string_view srcpk) { // Create the world model. world_.reset(new World(WORLD_TYPE_MASTER)); // Update the source code of the master model. world_->update_source(srcpk); // Make sure the channel is empty. channel_.reset(); // Create the standalone actor. actor_id_ = world_->create_login_actor(); // TODO: initialize the standalone actor. // Clear the unack command queue. unack_.clear(); // Export stuff to the graphics engine. set_visible_world_and_actor(world_.get(), actor_id_); // Reset the print channeler print_channeler_.reset(); } // When the world is in synchronous mode, there's no // snapshot, and the commands in the unack queue are not // reflected in the world. In asynchronous mode, there is // a snapshot that allows return to the synchronous mode, // and the unack commands are in the world model. void world_to_synchronous() { if (!world_->snapshot_empty()) { world_->rollback(); } } void world_to_asynchronous() { if (!world_->is_authoritative()) { if (world_->snapshot_empty()) { world_->snapshot(); for (const Invocation &inv : unack_) { world_->invoke(inv); } } } } void abandon_server() { // When we abandon the server, we leave the world model // hanging around to preserve the invariant that there // is always a world model. Then, we trigger a rescan // of the lua source. When the lua source shows up, then // we will create a standalone model to replace the client // model. channel_.reset(); rescan_lua_source(); } virtual void event_init(std::string_view srcpk, int argc, char *argv[]) { // Put the world into the starting state. set_initial_state_standalone(srcpk); // Set the console prompt set_console_prompt(console_.get_prompt()); } void send_invocation(const Invocation &inv) { if (channel_ == nullptr) { if ((!world_->is_authoritative()) && (inv.kind() == InvocationKind::LUA_SOURCE)) { // We have a client model, but no client connection. That means we're // in the process of shutting down a client model. The client model // is supposed to linger until the lua source is reread. Once we have // the lua source, we're supposed to throw out the client model and // create a standalone model. set_initial_state_standalone(inv.datapack()); } else { world_->invoke(inv); } } else { world_to_asynchronous(); world_->invoke(inv); unack_.push_back(inv); StreamBuffer *sb = channel_->out(); sb->write_uint8(util::MSG_INVOKE); inv.serialize(sb); } } virtual void do_syntax_error(std::string_view error) override { stdostream() << "Syntax error: " << error << std::endl; } virtual void do_unknown_command(std::string_view name) override { stdostream() << "Unknown command: " << name << std::endl; } virtual void do_view_command() override { stdostream() << world_->tangibles_near_debug_string(actor_id_, 1000); } virtual void do_moveto_command(int x, int y) override { do_unknown_command("moveto"); } virtual void do_quit_command() override { abandon_server(); stop_driver(); } virtual void do_cpl_command() override { rescan_lua_source(); } virtual void do_work_command() override { do_unknown_command("work"); } virtual void do_display_command() override { do_unknown_command("display"); } virtual void do_aborthttp_command() override { do_unknown_command("aborthttp"); } virtual void do_connect_command(std::string_view hostname) override { set_initial_state_connect(util::ss("nocert:", hostname, ":8085")); } virtual void do_luainvoke_command(std::string_view cmd) override { send_invocation(Invocation(InvocationKind::LUA_EXPR, actor_id_, actor_id_, cmd)); } virtual void do_luaprobe_command(std::string_view cmd) override { world_to_asynchronous(); stdostream() << world_->probe_lua_expr(actor_id_, cmd); world_to_synchronous(); } void change_actor_id(int64_t actor_id) { stdostream() << "Actor ID changing: " << actor_id << std::endl; print_channeler_.reset(); actor_id_ = actor_id; set_visible_world_and_actor(world_.get(), actor_id_); } void receive_ack_from_server(StreamBuffer *sb) { // An ack is just a single byte, so there's nothing left to read. if (unack_.empty()) { // Invalid acknowledgement when theres' nothing in the unack queue. abandon_server(); return; } world_to_synchronous(); world_->invoke(unack_.front()); unack_.pop_front(); } void receive_diff_from_server(StreamBuffer *sb) { world_to_synchronous(); try { DebugCollector dbc(""); int64_t nactor = world_->patch_everything(sb, &dbc); if (nactor != actor_id_) change_actor_id(nactor); dbc.dump(stdostream()); } catch (const StreamException &sexcept) { abandon_server(); return; } } bool receive_message_from_server(StreamBuffer *sb) { if (sb->fill() < 5) return false; int64_t tr_before = sb->total_reads(); uint8_t message_type = sb->read_uint8(); uint32_t message_body_len = sb->read_uint32(); if (sb->fill() < message_body_len) { sb->unread_to(tr_before); return false; } const char *message_body = sb->read_bytes(message_body_len); StreamBuffer body(std::string_view(message_body, message_body_len)); if (message_type == util::MSG_ACK) { receive_ack_from_server(&body); } else if (message_type == util::MSG_DIFF) { receive_diff_from_server(&body); } else { abandon_server(); return false; } if (!body.empty()) { abandon_server(); return false; } return true; } virtual void event_update() { // Send invocations. We execute these using predictive execution. eng::vector invocations = get_queued_invocations(); for (const UniqueInvocation &inv : invocations) { send_invocation(*inv); } // Check for keyboard input on stdin. while (true) { eng::string line = get_stdio_channel()->in()->readline(); if (line == "") break; console_.add(line); set_console_prompt(console_.get_prompt()); do_command(console_.get_command()); } // Check for communication from server.. if (channel_ != nullptr) { if (channel_->closed()) { stdostream() << "server closed connection: " << channel_->error() << std::endl; abandon_server(); } else { while (true) { if (!receive_message_from_server(channel_->in())) break; if (channel_ == nullptr) break; } world_to_asynchronous(); } } // Channel print statements. if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) { send_invocation(print_channeler_.invocation(actor_id_)); } } }; DrivenEngineDefine("lpxclient", LpxClient);