#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "world.hpp"
#include "drivenengine.hpp"
#include "luaconsole.hpp"
#include "util.hpp"
#include "printbuffer.hpp"
#include "luastack.hpp"

// NOTE(review): the original file had a bare `#include` whose target was lost;
// <memory> is required for std::unique_ptr below -- confirm against the repo.
#include <cstdint>
#include <memory>
#include <string_view>
#include <utility>

// Per-connection state for one client attached on the client port.
class Client : public eng::opnew {
public:
    // Id of the login actor created for this client on the master world.
    int64_t actor_id_ = 0;
    // Channel used to talk to the client.
    SharedChannel channel_;
    // World that diffs are computed from and that mirrors the client's
    // invocations (see send_diffs / handle_invocation).
    UniqueWorld sync_;
    // Clock values at which the last full / mini diff was sent.
    double last_full_diff_ = 0.0;
    double last_mini_diff_ = 0.0;
    // When set, the next diff is sent with shortened delays.
    bool async_diff_ = false;
};

using UniqueClient = std::unique_ptr<Client>;
using ClientVector = eng::vector<UniqueClient>;

// Server engine: owns the master world, accepts client connections on port
// 8085 and HTTP server connections on port 8080, relays console commands and
// keeps every client synchronized by sending diffs.
class LpxServer : public DrivenEngine, public CommonCommands {
public:
    using StringVec = LuaConsole::StringVec;

    UniqueWorld master_;
    LuaConsole console_;
    ClientVector clients_;
    PrintChanneler print_channeler_;
    HttpChannelMap http_client_channels_;
    HttpChannelVec http_server_channels_;
    // Invocations queued by event_access, executed at the next event_update.
    eng::vector<Invocation> delayed_invocations_;
    int64_t admin_id_ = 0;
    double next_tick_ = 0;
    lua_State *lua_syntax_checker_ = nullptr;

public:
    LpxServer() {
        // Create a little lua interpreter for syntax checking only.
        lua_syntax_checker_ = LuaCoreStack::newstate(eng::l_alloc);

        // Create the master world model.
        master_.reset(new World(WORLD_TYPE_MASTER));

        // Create the admin actor. Note: there isn't any 'init' function yet.
        admin_id_ = master_->create_login_actor();

        // Print out admin ID for debugging purposes.
        util::dprint("Admin actor id = ", admin_id_);

        // Enable listening on port 8085 (client connections)
        listen_port(8085);

        // Enable listening on port 8080 (http server connections)
        listen_port(8080);

        // Set the console prompt.
        set_console_prompt(console_.get_prompt());

        // Export stuff to the graphics engine.
        set_visible_world_and_actor(master_.get(), admin_id_);

        // Reset the print channeler.
        print_channeler_.reset();

        // Trigger the loading of the lua source.
        rescan_lua_source(true);
    }

    ~LpxServer() {
        lua_close(lua_syntax_checker_);
    }

    // ---- Console command handlers -------------------------------------

    virtual void do_syntax_error(std::string_view error) override {
        util::dprint("Syntax error: ", error, "\n");
    }

    virtual void do_unknown_command(std::string_view name) override {
        util::dprint("Unknown command: ", name, "\n");
    }

    virtual void do_view_command() override {
        util::dprint(master_->tangibles_near_debug_string(admin_id_, 1000));
    }

    // Not supported on the server; reported as an unknown command.
    virtual void do_moveto_command(int x, int y) override {
        do_unknown_command("moveto");
    }

    virtual void do_quit_command() override {
        stop_driver();
    }

    virtual void do_cpl_command() override {
        rescan_lua_source(true);
    }

    // Not supported on the server; reported as an unknown command.
    virtual void do_work_command() override {
        do_unknown_command("work");
    }

    // Not supported on the server; reported as an unknown command.
    virtual void do_display_command() override {
        do_unknown_command("display");
    }

    virtual void do_aborthttp_command() override {
        master_->abort_all_http_requests(425, "http requests aborted from the server command line");
    }

    // Not supported on the server; reported as an unknown command.
    virtual void do_connect_command(std::string_view hostname) override {
        do_unknown_command("connect");
    }

    // Runs a lua expression on the master world as the admin actor.
    virtual void do_luainvoke_command(std::string_view cmd) override {
        master_->invoke(Invocation(AccessKind::INVOKE_LUA_EXPR, admin_id_, admin_id_, cmd));
    }

    // Probes a lua expression between a snapshot and a rollback of the
    // master world, printing the result.
    virtual void do_luaprobe_command(std::string_view cmd) override {
        master_->snapshot();
        util::dprint(master_->probe_lua_expr(admin_id_, cmd));
        master_->rollback();
    }

    // ---- Client handling ----------------------------------------------

    // Notifies the master world that the client's actor disconnected and
    // destroys the client. `client` is null afterwards; the vector slot is
    // reclaimed later by util::remove_nullptrs.
    void delete_client(UniqueClient &client) {
        util::dprint("Client closed: actor id=", client->actor_id_);
        master_->disconnected(client->actor_id_);
        client.reset();
    }

    // Writes a MSG_DIFF message to the client: a type byte, a 32-bit
    // placeholder, then the diff payload. The placeholder is overwritten
    // afterwards with the number of payload bytes written.
    void send_diffs(UniqueClient &client, bool full) {
        StreamBuffer *sb = client->channel_->out();
        sb->write_uint8(util::MSG_DIFF);
        sb->write_uint32(0);
        int64_t tw_1 = sb->total_writes();
        // util::dprint("Sending diffs to client ", client->actor_id_);
        client->sync_->diff(client->actor_id_, full, master_.get(), sb);
        int64_t tw_2 = sb->total_writes();
        sb->overwrite_int32(tw_1, tw_2 - tw_1);
    }

    // Tries to read and execute one invocation from the client's input.
    //
    // Returns true if an invocation was processed (so the caller should try
    // again), false otherwise. On a protocol error or a failed security
    // check the client is deleted, leaving `client` null. If the message is
    // incomplete, the read position is rewound and the client is kept.
    bool handle_invocation(UniqueClient &client) {
        StreamBuffer *sb = client->channel_->in();
        int64_t tr_before = sb->total_reads();
        Invocation inv;
        try {
            uint8_t msg_type = sb->read_uint8();
            if (msg_type != util::MSG_INVOKE) {
                delete_client(client);
                return false;
            }
            inv.deserialize(sb);
        } catch (const StreamEofOnRead &) {
            // Not enough data yet: rewind and retry on a later update.
            sb->unread_to(tr_before);
            return false;
        } catch (const StreamException &) {
            delete_client(client);
            return false;
        }

        // Security check.
        if (inv.actor() != client->actor_id_) {
            delete_client(client);
            return false;
        }

        // util::dprint("Invoking: ", inv.debug_string());
        master_->invoke(inv);
        client->channel_->out()->write_uint8(util::MSG_ACK);
        client->channel_->out()->write_uint32(0);
        client->sync_->invoke(inv);
        client->async_diff_ = true;
        return true;
    }

    // ---- Driver callbacks ---------------------------------------------

    // Handles an access request. Invocations are queued and executed at the
    // next event_update; probes, validation and print channeling are
    // answered immediately through `retpk`. A place_id of 0 means the admin
    // actor.
    virtual void event_access(AccessKind kind, int64_t place_id, std::string_view datapk, StreamBuffer *retpk) override {
        if (place_id == 0) place_id = admin_id_;
        switch (kind) {
        case AccessKind::INVOKE_LUA_CALL:
        case AccessKind::INVOKE_LUA_EXPR:
        case AccessKind::INVOKE_FLUSH_PRINTS:
        case AccessKind::INVOKE_TICK:
        case AccessKind::INVOKE_LUA_SOURCE: {
            delayed_invocations_.emplace_back(kind, admin_id_, place_id, datapk);
            break;
        }
        case AccessKind::PROBE_LUA_CALL: {
            master_->snapshot();
            master_->probe_lua_call(admin_id_, place_id, datapk, retpk);
            master_->rollback();
            break;
        }
        case AccessKind::VALIDATE_LUA_EXPR: {
            LuaVar closure;
            LuaExtStack LS(lua_syntax_checker_, closure);
            eng::string errmsg = LS.load(closure, datapk, "stdin");
            retpk->write_bytes(errmsg);
            break;
        }
        case AccessKind::CHANNEL_PRINTS: {
            // If there's nothing new in the printbuffer, this is very fast.
            if (print_channeler_.channel(master_->get_printbuffer(admin_id_), retpk)) {
                master_->invoke(print_channeler_.invocation(admin_id_));
            }
            set_have_prints(false);
            break;
        }
        default: {
            util::dprint("Invalid event_access: ", static_cast<int>(kind));
        }
        }
    }

    // One iteration of the server loop: runs queued invocations, reads
    // console input, accepts new channels, ticks the master world, services
    // clients and maintains HTTP traffic in both directions.
    virtual void event_update() override {
        // Get the clock.
        double clock = get_clock();

        // Execute any queued invocations.
        // We just feed these directly into the master model.
        for (const Invocation &inv : delayed_invocations_) {
            master_->invoke(inv);
        }
        delayed_invocations_.clear();

        // Check for keyboard input on stdin.
        while (true) {
            eng::string line = get_stdio_channel()->in()->readline();
            if (line == "") break;
            console_.add(line);
            set_console_prompt(console_.get_prompt());
            do_command(console_.get_command());
        }

        // Check for new incoming channels, set up client structures.
        while (true) {
            SharedChannel chan = new_incoming_channel();
            if (chan == nullptr) break;
            if (chan->port() == 8085) {
                // Owned by a smart pointer from the start so the client is
                // not leaked if anything below throws.
                UniqueClient client(new Client);
                client->actor_id_ = master_->create_login_actor();
                // TODO: initialize the login actor on the master.
                client->channel_ = std::move(chan);
                client->async_diff_ = true;
                client->last_full_diff_ = -100.0;
                client->last_mini_diff_ = -100.0;
                client->sync_.reset(new World(WORLD_TYPE_PREDICTIVE));
                // This login actor is never used, it is just to preserve the invariant that
                // the client model and the server synchronous model are identical.
                client->sync_->create_login_actor();
                const int64_t new_actor_id = client->actor_id_;
                clients_.emplace_back(std::move(client));
                util::dprint("New client: actor id=", new_actor_id);
            } else if (chan->port() == 8080) {
                HttpChannel htchan;
                htchan.channel_ = chan;
                // Set explicitly, as the outgoing-request path does, so the
                // fill() comparison below never reads an unset value.
                htchan.parsed_bytes_ = 0;
                http_server_channels_.push_back(htchan);
            }
        }

        // If the clock has advanced far enough, tick the master model.
        if (clock >= next_tick_) {
            master_->invoke(Invocation(AccessKind::INVOKE_TICK, 0, 0, ""));
            for (UniqueClient &client : clients_) {
                client->async_diff_ = true;
            }
            next_tick_ += 1.0;
            if (next_tick_ < clock + 0.3) next_tick_ = clock + 0.3;
        }

        // Traverse all existing channels, process any communication.
        for (UniqueClient &client : clients_) {
            if (client->channel_->closed()) {
                delete_client(client);
                continue;
            }

            // Check for received invocations.
            while (handle_invocation(client));
            // handle_invocation may have deleted the client.
            if (client == nullptr) continue;

            // Possibly send a diff.
            // Currently, it's configured to send about
            // ten mini-diffs per second, and two full diffs
            // per second. It actually only considers sending
            // a full diff if it already has decided to send
            // a mini diff.
            double mini_delay = 0.1;
            double full_delay = 0.45;
            if (client->async_diff_) {
                mini_delay = 0.05;
                full_delay = 0.1;
            }
            if (clock >= client->last_mini_diff_ + mini_delay) {
                if (clock >= client->last_full_diff_ + full_delay) {
                    send_diffs(client, true);
                    client->last_full_diff_ = clock;
                    client->last_mini_diff_ = clock;
                } else {
                    send_diffs(client, false);
                    client->last_mini_diff_ = clock;
                }
                client->async_diff_ = false;
            }
        }
        util::remove_nullptrs(clients_);

        // Look for new outgoing HTTP client requests.
        for (const auto &pair : master_->http_requests()) {
            const HttpClientRequest &request = pair.second;
            HttpChannel &channel = http_client_channels_[request.request_id()];
            if (channel.channel_ == nullptr) {
                channel.channel_ = new_outgoing_channel(request.target());
                channel.method_ = request.method();
                channel.parsed_bytes_ = 0;
                request.send(channel.channel_->out());
            }
        }

        // Maintain existing outgoing HTTP client requests.
        HttpParserVec http_responses;
        for (auto &pair : http_client_channels_) {
            HttpChannel &htchan = pair.second;
            Channel &channel = *htchan.channel_;
            if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) {
                HttpParser response;
                if (!channel.error().empty()) {
                    response.fail(503, util::ss("Service Unavailable: ", channel.error()));
                } else {
                    htchan.parsed_bytes_ = channel.in()->fill();
                    response.parse_response(channel.in()->view(), channel.closed(), htchan.method_);
                }
                if (response.complete()) {
                    response.set_request_id(pair.first);
                    http_responses.push_back(std::move(response));
                }
            }
        }
        // Erase after the loop so the map is not modified while iterating.
        for (const HttpParser &response : http_responses) {
            http_client_channels_.erase(response.request_id());
        }
        master_->http_responses(http_responses);

        // Maintain incoming HTTP server channels.
        // A null channel_ marks the entry for removal below.
        for (HttpChannel &htchan : http_server_channels_) {
            SharedChannel &chan = htchan.channel_;
            if (chan->in()->fill() > htchan.parsed_bytes_) {
                HttpParser parser;
                parser.parse_request(chan->in()->view(), chan->closed());
                htchan.parsed_bytes_ = chan->in()->fill();
                if (parser.complete()) {
                    StreamBuffer *sb = chan->out();
                    HttpServerResponse resp = master_->http_serve(parser);
                    resp.send(sb);
                    htchan.channel_ = nullptr;
                    continue;
                }
            }
            // The peer closed the connection without delivering a complete
            // request: release the channel instead of keeping it forever.
            // NOTE(review): assumes closed() means no further input can
            // arrive on this channel -- confirm against Channel.
            if (chan->closed()) {
                htchan.channel_ = nullptr;
            }
        }
        util::remove_marked_items(http_server_channels_);

        // Notify the driver if there are any prints.
        set_have_prints(print_channeler_.have_prints(master_->get_printbuffer(admin_id_)));
    }
};

DrivenEngineDefine("lpxserver", LpxServer);