#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "drivenengine.hpp"
#include "world.hpp"
#include "luaconsole.hpp"
#include "invocation.hpp"
#include "util.hpp"
#include "printbuffer.hpp"

#include <iostream>

// Console client engine: keeps a local World model, forwards typed commands
// to a server as Invocations, and applies the acks and difference
// transmissions that the server sends back.
class LpxClient : public DrivenEngine {
public:
    using StringVec = LuaConsole::StringVec;

    UniqueWorld world_;
    int64_t actor_id_;
    InvocationQueue unack_;       // Invocations sent but not yet acknowledged.
    SharedChannel channel_;       // Null whenever we are not connected.
    LuaConsole console_;
    PrintChanneler print_channeler_;
    Gui gui_;

    // Rebuild the world model from scratch and forget all pending commands.
    void set_initial_state() {
        // Create the world model.
        world_.reset(new World(util::WORLD_TYPE_C_SYNC));

        // This is a temporary actor that will be used only until the server sends
        // us the first difference transmission.  We do this only to establish
        // the invariant that there's always an actor.  When the first difference
        // transmission arrives, this actor may be deleted, or it may just be
        // ignored, at the server's discretion.
        actor_id_ = world_->create_login_actor();

        // Clear the unack command queue.
        unack_.clear();
    }

    // When the world is in synchronous mode, there's no
    // snapshot, and the commands in the unack queue are not
    // reflected in the world.  In asynchronous mode, there is
    // a snapshot that allows return to the synchronous mode,
    // and the unack commands are in the world model.

    // Roll back to the snapshot, if there is one.
    void world_to_synchronous() {
        if (!world_->snapshot_empty()) {
            world_->rollback();
        }
    }

    // Take a snapshot and replay every unacknowledged invocation on top of
    // it, unless a snapshot already exists.
    void world_to_asynchronous() {
        if (world_->snapshot_empty()) {
            world_->snapshot();
            for (const Invocation &inv : unack_) {
                world_->invoke(inv);
            }
        }
    }

    // Drop the connection and reset the local state.
    void abandon_server() {
        stdostream() << "Abandoning server." << std::endl;

        // Put the world model back into a known-good state.
        set_initial_state();

        // Disconnect from the server.
        channel_.reset();
    }

    // Startup hook: reset the state, connect, and show the first prompt.
    virtual void event_init(int argc, char *argv[]) {
        // Put the world into the starting state.
        set_initial_state();

        // Establish a connection to the server.
        channel_ = new_outgoing_channel("nocert:localhost:8085");

        // Set the console prompt
        get_stdio_channel()->set_prompt(console_.get_prompt());

        // The driver loads the lua source automatically.
        // However, we don't need it.  Throw it out.
        get_lua_source();
    }

    // Apply `inv` to the local (asynchronous) world, queue it as
    // unacknowledged, and write it to the server.  Does nothing except print
    // a message when there is no connection.
    void send_invocation(const Invocation &inv) {
        if (channel_ == nullptr) {
            stdostream() << "Cannot invoke any actions, not connected." << std::endl;
            return;
        }
        world_to_asynchronous();
        world_->invoke(inv);
        unack_.push_back(inv);
        StreamBuffer *sb = channel_->out();
        sb->write_uint8(util::MSG_INVOKE);
        inv.serialize(sb);
    }

    // Serialize the given lua sources and send them as one invocation.
    void send_lua_source(const util::LuaSourceVec &sv) {
        StreamBuffer serial;
        SourceDB::serialize_source(sv, &serial);
        eng::string sstr = serial.read_entire_contents();
        Invocation inv(Invocation::KIND_LUA_SOURCE, actor_id_, actor_id_, sstr);
        send_invocation(inv);
    }

    // "luainvoke <code>": send words[1] to the server as a lua invocation.
    void do_luainvoke_command(const StringVec &words) {
        if (!require_argument(words)) return;
        send_invocation(Invocation(Invocation::KIND_LUA, actor_id_, actor_id_, words[1]));
    }

    // "luaprobe <code>": probe words[1] against the asynchronous world,
    // print the result, then roll the world back.
    void do_luaprobe_command(const StringVec &words) {
        if (!require_argument(words)) return;
        world_to_asynchronous();
        stdostream() << world_->probe_lua(actor_id_, words[1]);
        world_to_synchronous();
    }

    // "syntax <message>": report a syntax error.  The message is omitted
    // when the command carries none.
    void do_syntax_command(const StringVec &words) {
        stdostream() << "Syntax Error: ";
        if (words.size() > 1) {
            stdostream() << words[1];
        }
        stdostream() << std::endl;
    }

    // "view": print the tangibles near the current actor.
    void do_view_command(const StringVec &cmd) {
        stdostream() << world_->tangibles_near_debug_string(actor_id_, 100);
    }

    // "menu [place]": refresh the gui for `place` and print its menu.
    // `place` falls back to the current actor when it is absent, which is
    // also the default handed to strtoint.
    void do_menu_command(const StringVec &cmd) {
        world_to_asynchronous();
        int64_t place = actor_id_;
        if (cmd.size() > 1) {
            place = util::strtoint(cmd[1], actor_id_);
        }
        world_->update_gui(actor_id_, place, &gui_);
        stdostream() << gui_.menu_debug_string();
    }

    // "choose <n>": invoke the plan behind menu item n of the current gui.
    void do_choose_command(const StringVec &cmd) {
        if (cmd.size() < 2) {
            // No item number at all is reported like an invalid item number.
            stdostream() << "Invalid menu item #" << std::endl;
            return;
        }
        eng::string action = gui_.get_action(util::strtoint(cmd[1], -1));
        if (action == "") {
            stdostream() << "Invalid menu item #" << std::endl;
            return;
        }
        stdostream() << "Invoking plan: " << action << std::endl;
        Invocation inv(Invocation::KIND_PLAN, actor_id_, gui_.place(), action);
        send_invocation(inv);
    }

    // "tick": send a tick invocation.
    void do_tick_command(const util::StringVec &words) {
        send_invocation(Invocation(Invocation::KIND_TICK, actor_id_, actor_id_, ""));
    }

    // "cpl": rescan the lua source; event_update sends it once it shows up.
    void do_cpl_command(const util::StringVec &words) {
        rescan_lua_source();
    }

    // "work": time repeated rollback/snapshot round trips.
    void do_work_command(const util::StringVec &words) {
        int reps = 10000;
        int64_t t1 = util::profiling_clock();
        for (int i = 0; i < reps; i++) {
            world_to_synchronous();
            world_to_asynchronous();
        }
        int64_t t2 = util::profiling_clock();
        stdostream() << "Snapshot/rollback took " << ((t2-t1)/reps) << " nanosec." << std::endl;
    }

    // "quit": disconnect and stop the driver.
    void do_quit_command(const util::StringVec &words) {
        abandon_server();
        stop_driver();
    }

    // Dispatch one parsed console command on its first word.  An empty
    // command is ignored.
    void do_command(const util::StringVec &words) {
        if (words.empty()) return;
        else if (words[0] == "luainvoke") do_luainvoke_command(words);
        else if (words[0] == "luaprobe") do_luaprobe_command(words);
        else if (words[0] == "syntax") do_syntax_command(words);
        else if (words[0] == "view") do_view_command(words);
        else if (words[0] == "menu") do_menu_command(words);
        else if (words[0] == "choose") do_choose_command(words);
        else if (words[0] == "tick") do_tick_command(words);
        else if (words[0] == "cpl") do_cpl_command(words);
        else if (words[0] == "work") do_work_command(words);
        else if (words[0] == "quit") do_quit_command(words);
        else {
            stdostream() << "Unsupported command: " << words[0] << std::endl;
        }
    }

    // Switch to a new actor id and reset the print channeler.
    void change_actor_id(int64_t actor_id) {
        stdostream() << "Actor ID changing: " << actor_id << std::endl;
        print_channeler_.reset();
        actor_id_ = actor_id;
    }

    // Handle an ack: apply the oldest unacknowledged invocation to the
    // synchronous world and drop it from the queue.
    void receive_ack_from_server(StreamBuffer *sb) {
        // An ack is just a single byte, so there's nothing left to read.
        if (unack_.empty()) {
            // Invalid acknowledgement when there's nothing in the unack queue.
            abandon_server();
            return;
        }
        world_to_synchronous();
        world_->invoke(unack_.front());
        unack_.pop_front();
    }

    // Handle a difference transmission: patch the synchronous world from
    // `sb`, adopting the actor id it reports.  A truncated or corrupt
    // stream abandons the server.
    void receive_diff_from_server(StreamBuffer *sb) {
        world_to_synchronous();
        try {
            DebugCollector dbc("patch_everything");
            int64_t nactor = world_->patch_everything(sb, &dbc);
            if (nactor != actor_id_) change_actor_id(nactor);
            dbc.dump(stdostream());
        } catch (const StreamEof &seof) {
            abandon_server();
            return;
        } catch (const StreamCorruption &scorr) {
            abandon_server();
            return;
        }
    }

    // Try to consume one complete message from `sb`.
    //
    // Returns true when a message was processed and the connection is still
    // usable.  Returns false when no complete message is buffered yet (the
    // stream is left unread), or when the server was abandoned while
    // handling the message.
    bool receive_message_from_server(StreamBuffer *sb) {
        // The header is a one-byte type followed by a four-byte body length.
        if (sb->fill() < 5) return false;
        int64_t tr_before = sb->total_reads();
        uint8_t message_type = sb->read_uint8();
        uint32_t message_body_len = sb->read_uint32();
        if (sb->fill() < message_body_len) {
            // Body not fully received: put the header back and retry later.
            sb->unread_to(tr_before);
            return false;
        }
        StreamBuffer body(sb->read_bytes(message_body_len), message_body_len);
        if (message_type == util::MSG_ACK) {
            receive_ack_from_server(&body);
        } else if (message_type == util::MSG_DIFF) {
            receive_diff_from_server(&body);
        } else {
            abandon_server();
            return false;
        }
        // The handler may already have abandoned the server.  Report that
        // instead of abandoning a second time or claiming success.
        if (channel_ == nullptr) {
            return false;
        }
        // A handler that leaves bytes unread means the message was malformed.
        if (!body.empty()) {
            abandon_server();
            return false;
        }
        return true;
    }

    // Per-frame hook: forward recompiled lua source, run typed commands,
    // drain server messages, and channel print output.
    virtual void event_update() {
        // Check for lua source code.  If this returns non-null,
        // it is because somebody typed CPL.
        util::LuaSourcePtr lua_source = get_lua_source();
        if (lua_source != nullptr) {
            send_lua_source(*lua_source);
            lua_source.reset();
        }

        // Check for keyboard input on stdin.
        while (true) {
            eng::string line = get_stdio_channel()->in()->readline();
            if (line == "") break;
            console_.add(line);
            get_stdio_channel()->set_prompt(console_.get_prompt());
            do_command(console_.get_command());
        }

        // Check for communication from server..
        if (channel_ != nullptr) {
            if (channel_->closed()) {
                stdostream() << "server closed connection: " << channel_->error() << std::endl;
                abandon_server();
            } else {
                while (true) {
                    if (!receive_message_from_server(channel_->in())) break;
                    if (channel_ == nullptr) break;
                }
                world_to_asynchronous();
            }
        }

        // Channel print statements.
        if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) {
            if (channel_ != nullptr) {
                send_invocation(print_channeler_.invocation(actor_id_));
            }
        }
    }

private:
    // Return true when `words` carries an argument at index 1.  Otherwise
    // print a diagnostic naming the command (when known) and return false.
    bool require_argument(const StringVec &words) {
        if (words.size() > 1) return true;
        stdostream() << "Missing argument for command";
        if (!words.empty()) {
            stdostream() << ": " << words[0];
        }
        stdostream() << std::endl;
        return false;
    }
};

DrivenEngineDefine("lpxclient", LpxClient);