Files
integration/luprex/cpp/core/lpxserver.cpp

319 lines
11 KiB
C++
Raw Normal View History

#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "world.hpp"
#include "drivenengine.hpp"
2021-11-11 13:56:49 -05:00
#include "luaconsole.hpp"
#include "util.hpp"
#include "printbuffer.hpp"
#include <memory>
2022-02-28 21:57:54 -05:00
// Per-connection state that the server keeps for one connected client.
//
// All scalar members carry default initializers so that a freshly
// constructed Client never holds indeterminate values, even if a caller
// forgets to assign one of them before use.
class Client : public eng::opnew {
public:
    // Id of the login actor created for this client in the master world.
    int64_t actor_id_ = 0;
    // Channel used to exchange messages with the client.
    SharedChannel channel_;
    // The server's per-client world model; diffs sent to the client are
    // computed from this model against the master.
    UniqueWorld sync_;
    // Clock value at which the most recent diff was sent to this client.
    double last_diff_ = 0.0;
    // When true, the server uses the short delay before sending the next
    // diff instead of the long one.
    bool async_diff_ = false;
};
// Owning handle to the state of a single connected client.
using UniqueClient = std::unique_ptr<Client>;
// Sequence of owning client handles.
using ClientVector = eng::vector<UniqueClient>;
class LpxServer : public DrivenEngine, public CommonCommands {
public:
using StringVec = LuaConsole::StringVec;
UniqueWorld master_;
2021-11-11 13:56:49 -05:00
LuaConsole console_;
2021-11-11 16:23:11 -05:00
ClientVector clients_;
2021-11-11 13:56:49 -05:00
PrintChanneler print_channeler_;
HttpChannelMap http_client_channels_;
HttpChannelVec http_server_channels_;
2021-11-11 13:56:49 -05:00
int64_t admin_id_;
Gui gui_;
int next_diff_chan_;
double next_tick_;
public:
virtual void event_init(std::string_view srcpk, int argc, char *argv[]) override {
2021-11-11 13:56:49 -05:00
// Create the master world model.
master_.reset(new World(WORLD_TYPE_MASTER));
2021-11-11 13:56:49 -05:00
2021-11-21 13:35:39 -05:00
// Update the source code of the master model.
master_->update_source(srcpk);
2021-11-21 13:35:39 -05:00
2021-11-11 13:56:49 -05:00
// Create an actor for administrative commands.
admin_id_ = master_->create_login_actor();
2021-11-11 13:56:49 -05:00
// TODO: initialize the admin actor.
2021-11-11 13:56:49 -05:00
// Print out admin ID for debugging purposes.
stdostream() << "Admin actor id = " << admin_id_ << std::endl;
// Enable listening on port 8085 (client connections)
2021-11-11 13:56:49 -05:00
listen_port(8085);
// Enable listening on port 8080 (http server connections)
listen_port(8080);
2021-11-11 13:56:49 -05:00
// Set the console prompt.
set_console_prompt(console_.get_prompt());
// Export stuff to the graphics engine.
set_visible_world_and_actor(master_.get(), admin_id_);
// for ticking.
next_tick_ = 0.0;
2021-11-11 13:56:49 -05:00
}
virtual void do_syntax_error(std::string_view error) override {
stdostream() << "Syntax error: " << error << std::endl;
2021-11-11 13:56:49 -05:00
}
virtual void do_unknown_command(std::string_view name) override {
stdostream() << "Unknown command: " << name << std::endl;
}
virtual void do_choose_command(int64_t n) override {
eng::string action = gui_.get_action(n);
if (action == "") {
stdostream() << "Invalid menu item #" << std::endl;
return;
}
master_->invoke(Invocation(Invocation::KIND_CHOOSE, admin_id_, gui_.place(), action));
2021-11-11 13:56:49 -05:00
}
virtual void do_view_command() override {
stdostream() << master_->tangibles_near_debug_string(admin_id_, 1000);
}
virtual void do_moveto_command(int x, int y) override {
do_unknown_command("moveto");
}
virtual void do_menu_command(int64_t tanid) override {
master_->update_gui(admin_id_, (tanid == 0) ? admin_id_ : tanid, &gui_);
stdostream() << gui_.menu_debug_string();
}
virtual void do_quit_command() override {
stop_driver();
2021-11-26 15:45:36 -05:00
}
virtual void do_cpl_command() override {
2021-12-15 14:18:19 -05:00
rescan_lua_source();
}
virtual void do_work_command() override {
do_unknown_command("work");
2021-11-11 13:56:49 -05:00
}
virtual void do_display_command() override {
do_unknown_command("display");
}
virtual void do_aborthttp_command() override {
master_->abort_all_http_requests(425, "http requests aborted from the server command line");
}
virtual void do_connect_command(std::string_view hostname) override {
do_unknown_command("connect");
}
virtual void do_luainvoke_command(std::string_view cmd) override {
master_->invoke(Invocation(Invocation::KIND_LUA, admin_id_, admin_id_, cmd));
}
virtual void do_luaprobe_command(std::string_view cmd) override {
master_->snapshot();
stdostream() << master_->probe_lua(admin_id_, cmd);
master_->rollback();
2021-11-11 13:56:49 -05:00
}
2021-11-11 16:23:11 -05:00
void delete_client(UniqueClient &client) {
stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl;
master_->disconnected(client->actor_id_);
2021-11-11 16:23:11 -05:00
client.reset();
}
2021-11-11 16:23:11 -05:00
void send_diffs(UniqueClient &client) {
StreamBuffer *sb = client->channel_->out();
sb->write_uint8(util::MSG_DIFF);
sb->write_uint32(0);
int64_t tw_1 = sb->total_writes();
2023-04-10 16:23:31 -04:00
//stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl;
2021-11-11 16:23:11 -05:00
client->sync_->diff_everything(client->actor_id_, master_.get(), sb);
int64_t tw_2 = sb->total_writes();
sb->overwrite_int32(tw_1, tw_2 - tw_1);
}
bool handle_invocation(UniqueClient &client) {
StreamBuffer *sb = client->channel_->in();
int64_t tr_before = sb->total_reads();
Invocation inv;
try {
uint8_t msg_type = sb->read_uint8();
if (msg_type != util::MSG_INVOKE) {
delete_client(client);
return false;
}
inv.deserialize(sb);
} catch (const StreamEofOnRead &seof) {
2021-11-11 16:23:11 -05:00
sb->unread_to(tr_before);
return false;
} catch (const StreamException &sexcept) {
2021-11-11 16:23:11 -05:00
delete_client(client);
return false;
}
// Security check.
2021-11-11 16:23:11 -05:00
if (inv.actor() != client->actor_id_) {
delete_client(client);
return false;
2021-11-11 16:23:11 -05:00
}
2023-04-10 16:23:31 -04:00
//stdostream() << "Invoking: " << inv.debug_string() << std::endl;
2021-11-11 16:23:11 -05:00
master_->invoke(inv);
client->channel_->out()->write_uint8(util::MSG_ACK);
client->channel_->out()->write_uint32(0);
client->sync_->invoke(inv);
client->async_diff_ = true;
2021-11-11 16:23:11 -05:00
return true;
}
virtual void event_update() override {
// Get the clock.
double clock = get_clock();
// Execute any queued invocations.
// We just feed these directly into the master model.
eng::vector<UniqueInvocation> invocations = get_queued_invocations();
for (const UniqueInvocation &inv : invocations) {
master_->invoke(*inv);
}
2021-12-15 14:18:19 -05:00
2021-11-11 13:56:49 -05:00
// Check for keyboard input on stdin.
while (true) {
eng::string line = get_stdio_channel()->in()->readline();
2021-11-11 13:56:49 -05:00
if (line == "") break;
console_.add(line);
set_console_prompt(console_.get_prompt());
2021-11-11 13:56:49 -05:00
do_command(console_.get_command());
}
// Anything in the administrator printbuffer should go to stdostream.
if (print_channeler_.channel(master_->get_printbuffer(admin_id_), stdostream())) {
master_->invoke(print_channeler_.invocation(admin_id_));
}
// Check for new incoming channels, set up client structures.
while (true) {
SharedChannel chan = new_incoming_channel();
2021-11-11 13:56:49 -05:00
if (chan == nullptr) break;
if (chan->port() == 8085) {
Client *client = new Client;
client->actor_id_ = master_->create_login_actor();
// TODO: initialize the login actor on the master.
client->channel_ = std::move(chan);
client->async_diff_ = true;
client->last_diff_ = -100.0;
client->sync_.reset(new World(WORLD_TYPE_PREDICTIVE));
// This login actor is never used, it is just to preserve the invariant that
// the client model and the server synchronous model are identical.
client->sync_->create_login_actor();
clients_.emplace_back(client);
stdostream() << "New client: actor id=" << client->actor_id_ << std::endl;
} else if (chan->port() == 8080) {
HttpChannel htchan;
htchan.channel_ = chan;
http_server_channels_.push_back(htchan);
}
2021-11-11 13:56:49 -05:00
}
// If the clock has advanced far enough, tick the master model.
if (clock >= next_tick_) {
master_->invoke(Invocation(Invocation::KIND_TICK, 0, 0, ""));
2024-02-26 15:20:47 -05:00
for (UniqueClient &client : clients_) {
client->async_diff_ = true;
}
next_tick_ += 1.0;
if (next_tick_ < clock + 0.3) next_tick_ = clock + 0.3;
}
2021-11-11 13:56:49 -05:00
// Traverse all existing channels, process any communication.
2021-11-11 16:23:11 -05:00
for (UniqueClient &client : clients_) {
2021-11-11 13:56:49 -05:00
if (client->channel_->closed()) {
2021-11-11 16:23:11 -05:00
delete_client(client);
2021-11-11 13:56:49 -05:00
continue;
}
2021-11-11 16:23:11 -05:00
2021-11-11 13:56:49 -05:00
// Check for received invocations.
2021-11-11 16:23:11 -05:00
while (handle_invocation(client));
// Possibly send a diff.
double diffdelay = 5.0;
if (client->async_diff_) diffdelay = 0.1;
if (clock >= client->last_diff_ + diffdelay) {
send_diffs(client);
client->last_diff_ = clock;
client->async_diff_ = false;
}
2021-11-11 13:56:49 -05:00
}
2021-11-11 16:23:11 -05:00
util::remove_nullptrs(clients_);
2022-05-06 13:16:27 -04:00
// Look for new outgoing HTTP client requests.
for (const auto &pair : master_->http_requests()) {
const HttpClientRequest &request = pair.second;
HttpChannel &channel = http_client_channels_[request.request_id()];
2022-05-06 13:16:27 -04:00
if (channel.channel_ == nullptr) {
channel.channel_ = new_outgoing_channel(request.target());
channel.method_ = request.method();
2022-05-06 13:16:27 -04:00
channel.parsed_bytes_ = 0;
request.send(channel.channel_->out());
}
}
// Maintain existing outgoing HTTP client requests.
2022-05-16 17:16:42 -04:00
HttpParserVec http_responses;
2022-05-06 13:16:27 -04:00
for (auto &pair : http_client_channels_) {
HttpChannel &htchan = pair.second;
2022-05-06 13:16:27 -04:00
Channel &channel = *htchan.channel_;
if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) {
2022-05-16 17:16:42 -04:00
HttpParser response;
2022-05-06 13:16:27 -04:00
if (!channel.error().empty()) {
response.fail(503, util::ss("Service Unavailable: ", channel.error()));
} else {
htchan.parsed_bytes_ = channel.in()->fill();
response.parse_response(channel.in()->view(), channel.closed(), htchan.method_);
2022-05-06 13:16:27 -04:00
}
if (response.complete()) {
response.set_request_id(pair.first);
http_responses.push_back(response);
}
}
}
2022-05-16 17:16:42 -04:00
for (const HttpParser &response : http_responses) {
2022-05-06 13:16:27 -04:00
http_client_channels_.erase(response.request_id());
}
master_->http_responses(http_responses);
// Maintain incoming HTTP server channels.
for (HttpChannel &htchan : http_server_channels_) {
SharedChannel &chan = htchan.channel_;
if (chan->in()->fill() > htchan.parsed_bytes_) {
HttpParser parser;
parser.parse_request(chan->in()->view(), chan->closed());
htchan.parsed_bytes_ = chan->in()->fill();
if (parser.complete()) {
StreamBuffer *sb = chan->out();
2022-05-20 17:12:58 -04:00
HttpServerResponse resp = master_->http_serve(parser);
resp.send(sb);
htchan.channel_ = nullptr;
}
}
}
util::remove_marked_items(http_server_channels_);
}
};
// Associates the name "lpxserver" with the LpxServer class.
// NOTE(review): DrivenEngineDefine is defined outside this file; presumably it
// registers the engine with the driver so it can be selected by name - confirm.
DrivenEngineDefine("lpxserver", LpxServer);