281 lines
10 KiB
C++
281 lines
10 KiB
C++
#include "wrap-string.hpp"
|
|
#include "wrap-vector.hpp"
|
|
|
|
#include "world.hpp"
|
|
#include "drivenengine.hpp"
|
|
#include "luaconsole.hpp"
|
|
#include "util.hpp"
|
|
#include "printbuffer.hpp"
|
|
|
|
#include <memory>
|
|
|
|
class Client : public eng::opnew {
|
|
public:
|
|
int64_t actor_id_;
|
|
SharedChannel channel_;
|
|
UniqueWorld sync_;
|
|
};
|
|
using UniqueClient = std::unique_ptr<Client>;
|
|
using ClientVector = eng::vector<UniqueClient>;
|
|
|
|
class LpxServer : public DrivenEngine {
|
|
public:
|
|
using StringVec = LuaConsole::StringVec;
|
|
UniqueWorld master_;
|
|
LuaConsole console_;
|
|
ClientVector clients_;
|
|
PrintChanneler print_channeler_;
|
|
HttpChannelMap http_client_channels_;
|
|
HttpChannelVec http_server_channels_;
|
|
int64_t admin_id_;
|
|
Gui gui_;
|
|
|
|
public:
|
|
virtual void event_init(std::string_view srcpk, int argc, char *argv[]) override {
|
|
// Create the master world model.
|
|
master_.reset(new World(WORLD_TYPE_MASTER));
|
|
|
|
// Update the source code of the master model.
|
|
master_->update_source(srcpk);
|
|
|
|
// Create an actor for administrative commands.
|
|
admin_id_ = master_->create_login_actor();
|
|
|
|
// Print out admin ID for debugging purposes.
|
|
stdostream() << "Admin actor id = " << admin_id_ << std::endl;
|
|
|
|
// Enable listening on port 8085 (client connections)
|
|
listen_port(8085);
|
|
|
|
// Enable listening on port 8080 (http server connections)
|
|
listen_port(8080);
|
|
|
|
// Set the console prompt.
|
|
set_console_prompt(console_.get_prompt());
|
|
|
|
// Export stuff to the graphics engine.
|
|
set_visible_world_and_actor(master_.get(), admin_id_);
|
|
}
|
|
|
|
void do_luainvoke_command(const util::StringVec &words) {
|
|
master_->invoke(Invocation(Invocation::KIND_LUA, admin_id_, admin_id_, words[1]));
|
|
}
|
|
|
|
void do_luaprobe_command(const util::StringVec &words) {
|
|
master_->snapshot();
|
|
stdostream() << master_->probe_lua(admin_id_, words[1]);;
|
|
master_->rollback();
|
|
}
|
|
|
|
void do_syntax_command(const util::StringVec &words) {
|
|
stdostream() << "Syntax Error: " << words[1] << std::endl;
|
|
}
|
|
|
|
void do_menu_command(const StringVec &cmd) {
|
|
int64_t place = sv::to_int64(cmd[1], admin_id_);
|
|
master_->update_gui(admin_id_, place, &gui_);
|
|
stdostream() << gui_.menu_debug_string();
|
|
}
|
|
|
|
void do_choose_command(const StringVec &cmd) {
|
|
eng::string action = gui_.get_action(sv::to_int64(cmd[1]));
|
|
if (action == "") {
|
|
stdostream() << "Invalid menu item #" << std::endl;
|
|
return;
|
|
}
|
|
stdostream() << "Invoking plan: " << action << std::endl;
|
|
master_->invoke(Invocation(Invocation::KIND_CHOOSE, admin_id_, gui_.place(), action));
|
|
}
|
|
|
|
void do_tick_command(const util::StringVec &words) {
|
|
master_->invoke(Invocation(Invocation::KIND_TICK, admin_id_, admin_id_, ""));
|
|
}
|
|
|
|
void do_cpl_command(const util::StringVec &words) {
|
|
rescan_lua_source();
|
|
}
|
|
|
|
void do_quit_command(const util::StringVec &words) {
|
|
stop_driver();
|
|
}
|
|
|
|
void do_aborthttp_command(const util::StringVec &words) {
|
|
master_->abort_all_http_requests(425, "http requests aborted from the server command line");
|
|
}
|
|
|
|
void do_command(const util::StringVec &words) {
|
|
if (words.empty()) return;
|
|
else if (words[0] == "luainvoke") do_luainvoke_command(words);
|
|
else if (words[0] == "luaprobe") do_luaprobe_command(words);
|
|
else if (words[0] == "syntax") do_syntax_command(words);
|
|
else if (words[0] == "menu") do_menu_command(words);
|
|
else if (words[0] == "choose") do_choose_command(words);
|
|
else if (words[0] == "tick") do_tick_command(words);
|
|
else if (words[0] == "cpl") do_cpl_command(words);
|
|
else if (words[0] == "quit") do_quit_command(words);
|
|
else if (words[0] == "aborthttp") do_aborthttp_command(words);
|
|
else {
|
|
stdostream() << "Unsupported command: " << words[0] << std::endl;
|
|
}
|
|
}
|
|
|
|
void delete_client(UniqueClient &client) {
|
|
stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl;
|
|
client.reset();
|
|
}
|
|
|
|
void send_diffs(UniqueClient &client) {
|
|
StreamBuffer *sb = client->channel_->out();
|
|
sb->write_uint8(util::MSG_DIFF);
|
|
sb->write_uint32(0);
|
|
int64_t tw_1 = sb->total_writes();
|
|
//stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl;
|
|
client->sync_->diff_everything(client->actor_id_, master_.get(), sb);
|
|
int64_t tw_2 = sb->total_writes();
|
|
sb->overwrite_int32(tw_1, tw_2 - tw_1);
|
|
}
|
|
|
|
bool handle_invocation(UniqueClient &client) {
|
|
StreamBuffer *sb = client->channel_->in();
|
|
int64_t tr_before = sb->total_reads();
|
|
Invocation inv;
|
|
try {
|
|
uint8_t msg_type = sb->read_uint8();
|
|
if (msg_type != util::MSG_INVOKE) {
|
|
delete_client(client);
|
|
return false;
|
|
}
|
|
inv.deserialize(sb);
|
|
} catch (const StreamEofOnRead &seof) {
|
|
sb->unread_to(tr_before);
|
|
return false;
|
|
} catch (const StreamException &sexcept) {
|
|
delete_client(client);
|
|
return false;
|
|
}
|
|
// Security check.
|
|
if (inv.actor() != client->actor_id_) {
|
|
delete_client(client);
|
|
return false;
|
|
}
|
|
//stdostream() << "Invoking: " << inv.debug_string() << std::endl;
|
|
master_->invoke(inv);
|
|
client->channel_->out()->write_uint8(util::MSG_ACK);
|
|
client->channel_->out()->write_uint32(0);
|
|
client->sync_->invoke(inv);
|
|
send_diffs(client);
|
|
return true;
|
|
}
|
|
|
|
virtual void event_update() override {
|
|
// Execute any queued invocations.
|
|
// We just feed these directly into the master model.
|
|
eng::vector<UniqueInvocation> invocations = get_queued_invocations();
|
|
for (const UniqueInvocation &inv : invocations) {
|
|
master_->invoke(*inv);
|
|
}
|
|
|
|
// Check for keyboard input on stdin.
|
|
while (true) {
|
|
eng::string line = get_stdio_channel()->in()->readline();
|
|
if (line == "") break;
|
|
console_.add(line);
|
|
set_console_prompt(console_.get_prompt());
|
|
do_command(console_.get_command());
|
|
}
|
|
|
|
// Anything in the administrator printbuffer should go to stdostream.
|
|
if (print_channeler_.channel(master_->get_printbuffer(admin_id_), stdostream())) {
|
|
master_->invoke(print_channeler_.invocation(admin_id_));
|
|
}
|
|
|
|
// Check for new incoming channels, set up client structures.
|
|
while (true) {
|
|
SharedChannel chan = new_incoming_channel();
|
|
if (chan == nullptr) break;
|
|
if (chan->port() == 8085) {
|
|
Client *client = new Client;
|
|
client->actor_id_ = master_->create_login_actor();
|
|
client->channel_ = std::move(chan);
|
|
client->sync_.reset(new World(WORLD_TYPE_PREDICTIVE));
|
|
client->sync_->create_login_actor();
|
|
clients_.emplace_back(client);
|
|
stdostream() << "New client: actor id=" << client->actor_id_ << std::endl;
|
|
send_diffs(clients_.back());
|
|
} else if (chan->port() == 8080) {
|
|
HttpChannel htchan;
|
|
htchan.channel_ = chan;
|
|
http_server_channels_.push_back(htchan);
|
|
}
|
|
}
|
|
|
|
// Traverse all existing channels, process any communication.
|
|
for (UniqueClient &client : clients_) {
|
|
if (client->channel_->closed()) {
|
|
delete_client(client);
|
|
continue;
|
|
}
|
|
|
|
// Check for received invocations.
|
|
while (handle_invocation(client));
|
|
}
|
|
util::remove_nullptrs(clients_);
|
|
|
|
// Look for new outgoing HTTP client requests.
|
|
for (const auto &pair : master_->http_requests()) {
|
|
const HttpClientRequest &request = pair.second;
|
|
HttpChannel &channel = http_client_channels_[request.request_id()];
|
|
if (channel.channel_ == nullptr) {
|
|
channel.channel_ = new_outgoing_channel(request.target());
|
|
channel.method_ = request.method();
|
|
channel.parsed_bytes_ = 0;
|
|
request.send(channel.channel_->out());
|
|
}
|
|
}
|
|
|
|
// Maintain existing outgoing HTTP client requests.
|
|
HttpParserVec http_responses;
|
|
for (auto &pair : http_client_channels_) {
|
|
HttpChannel &htchan = pair.second;
|
|
Channel &channel = *htchan.channel_;
|
|
if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) {
|
|
HttpParser response;
|
|
if (!channel.error().empty()) {
|
|
response.fail(503, util::ss("Service Unavailable: ", channel.error()));
|
|
} else {
|
|
htchan.parsed_bytes_ = channel.in()->fill();
|
|
response.parse_response(channel.in()->view(), channel.closed(), htchan.method_);
|
|
}
|
|
if (response.complete()) {
|
|
response.set_request_id(pair.first);
|
|
http_responses.push_back(response);
|
|
}
|
|
}
|
|
}
|
|
for (const HttpParser &response : http_responses) {
|
|
http_client_channels_.erase(response.request_id());
|
|
}
|
|
master_->http_responses(http_responses);
|
|
|
|
// Maintain incoming HTTP server channels.
|
|
for (HttpChannel &htchan : http_server_channels_) {
|
|
SharedChannel &chan = htchan.channel_;
|
|
if (chan->in()->fill() > htchan.parsed_bytes_) {
|
|
HttpParser parser;
|
|
parser.parse_request(chan->in()->view(), chan->closed());
|
|
htchan.parsed_bytes_ = chan->in()->fill();
|
|
if (parser.complete()) {
|
|
StreamBuffer *sb = chan->out();
|
|
HttpServerResponse resp = master_->http_serve(parser);
|
|
resp.send(sb);
|
|
htchan.channel_ = nullptr;
|
|
}
|
|
}
|
|
}
|
|
util::remove_marked_items(http_server_channels_);
|
|
}
|
|
};
|
|
|
|
DrivenEngineDefine("lpxserver", LpxServer);
|
|
|