#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "world.hpp"
#include "drivenengine.hpp"
#include "luaconsole.hpp"
#include "util.hpp"
#include "printbuffer.hpp"

// NOTE(review): the name of this system header was missing from the text
// under review. <memory> is what the visible code requires (std::unique_ptr,
// std::make_unique) -- confirm against the original file.
#include <memory>

// State the server keeps for one connection accepted on the client port.
class Client : public eng::opnew {
public:
    // Id returned by create_login_actor() on the master world for this client.
    int64_t actor_id_ = 0;

    // Channel used to exchange messages with the client.
    SharedChannel channel_;

    // WORLD_TYPE_PREDICTIVE world owned by this client record. It receives
    // every invocation accepted from the client and is handed to
    // diff_everything() together with the master world.
    UniqueWorld sync_;

    // Clock value at which a diff was last sent to this client.
    double last_diff_ = -100.0;

    // Set whenever the master ticks or this client's invocation is applied;
    // selects the short diff delay instead of the long one.
    bool async_diff_ = true;
};

using UniqueClient = std::unique_ptr<Client>;
using ClientVector = eng::vector<UniqueClient>;

// Server engine: owns the master world, accepts client and HTTP connections,
// runs console commands, and sends diffs to connected clients.
class LpxServer : public DrivenEngine {
public:
    using StringVec = LuaConsole::StringVec;

    UniqueWorld master_;
    LuaConsole console_;
    ClientVector clients_;
    PrintChanneler print_channeler_;
    HttpChannelMap http_client_channels_;
    HttpChannelVec http_server_channels_;
    int64_t admin_id_ = 0;
    Gui gui_;
    int next_diff_chan_ = 0;
    double next_tick_ = 0.0;

public:
    // Builds the master world from `srcpk`, creates the admin actor, starts
    // listening on both ports and initializes the console prompt.
    // `argc` / `argv` are not used here.
    virtual void event_init(std::string_view srcpk, int argc, char *argv[]) override {
        // Create the master world model.
        master_.reset(new World(WORLD_TYPE_MASTER));

        // Update the source code of the master model.
        master_->update_source(srcpk);

        // Create an actor for administrative commands.
        admin_id_ = master_->create_login_actor(true);

        // TODO: initialize the admin actor.

        // Print out admin ID for debugging purposes.
        stdostream() << "Admin actor id = " << admin_id_ << std::endl;

        // Enable listening on port 8085 (client connections)
        listen_port(kClientPort);

        // Enable listening on port 8080 (http server connections)
        listen_port(kHttpServerPort);

        // Set the console prompt.
        set_console_prompt(console_.get_prompt());

        // Export stuff to the graphics engine.
        set_visible_world_and_actor(master_.get(), admin_id_);

        // for ticking.
        next_tick_ = 0.0;
    }

    // "luainvoke <code>": invokes words[1] on the master as a KIND_LUA
    // invocation issued by the admin actor.
    void do_luainvoke_command(const util::StringVec &words) {
        if (!has_argument(words)) return;
        master_->invoke(Invocation(Invocation::KIND_LUA, admin_id_, admin_id_, words[1]));
    }

    // "luaprobe <code>": prints the result of probe_lua() for words[1]. The
    // probe runs between snapshot() and rollback() on the master world.
    void do_luaprobe_command(const util::StringVec &words) {
        if (!has_argument(words)) return;
        master_->snapshot();
        stdostream() << master_->probe_lua(admin_id_, words[1]);
        master_->rollback();
    }

    // "syntax <message>": reports words[1] as a syntax error.
    void do_syntax_command(const util::StringVec &words) {
        if (!has_argument(words)) return;
        stdostream() << "Syntax Error: " << words[1] << std::endl;
    }

    // "menu [place]": refreshes gui_ for the admin actor at the given place
    // and prints the menu. The place defaults to the admin actor id, both
    // when the argument is missing and as the fallback passed to to_int64().
    void do_menu_command(const StringVec &cmd) {
        int64_t place = admin_id_;
        if (cmd.size() > 1) {
            place = sv::to_int64(cmd[1], admin_id_);
        }
        master_->update_gui(admin_id_, place, &gui_);
        stdostream() << gui_.menu_debug_string();
    }

    // "choose <item>": looks up the action for menu item cmd[1] in gui_ and
    // invokes it on the master as a KIND_CHOOSE invocation.
    void do_choose_command(const StringVec &cmd) {
        if (!has_argument(cmd)) return;
        stdostream() << "Chose menu item: " << cmd[1] << std::endl;
        eng::string action = gui_.get_action(sv::to_int64(cmd[1]));
        if (action == "") {
            stdostream() << "Invalid menu item #" << std::endl;
            return;
        }
        stdostream() << "Invoking plan: " << action << std::endl;
        master_->invoke(Invocation(Invocation::KIND_CHOOSE, admin_id_, gui_.place(), action));
    }

    // "tick": invokes a KIND_TICK invocation on the master as the admin actor.
    void do_tick_command(const util::StringVec &words) {
        master_->invoke(Invocation(Invocation::KIND_TICK, admin_id_, admin_id_, ""));
    }

    // "cpl": calls rescan_lua_source().
    void do_cpl_command(const util::StringVec &words) {
        rescan_lua_source();
    }

    // "quit": calls stop_driver().
    void do_quit_command(const util::StringVec &words) {
        stop_driver();
    }

    // "aborthttp": aborts every HTTP request held by the master with
    // status 425.
    void do_aborthttp_command(const util::StringVec &words) {
        master_->abort_all_http_requests(425, "http requests aborted from the server command line");
    }

    // Dispatches a console command on its first word. An empty vector is
    // ignored; an unknown first word is reported on stdostream().
    void do_command(const util::StringVec &words) {
        if (words.empty()) return;
        else if (words[0] == "luainvoke") do_luainvoke_command(words);
        else if (words[0] == "luaprobe") do_luaprobe_command(words);
        else if (words[0] == "syntax") do_syntax_command(words);
        else if (words[0] == "menu") do_menu_command(words);
        else if (words[0] == "choose") do_choose_command(words);
        else if (words[0] == "tick") do_tick_command(words);
        else if (words[0] == "cpl") do_cpl_command(words);
        else if (words[0] == "quit") do_quit_command(words);
        else if (words[0] == "aborthttp") do_aborthttp_command(words);
        else {
            stdostream() << "Unsupported command: " << words[0] << std::endl;
        }
    }

    // Logs the disconnect and resets `client` to null. The null slot stays in
    // clients_ until event_update() calls util::remove_nullptrs().
    void delete_client(UniqueClient &client) {
        stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl;
        client.reset();
    }

    // Writes one MSG_DIFF message to the client's output buffer: the type
    // byte, a 32-bit placeholder, then the output of diff_everything(). The
    // number of bytes written by the diff is patched in afterwards.
    void send_diffs(UniqueClient &client) {
        StreamBuffer *sb = client->channel_->out();
        sb->write_uint8(util::MSG_DIFF);
        sb->write_uint32(0);
        int64_t tw_1 = sb->total_writes();
        //stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl;
        client->sync_->diff_everything(client->actor_id_, master_.get(), sb);
        int64_t tw_2 = sb->total_writes();
        // NOTE(review): tw_1 is sampled after the placeholder was written;
        // presumably overwrite_int32() addresses the 32-bit slot relative to
        // that position -- confirm against StreamBuffer.
        sb->overwrite_int32(tw_1, tw_2 - tw_1);
    }

    // Tries to read and apply one MSG_INVOKE message from `client`.
    //
    // Returns true when an invocation was read and applied, so the caller
    // can loop to drain the input. Returns false when:
    //   - the buffer hit end-of-data mid-message; the read position is
    //     rewound so the message can be retried later;
    //   - the message was malformed, was not MSG_INVOKE, or named an actor
    //     other than the client's own. In these cases the client is dropped
    //     and `client` is null on return.
    bool handle_invocation(UniqueClient &client) {
        StreamBuffer *sb = client->channel_->in();
        int64_t tr_before = sb->total_reads();
        Invocation inv;
        try {
            uint8_t msg_type = sb->read_uint8();
            if (msg_type != util::MSG_INVOKE) {
                delete_client(client);
                return false;
            }
            inv.deserialize(sb);
        } catch (const StreamEofOnRead &) {
            sb->unread_to(tr_before);
            return false;
        } catch (const StreamException &) {
            delete_client(client);
            return false;
        }

        // Security check.
        if (inv.actor() != client->actor_id_) {
            delete_client(client);
            return false;
        }

        //stdostream() << "Invoking: " << inv.debug_string() << std::endl;
        master_->invoke(inv);
        client->channel_->out()->write_uint8(util::MSG_ACK);
        client->channel_->out()->write_uint32(0);
        client->sync_->invoke(inv);
        client->async_diff_ = true;
        return true;
    }

    // One iteration of the server loop. The steps run in a fixed order:
    // queued invocations, console input, admin print buffer, new
    // connections, master tick, client traffic, HTTP client requests and
    // responses, HTTP server channels.
    virtual void event_update() override {
        // Get the clock.
        const double clock = get_clock();

        // Execute any queued invocations.
        // We just feed these directly into the master model.
        eng::vector<UniqueInvocation> invocations = get_queued_invocations();
        for (const UniqueInvocation &inv : invocations) {
            master_->invoke(*inv);
        }

        pump_console_input();

        // Anything in the administrator printbuffer should go to stdostream.
        if (print_channeler_.channel(master_->get_printbuffer(admin_id_), stdostream())) {
            master_->invoke(print_channeler_.invocation(admin_id_));
        }

        accept_incoming_channels();
        tick_master_if_due(clock);
        service_clients(clock);
        start_http_client_requests();
        collect_http_client_responses();
        service_http_server_channels();
    }

private:
    // Ports passed to listen_port() and matched against Channel::port().
    static constexpr int kClientPort = 8085;
    static constexpr int kHttpServerPort = 8080;

    // Master tick scheduling, in the units returned by get_clock().
    static constexpr double kTickInterval = 1.0;
    static constexpr double kMinTickGap = 0.3;

    // Delay between diffs: short when async_diff_ is set, long otherwise.
    static constexpr double kAsyncDiffDelay = 0.1;
    static constexpr double kIdleDiffDelay = 5.0;

    // Returns true when `words` has a second element. Otherwise reports the
    // missing argument on stdostream() and returns false.
    template <class Words>
    bool has_argument(const Words &words) {
        if (words.size() >= 2) return true;
        stdostream() << "Missing argument for command";
        if (!words.empty()) {
            stdostream() << ": " << words[0];
        }
        stdostream() << std::endl;
        return false;
    }

    // Reads lines from the stdio channel until an empty one is returned,
    // feeding each to the console and running the resulting command.
    void pump_console_input() {
        // Check for keyboard input on stdin.
        while (true) {
            eng::string line = get_stdio_channel()->in()->readline();
            if (line == "") break;
            console_.add(line);
            set_console_prompt(console_.get_prompt());
            do_command(console_.get_command());
        }
    }

    // Accepts every pending incoming channel and files it by port: a Client
    // record for the client port, an HttpChannel for the HTTP server port.
    void accept_incoming_channels() {
        // Check for new incoming channels, set up client structures.
        while (true) {
            SharedChannel chan = new_incoming_channel();
            if (chan == nullptr) break;

            if (chan->port() == kClientPort) {
                UniqueClient client = std::make_unique<Client>();
                client->actor_id_ = master_->create_login_actor(true);
                // TODO: initialize the login actor on the master.
                client->channel_ = std::move(chan);
                client->async_diff_ = true;
                client->last_diff_ = -100.0;
                client->sync_.reset(new World(WORLD_TYPE_PREDICTIVE));
                // This login actor is never used, it is just to preserve the invariant that
                // the client model and the server synchronous model are identical.
                client->sync_->create_login_actor(false);

                const int64_t actor_id = client->actor_id_;
                clients_.emplace_back(std::move(client));
                stdostream() << "New client: actor id=" << actor_id << std::endl;
            } else if (chan->port() == kHttpServerPort) {
                HttpChannel htchan;
                htchan.channel_ = std::move(chan);
                http_server_channels_.push_back(htchan);
            }
        }
    }

    // Ticks the master once `clock` has reached next_tick_, flags every
    // client for a prompt diff, and schedules the next tick.
    void tick_master_if_due(double clock) {
        // If the clock has advanced far enough, tick the master model.
        if (clock < next_tick_) return;

        master_->invoke(Invocation(Invocation::KIND_TICK, 0, 0, ""));
        for (UniqueClient &client : clients_) {
            client->async_diff_ = true;
        }
        next_tick_ += kTickInterval;
        // Never schedule the next tick closer than kMinTickGap from now.
        if (next_tick_ < clock + kMinTickGap) {
            next_tick_ = clock + kMinTickGap;
        }
    }

    // Processes traffic for every client: drops closed ones, applies
    // received invocations, sends a diff when one is due, then removes the
    // slots of dropped clients.
    void service_clients(double clock) {
        // Traverse all existing channels, process any communication.
        for (UniqueClient &client : clients_) {
            if (client->channel_->closed()) {
                delete_client(client);
                continue;
            }

            // Check for received invocations.
            while (handle_invocation(client)) {
            }

            // handle_invocation() drops the client on a protocol or security
            // violation, leaving the slot null; it must not be touched again.
            if (client == nullptr) continue;

            // Possibly send a diff.
            const double diffdelay = client->async_diff_ ? kAsyncDiffDelay : kIdleDiffDelay;
            if (clock >= client->last_diff_ + diffdelay) {
                send_diffs(client);
                client->last_diff_ = clock;
                client->async_diff_ = false;
            }
        }
        util::remove_nullptrs(clients_);
    }

    // Opens an outgoing channel and sends the request for every master HTTP
    // request that does not have a channel yet.
    void start_http_client_requests() {
        // Look for new outgoing HTTP client requests.
        for (const auto &pair : master_->http_requests()) {
            const HttpClientRequest &request = pair.second;
            HttpChannel &channel = http_client_channels_[request.request_id()];
            if (channel.channel_ == nullptr) {
                channel.channel_ = new_outgoing_channel(request.target());
                channel.method_ = request.method();
                channel.parsed_bytes_ = 0;
                request.send(channel.channel_->out());
            }
        }
    }

    // Parses whatever has arrived on outgoing HTTP channels, removes the
    // channels whose response is complete, and hands the completed
    // responses to the master.
    void collect_http_client_responses() {
        // Maintain existing outgoing HTTP client requests.
        HttpParserVec http_responses;
        for (auto &pair : http_client_channels_) {
            HttpChannel &htchan = pair.second;
            Channel &channel = *htchan.channel_;

            // Nothing to do unless the channel closed or new bytes arrived.
            if (!channel.closed() && channel.in()->fill() <= htchan.parsed_bytes_) continue;

            HttpParser response;
            if (!channel.error().empty()) {
                response.fail(503, util::ss("Service Unavailable: ", channel.error()));
            } else {
                htchan.parsed_bytes_ = channel.in()->fill();
                response.parse_response(channel.in()->view(), channel.closed(), htchan.method_);
            }
            if (response.complete()) {
                response.set_request_id(pair.first);
                http_responses.push_back(response);
            }
        }
        // Erase after the loop so the map is not modified while iterating.
        for (const HttpParser &response : http_responses) {
            http_client_channels_.erase(response.request_id());
        }
        master_->http_responses(http_responses);
    }

    // Parses incoming HTTP requests, answers the complete ones through the
    // master, and removes channels that are finished.
    void service_http_server_channels() {
        // Maintain incoming HTTP server channels.
        for (HttpChannel &htchan : http_server_channels_) {
            SharedChannel &chan = htchan.channel_;
            if (chan->in()->fill() > htchan.parsed_bytes_) {
                HttpParser parser;
                parser.parse_request(chan->in()->view(), chan->closed());
                htchan.parsed_bytes_ = chan->in()->fill();
                if (parser.complete()) {
                    StreamBuffer *sb = chan->out();
                    HttpServerResponse resp = master_->http_serve(parser);
                    resp.send(sb);
                    // A null channel_ marks the entry for removal below.
                    htchan.channel_ = nullptr;
                }
            } else if (chan->closed()) {
                // The peer went away with no further data pending. No more
                // bytes can arrive, so the request can never complete; mark
                // the entry for removal instead of keeping it forever.
                // NOTE(review): assumes closed() means no more input will
                // arrive -- confirm against Channel.
                htchan.channel_ = nullptr;
            }
        }
        util::remove_marked_items(http_server_channels_);
    }
};

DrivenEngineDefine("lpxserver", LpxServer);