Files
integration/luprex/core/cpp/lpxclient.cpp

237 lines
7.5 KiB
C++
Raw Normal View History

#include "lpxclient.hpp"
#include "drivenengine.hpp"
#include "world.hpp"
#include "luaconsole.hpp"
#include "invocation.hpp"
#include "util.hpp"
2021-11-11 13:56:49 -05:00
#include "printbuffer.hpp"
#include <memory>
class LpxClient : public DrivenEngine {
public:
using StringVec = LuaConsole::StringVec;
UniqueWorld world_;
int64_t actor_id_;
InvocationQueue unack_;
UniqueChannel channel_;
LuaConsole console_;
2021-11-11 13:56:49 -05:00
PrintChanneler print_channeler_;
Gui gui_;
public:
2021-11-09 16:27:39 -05:00
void set_initial_state() {
// Create the world model.
world_.reset(new World(util::WORLD_TYPE_C_SYNC));
2021-11-09 16:27:39 -05:00
// Snapshot the initial state.
world_->snapshot();
// Clear the unack command queue.
unack_.clear();
// This is a temporary actor that will be used only until the server sends
// us the first difference transmission. We do this only to establish
// the invariant that there's always an actor. When the first difference
// transmission arrives, this actor may be deleted, or it may just be
// ignored, at the server's discretion.
actor_id_ = world_->create_login_actor();
2021-11-09 16:27:39 -05:00
}
// When the world is in synchronous mode, there's no
// snapshot, and the commands in the unack queue are not
// reflected in the world. In asynchronous mode, there is
// a snapshot that allows return to the synchronous mode,
// and the unack commands are in the world model.
void world_to_synchronous() {
if (!world_->snapshot_empty()) {
world_->rollback();
}
}
void world_to_asynchronous() {
if (world_->snapshot_empty()) {
world_->snapshot();
for (const Invocation &inv : unack_) {
world_->invoke(inv);
}
}
}
void abandon_server() {
2021-11-11 16:23:11 -05:00
stdostream() << "Abandoning server." << std::endl;
2021-11-09 16:27:39 -05:00
// Put the world model back into a known-good state.
set_initial_state();
// Disconnect from the server.
channel_.reset();
}
virtual void event_init(int argc, char *argv[]) {
// Put the world into the starting state.
set_initial_state();
// Establish a connection to the server.
channel_ = new_outgoing_channel("localhost:8085");
2021-11-04 14:49:25 -04:00
// Set the console prompt
get_stdio_channel()->set_prompt(console_.get_prompt());
}
2021-11-09 16:27:39 -05:00
void send_invocation(const Invocation &inv) {
if (channel_ == nullptr) {
stdostream() << "Cannot invoke any actions, not connected." << std::endl;
return;
}
world_to_asynchronous();
world_->invoke(inv);
unack_.push_back(inv);
StreamBuffer *sb = channel_->out();
sb->write_uint8(util::MSG_INVOKE);
inv.serialize(sb);
}
void do_lua_command(const StringVec &words) {
2021-11-16 12:20:11 -05:00
send_invocation(Invocation(Invocation::KIND_LUA, actor_id_, actor_id_, words[1]));
2021-11-09 16:27:39 -05:00
}
void do_syntax_command(const StringVec &words) {
2021-11-16 12:20:11 -05:00
stdostream() << "Syntax Error: " << words[1] << std::endl;
2021-11-09 16:27:39 -05:00
}
void do_view_command(const StringVec &cmd) {
2021-11-16 13:14:59 -05:00
stdostream() << world_->tangibles_near_debug_string(actor_id_, 100);
2021-11-09 16:27:39 -05:00
}
void do_menu_command(const StringVec &cmd) {
world_to_asynchronous();
2021-11-16 13:14:59 -05:00
int64_t place = util::strtoint(cmd[1], actor_id_);
world_->update_gui(actor_id_, place, &gui_);
stdostream() << gui_.menu_debug_string();
2021-11-09 16:27:39 -05:00
}
void do_choose_command(const StringVec &cmd) {
2021-11-16 12:20:11 -05:00
std::string action = gui_.get_action(util::strtoint(cmd[1], -1));
if (action == "") {
stdostream() << "Invalid menu item #" << std::endl;
2021-11-09 16:27:39 -05:00
return;
}
stdostream() << "Invoking plan: " << action << std::endl;
2021-11-16 13:14:59 -05:00
Invocation inv(Invocation::KIND_PLAN, actor_id_, gui_.place(), action);
2021-11-09 16:27:39 -05:00
send_invocation(inv);
}
void do_quit_command(const util::StringVec &words) {
abandon_server();
stop_driver();
}
void do_command(const util::StringVec &words) {
2021-11-04 14:49:25 -04:00
if (words.empty()) return;
2021-11-09 16:27:39 -05:00
else if (words[0] == "lua") do_lua_command(words);
else if (words[0] == "syntax") do_syntax_command(words);
else if (words[0] == "view") do_view_command(words);
else if (words[0] == "menu") do_menu_command(words);
else if (words[0] == "quit") do_quit_command(words);
2021-11-16 12:20:11 -05:00
else if (words[0] == "choose") do_choose_command(words);
2021-11-09 16:27:39 -05:00
else {
2021-11-16 12:20:11 -05:00
stdostream() << "Unsupported command: " << words[0] << std::endl;
}
2021-11-09 16:27:39 -05:00
}
2021-11-11 13:56:49 -05:00
void change_actor_id(int64_t actor_id) {
2021-11-11 16:23:11 -05:00
stdostream() << "Actor ID changing: " << actor_id << std::endl;
2021-11-11 13:56:49 -05:00
print_channeler_.reset();
actor_id_ = actor_id;
}
2021-11-11 16:23:11 -05:00
2021-11-09 16:27:39 -05:00
void receive_ack_from_server(StreamBuffer *sb) {
// An ack is just a single byte, so there's nothing left to read.
if (unack_.empty()) {
// Invalid acknowledgement when theres' nothing in the unack queue.
abandon_server();
return;
}
world_to_synchronous();
world_->invoke(unack_.front());
unack_.pop_front();
}
void receive_diff_from_server(StreamBuffer *sb) {
world_to_synchronous();
try {
2021-11-11 16:23:11 -05:00
int64_t nactor = world_->patch_everything(sb);
2021-11-11 13:56:49 -05:00
if (nactor != actor_id_) change_actor_id(nactor);
2021-11-09 16:27:39 -05:00
} catch (const StreamEof &seof) {
abandon_server();
return;
} catch (const StreamCorruption &scorr) {
abandon_server();
return;
}
}
bool receive_message_from_server(StreamBuffer *sb) {
if (sb->fill() < 5) return false;
int64_t tr_before = sb->total_reads();
uint8_t message_type = sb->read_uint8();
uint32_t message_body_len = sb->read_uint32();
if (sb->fill() < message_body_len) {
sb->unread_to(tr_before);
return false;
}
StreamBuffer body(sb->read_bytes(message_body_len), message_body_len);
if (message_type == util::MSG_ACK) {
receive_ack_from_server(&body);
} else if (message_type == util::MSG_DIFF) {
receive_diff_from_server(&body);
} else {
abandon_server();
return false;
}
if (!body.empty()) {
abandon_server();
return false;
}
return true;
}
virtual void event_update() {
// Check for keyboard input on stdin.
while (true) {
std::string line = get_stdio_channel()->in()->readline();
if (line == "") break;
console_.add(line);
2021-11-04 14:49:25 -04:00
get_stdio_channel()->set_prompt(console_.get_prompt());
do_command(console_.get_command());
}
// Check for communication from server..
if (channel_ != nullptr) {
if (channel_->closed()) {
stdostream() << "Server closed connection " << channel_->error() << std::endl;
2021-11-09 16:27:39 -05:00
abandon_server();
} else {
2021-11-11 16:23:11 -05:00
while (true) {
if (!receive_message_from_server(channel_->in())) break;
if (channel_ == nullptr) break;
}
2021-11-09 16:27:39 -05:00
world_to_asynchronous();
}
}
2021-11-11 16:23:11 -05:00
// Channel print statements.
if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) {
send_invocation(print_channeler_.invocation(actor_id_));
}
}
};
UniqueDrivenEngine make_LpxClient() {
return UniqueDrivenEngine(new LpxClient);
}