#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "world.hpp"
#include "drivenengine.hpp"
#include "luaconsole.hpp"
#include "util.hpp"
#include "printbuffer.hpp"
#include <memory>

// State kept for one connected client: the actor it controls in the master
// world, the channel used to talk to it, and a separate World (created with
// WORLD_TYPE_PREDICTIVE) that is diffed against the master in send_diffs().
class Client : public eng::opnew {
public:
    int64_t actor_id_;
    SharedChannel channel_;
    UniqueWorld sync_;
};

using UniqueClient = std::unique_ptr<Client>;
using ClientVector = eng::vector<UniqueClient>;

// The server engine: owns the master world, accepts console commands from the
// stdio channel, serves client connections on port 8085 and HTTP connections
// on port 8080, and drives outgoing HTTP requests issued by the master world.
class LpxServer : public DrivenEngine {
public:
    using StringVec = LuaConsole::StringVec;

    UniqueWorld master_;
    LuaConsole console_;
    ClientVector clients_;
    PrintChanneler print_channeler_;
    HttpChannelMap http_client_channels_;
    HttpChannelVec http_server_channels_;
    int64_t admin_id_;
    Gui gui_;

public:
    // One-time setup: builds the master world, loads the Lua source into it,
    // creates the administrative actor and opens both listening ports.
    // argc/argv are not used.
    virtual void event_init(int argc, char *argv[]) {
        // Create the master world model.
        master_.reset(new World(WORLD_TYPE_MASTER));

        // Update the source code of the master model.
        master_->update_source(get_lua_source());

        // Create an actor for administrative commands.
        admin_id_ = master_->create_login_actor();

        // Print out admin ID for debugging purposes.
        stdostream() << "Admin actor id = " << admin_id_ << std::endl;

        // Enable listening on port 8085 (client connections)
        listen_port(8085);

        // Enable listening on port 8080 (http server connections)
        listen_port(8080);

        // Set the console prompt.
        // get_stdio_channel()->set_prompt(console_.get_prompt());
    }

    // "luainvoke <code>": invokes words[1] as a KIND_LUA invocation with the
    // admin actor as both actor and place.
    void do_luainvoke_command(const util::StringVec &words) {
        if (words.size() < 2) {
            stdostream() << "Missing argument for luainvoke" << std::endl;
            return;
        }
        master_->invoke(Invocation(Invocation::KIND_LUA, admin_id_, admin_id_, words[1]));
    }

    // "luaprobe <code>": prints the result of probe_lua on words[1]. The call
    // is bracketed by snapshot()/rollback() on the master world.
    void do_luaprobe_command(const util::StringVec &words) {
        if (words.size() < 2) {
            stdostream() << "Missing argument for luaprobe" << std::endl;
            return;
        }
        master_->snapshot();
        stdostream() << master_->probe_lua(admin_id_, words[1]);
        master_->rollback();
    }

    // "syntax <message>": reports a syntax error message carried in words[1].
    void do_syntax_command(const util::StringVec &words) {
        if (words.size() < 2) {
            stdostream() << "Syntax Error: " << std::endl;
            return;
        }
        stdostream() << "Syntax Error: " << words[1] << std::endl;
    }

    // "menu [place]": refreshes gui_ for the admin actor at the given place
    // and prints the menu. When the place argument is absent the admin actor
    // id is used, which is also the value handed to sv::to_int64 as its second
    // argument (presumably its fallback for unparsable input).
    void do_menu_command(const StringVec &cmd) {
        int64_t place = admin_id_;
        if (cmd.size() > 1) {
            place = sv::to_int64(cmd[1], admin_id_);
        }
        master_->update_gui(admin_id_, place, &gui_);
        stdostream() << gui_.menu_debug_string();
    }

    // "choose <n>": looks up menu item n in gui_ and invokes its action as a
    // KIND_PLAN invocation at gui_.place(). An empty action, or a missing
    // argument, is reported as an invalid menu item.
    void do_choose_command(const StringVec &cmd) {
        eng::string action;
        if (cmd.size() > 1) {
            action = gui_.get_action(sv::to_int64(cmd[1]));
        }
        if (action == "") {
            stdostream() << "Invalid menu item #" << std::endl;
            return;
        }
        stdostream() << "Invoking plan: " << action << std::endl;
        master_->invoke(Invocation(Invocation::KIND_PLAN, admin_id_, gui_.place(), action));
    }

    // "tick": sends a KIND_TICK invocation from the admin actor.
    void do_tick_command(const util::StringVec &words) {
        master_->invoke(Invocation(Invocation::KIND_TICK, admin_id_, admin_id_, ""));
    }

    // "cpl": asks the driver to rescan the Lua source.
    void do_cpl_command(const util::StringVec &words) {
        rescan_lua_source();
    }

    // "quit": stops the driver.
    void do_quit_command(const util::StringVec &words) {
        stop_driver();
    }

    // "aborthttp": aborts every pending HTTP request of the master world with
    // status 425.
    void do_aborthttp_command(const util::StringVec &words) {
        master_->abort_all_http_requests(425, "http requests aborted from the server command line");
    }

    // Dispatches a console command on its first word. An empty vector is
    // ignored; an unknown first word is reported.
    void do_command(const util::StringVec &words) {
        if (words.empty()) return;
        else if (words[0] == "luainvoke") do_luainvoke_command(words);
        else if (words[0] == "luaprobe") do_luaprobe_command(words);
        else if (words[0] == "syntax") do_syntax_command(words);
        else if (words[0] == "menu") do_menu_command(words);
        else if (words[0] == "choose") do_choose_command(words);
        else if (words[0] == "tick") do_tick_command(words);
        else if (words[0] == "cpl") do_cpl_command(words);
        else if (words[0] == "quit") do_quit_command(words);
        else if (words[0] == "aborthttp") do_aborthttp_command(words);
        else {
            stdostream() << "Unsupported command: " << words[0] << std::endl;
        }
    }

    // Logs the disconnect and destroys the client. The slot is left holding
    // nullptr; event_update() later compacts the vector with
    // util::remove_nullptrs().
    void delete_client(UniqueClient &client) {
        stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl;
        client.reset();
    }

    // Queues a MSG_DIFF message on the client's output buffer: the type byte,
    // a 32-bit placeholder, then whatever diff_everything() writes. Afterwards
    // the placeholder is patched with the growth in total_writes() across the
    // diff_everything() call.
    void send_diffs(UniqueClient &client) {
        StreamBuffer *sb = client->channel_->out();
        sb->write_uint8(util::MSG_DIFF);
        sb->write_uint32(0);
        // NOTE(review): the position handed to overwrite_int32 is the write
        // count taken *after* the placeholder was written -- confirm this is
        // the convention overwrite_int32 expects.
        int64_t tw_1 = sb->total_writes();
        //stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl;
        client->sync_->diff_everything(client->actor_id_, master_.get(), sb);
        int64_t tw_2 = sb->total_writes();
        sb->overwrite_int32(tw_1, tw_2 - tw_1);
    }

    // Tries to consume one message from the client's input buffer.
    //
    // Returns true when a message was consumed (either applied or ignored
    // because of a wrong actor id), so the caller may try again. Returns false
    // when no complete message is buffered yet (the read position is restored)
    // or when the client was deleted because of an unexpected message type or
    // a corrupt stream; in the latter cases `client` is nullptr on return.
    bool handle_invocation(UniqueClient &client) {
        StreamBuffer *sb = client->channel_->in();
        int64_t tr_before = sb->total_reads();
        Invocation inv;
        try {
            uint8_t msg_type = sb->read_uint8();
            if (msg_type != util::MSG_INVOKE) {
                delete_client(client);
                return false;
            }
            inv.deserialize(sb);
        } catch (const StreamEof &) {
            // Partial message: rewind so it is re-read once more data arrives.
            sb->unread_to(tr_before);
            return false;
        } catch (const StreamCorruption &) {
            delete_client(client);
            return false;
        }
        if (inv.actor() != client->actor_id_) {
            stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl;
            return true;
        }
        //stdostream() << "Invoking: " << inv.debug_string() << std::endl;
        master_->invoke(inv);
        // Acknowledge, replay the invocation on the client's own world, then
        // send the resulting differences.
        client->channel_->out()->write_uint8(util::MSG_ACK);
        client->channel_->out()->write_uint32(0);
        client->sync_->invoke(inv);
        send_diffs(client);
        return true;
    }

    // One pass of the server loop: refresh source, process console input,
    // forward admin print output, accept new channels, service clients, and
    // maintain outgoing and incoming HTTP channels.
    virtual void event_update() {
        // If the driver has reloaded the source, put it into master model.
        master_->update_source(get_lua_source());

        // Check for keyboard input on stdin.
        while (true) {
            eng::string line = get_stdio_channel()->in()->readline();
            if (line == "") break;
            console_.add(line);
            // get_stdio_channel()->set_prompt(console_.get_prompt());
            do_command(console_.get_command());
        }

        // Anything in the administrator printbuffer should go to stdostream.
        if (print_channeler_.channel(master_->get_printbuffer(admin_id_), stdostream())) {
            master_->invoke(print_channeler_.invocation(admin_id_));
        }

        // Check for new incoming channels, set up client structures.
        while (true) {
            SharedChannel chan = new_incoming_channel();
            if (chan == nullptr) break;
            if (chan->port() == 8085) {
                // Owned by a smart pointer from the start so that a throw from
                // any of the setup calls below cannot leak the client.
                UniqueClient client(new Client);
                client->actor_id_ = master_->create_login_actor();
                client->channel_ = std::move(chan);
                client->sync_.reset(new World(WORLD_TYPE_PREDICTIVE));
                client->sync_->create_login_actor();
                const int64_t actor_id = client->actor_id_;
                clients_.emplace_back(std::move(client));
                stdostream() << "New client: actor id=" << actor_id << std::endl;
                send_diffs(clients_.back());
            } else if (chan->port() == 8080) {
                HttpChannel htchan;
                htchan.channel_ = chan;
                // Set explicitly, as the outgoing path below does, so the
                // fill() comparison never reads an unset counter.
                htchan.parsed_bytes_ = 0;
                http_server_channels_.push_back(htchan);
            }
        }

        // Traverse all existing channels, process any communication.
        for (UniqueClient &client : clients_) {
            if (client->channel_->closed()) {
                delete_client(client);
                continue;
            }
            // Check for received invocations.
            while (handle_invocation(client));
        }
        util::remove_nullptrs(clients_);

        // Look for new outgoing HTTP client requests.
        for (const auto &pair : master_->http_requests()) {
            const HttpClientRequest &request = pair.second;
            HttpChannel &channel = http_client_channels_[request.request_id()];
            if (channel.channel_ == nullptr) {
                channel.channel_ = new_outgoing_channel(request.target());
                channel.method_ = request.method();
                channel.parsed_bytes_ = 0;
                request.send(channel.channel_->out());
            }
        }

        // Maintain existing outgoing HTTP client requests.
        HttpParserVec http_responses;
        for (auto &pair : http_client_channels_) {
            HttpChannel &htchan = pair.second;
            Channel &channel = *htchan.channel_;
            if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) {
                HttpParser response;
                if (!channel.error().empty()) {
                    response.fail(503, util::ss("Service Unavailable: ", channel.error()));
                } else {
                    htchan.parsed_bytes_ = channel.in()->fill();
                    response.parse_response(channel.in()->view(), channel.closed(), htchan.method_);
                }
                if (response.complete()) {
                    response.set_request_id(pair.first);
                    http_responses.push_back(response);
                }
            }
        }
        // Erase after the loop so the map is not modified while iterating it.
        for (const HttpParser &response : http_responses) {
            http_client_channels_.erase(response.request_id());
        }
        master_->http_responses(http_responses);

        // Maintain incoming HTTP server channels.
        for (HttpChannel &htchan : http_server_channels_) {
            SharedChannel &chan = htchan.channel_;
            if (chan->in()->fill() > htchan.parsed_bytes_) {
                HttpParser parser;
                parser.parse_request(chan->in()->view(), chan->closed());
                htchan.parsed_bytes_ = chan->in()->fill();
                if (parser.complete()) {
                    StreamBuffer *sb = chan->out();
                    HttpServerResponse resp = master_->http_serve(parser);
                    resp.send(sb);
                    htchan.channel_ = nullptr;
                }
            }
            // A channel that has closed without delivering a complete request
            // can never complete one; release it instead of keeping it in the
            // list forever.
            if (htchan.channel_ != nullptr && htchan.channel_->closed()) {
                htchan.channel_ = nullptr;
            }
        }
        // Entries whose channel_ was reset above are presumably the "marked"
        // items that this call removes.
        util::remove_marked_items(http_server_channels_);
    }
};

DrivenEngineDefine("lpxserver", LpxServer);