Files
integration/luprex/cpp/core/lpxclient.cpp

304 lines
9.3 KiB
C++
Raw Normal View History

#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "drivenengine.hpp"
#include "world.hpp"
#include "luaconsole.hpp"
#include "invocation.hpp"
#include "util.hpp"
2021-11-11 13:56:49 -05:00
#include "printbuffer.hpp"
#include <memory>
class LpxClient : public DrivenEngine, public CommonCommands {
public:
using StringVec = LuaConsole::StringVec;
UniqueWorld world_;
int64_t actor_id_;
InvocationQueue unack_;
SharedChannel channel_;
LuaConsole console_;
2021-11-11 13:56:49 -05:00
PrintChanneler print_channeler_;
eng::vector<Invocation> delayed_invocations_;
public:
LpxClient() {
set_console_prompt(console_.get_prompt());
set_initial_state_standalone();
}
void set_initial_state_connect(const eng::string &hostspec) {
// Create the world model.
world_.reset(new World(WORLD_TYPE_PREDICTIVE));
// Create the communication channel.
channel_ = new_outgoing_channel(hostspec);
// This is a temporary actor that will be used only until the server sends
// us the first difference transmission. We do this only to establish
// the invariant that there's always an actor. When the first difference
// transmission arrives, this actor may be deleted, or it may just be
// ignored, at the server's discretion.
actor_id_ = world_->create_login_actor();
// Clear the unack command queue.
unack_.clear();
// Export stuff to the graphics engine.
set_visible_world_and_actor(world_.get(), actor_id_);
// Reset the print channeler
print_channeler_.reset();
2025-06-27 20:03:38 -04:00
// Do not trigger lua source loading.
rescan_lua_source(false);
// Clear any saved invocations
delayed_invocations_.clear();
}
void set_initial_state_standalone() {
// Create the world model.
world_.reset(new World(WORLD_TYPE_MASTER));
// Make sure the channel is empty.
channel_.reset();
// Create the standalone actor.
actor_id_ = world_->create_login_actor();
// Clear the unack command queue.
unack_.clear();
// Export stuff to the graphics engine.
set_visible_world_and_actor(world_.get(), actor_id_);
// Reset the print channeler
print_channeler_.reset();
2021-11-09 16:27:39 -05:00
// Trigger lua source loading.
2025-06-27 20:03:38 -04:00
rescan_lua_source(true);
// Clear any saved invocations
delayed_invocations_.clear();
}
2021-11-09 16:27:39 -05:00
// When the world is in synchronous mode, there's no
// snapshot, and the commands in the unack queue are not
// reflected in the world. In asynchronous mode, there is
// a snapshot that allows return to the synchronous mode,
// and the unack commands are in the world model.
void world_to_synchronous() {
if (!world_->snapshot_empty()) {
world_->rollback();
}
}
void world_to_asynchronous() {
if (!world_->is_authoritative()) {
if (world_->snapshot_empty()) {
world_->snapshot();
for (const Invocation &inv : unack_) {
world_->invoke(inv);
}
2021-11-09 16:27:39 -05:00
}
}
}
void abandon_server() {
if (channel_)
{
set_initial_state_standalone();
}
}
2021-11-09 16:27:39 -05:00
void send_invocation(const Invocation &inv) {
if (channel_ == nullptr) {
world_->invoke(inv);
} else {
world_to_asynchronous();
world_->invoke(inv);
unack_.push_back(inv);
StreamBuffer *sb = channel_->out();
sb->write_uint8(util::MSG_INVOKE);
inv.serialize(sb);
2021-11-09 16:27:39 -05:00
}
}
virtual void do_syntax_error(std::string_view error) override {
stdostream() << "Syntax error: " << error << std::endl;
2021-11-09 16:27:39 -05:00
}
virtual void do_unknown_command(std::string_view name) override {
stdostream() << "Unknown command: " << name << std::endl;
}
virtual void do_view_command() override {
stdostream() << world_->tangibles_near_debug_string(actor_id_, 1000);
2021-11-09 16:27:39 -05:00
}
virtual void do_moveto_command(int x, int y) override {
do_unknown_command("moveto");
}
virtual void do_quit_command() override {
abandon_server();
stop_driver();
2021-11-09 16:27:39 -05:00
}
virtual void do_cpl_command() override {
2025-06-27 20:03:38 -04:00
rescan_lua_source(true);
2021-11-26 15:45:36 -05:00
}
virtual void do_work_command() override {
do_unknown_command("work");
2021-12-15 14:18:19 -05:00
}
virtual void do_display_command() override {
do_unknown_command("display");
2021-12-17 16:21:56 -05:00
}
virtual void do_aborthttp_command() override {
do_unknown_command("aborthttp");
2021-11-09 16:27:39 -05:00
}
virtual void do_connect_command(std::string_view hostname) override {
set_initial_state_connect(util::ss("nocert:", hostname, ":8085"));
}
virtual void do_luainvoke_command(std::string_view cmd) override {
send_invocation(Invocation(AccessKind::INVOKE_LUA_EXPR, actor_id_, actor_id_, cmd));
}
virtual void do_luaprobe_command(std::string_view cmd) override {
world_to_asynchronous();
stdostream() << world_->probe_lua_expr(actor_id_, cmd);
world_to_synchronous();
2021-11-09 16:27:39 -05:00
}
2021-11-11 13:56:49 -05:00
void change_actor_id(int64_t actor_id) {
util::dprint("Actor ID changing: ", actor_id);
2021-11-11 13:56:49 -05:00
print_channeler_.reset();
actor_id_ = actor_id;
set_visible_world_and_actor(world_.get(), actor_id_);
2021-11-11 13:56:49 -05:00
}
2021-11-11 16:23:11 -05:00
2021-11-09 16:27:39 -05:00
void receive_ack_from_server(StreamBuffer *sb) {
// An ack is just a single byte, so there's nothing left to read.
if (unack_.empty()) {
// Invalid acknowledgement when theres' nothing in the unack queue.
abandon_server();
return;
}
world_to_synchronous();
world_->invoke(unack_.front());
unack_.pop_front();
}
void receive_diff_from_server(StreamBuffer *sb) {
world_to_synchronous();
try {
DebugCollector dbc("");
2021-11-21 13:35:39 -05:00
int64_t nactor = world_->patch_everything(sb, &dbc);
2021-11-11 13:56:49 -05:00
if (nactor != actor_id_) change_actor_id(nactor);
2021-11-21 13:35:39 -05:00
dbc.dump(stdostream());
} catch (const StreamException &sexcept) {
2021-11-09 16:27:39 -05:00
abandon_server();
return;
}
}
bool receive_message_from_server(StreamBuffer *sb) {
if (sb->fill() < 5) return false;
int64_t tr_before = sb->total_reads();
uint8_t message_type = sb->read_uint8();
uint32_t message_body_len = sb->read_uint32();
if (sb->fill() < message_body_len) {
sb->unread_to(tr_before);
return false;
}
const char *message_body = sb->read_bytes(message_body_len);
StreamBuffer body(std::string_view(message_body, message_body_len));
2021-11-09 16:27:39 -05:00
if (message_type == util::MSG_ACK) {
receive_ack_from_server(&body);
} else if (message_type == util::MSG_DIFF) {
receive_diff_from_server(&body);
} else {
abandon_server();
return false;
}
if (!body.empty()) {
abandon_server();
return false;
}
return true;
}
virtual void event_access(AccessKind kind, int64_t place_id, std::string_view datapk, StreamBuffer *retpk) override {
if (place_id == 0) place_id = actor_id_;
2024-09-05 01:33:14 -04:00
switch (kind) {
case AccessKind::INVOKE_LUA_CALL:
case AccessKind::INVOKE_LUA_EXPR:
case AccessKind::INVOKE_FLUSH_PRINTS:
case AccessKind::INVOKE_TICK:
case AccessKind::INVOKE_LUA_SOURCE: {
delayed_invocations_.emplace_back(kind, actor_id_, place_id, datapk);
break;
}
case AccessKind::PROBE_LUA_CALL: {
2024-09-05 01:33:14 -04:00
world_to_asynchronous();
world_->probe_lua_call(actor_id_, place_id, datapk, retpk);
break;
}
case AccessKind::CONNECT_TO_SERVER: {
set_initial_state_connect(util::ss("nocert:", datapk, ":8085"));
break;
}
2024-09-05 01:33:14 -04:00
default: {
util::dprint("Invalid event_access: ", int(kind));
2024-09-05 01:33:14 -04:00
}
}
}
virtual void event_update() override {
// Send invocations. We execute these using predictive execution.
for (const Invocation &inv : delayed_invocations_) {
send_invocation(inv);
2021-12-15 14:18:19 -05:00
}
delayed_invocations_.clear();
2021-12-15 14:18:19 -05:00
// Check for keyboard input on stdin.
while (true) {
eng::string line = get_stdio_channel()->in()->readline();
if (line == "") break;
console_.add(line);
set_console_prompt(console_.get_prompt());
2021-11-04 14:49:25 -04:00
do_command(console_.get_command());
}
// Check for communication from server..
if (channel_ != nullptr) {
if (channel_->closed()) {
util::dprint("server closed connection: ", channel_->error());
2021-11-09 16:27:39 -05:00
abandon_server();
} else {
2021-11-11 16:23:11 -05:00
while (true) {
if (!receive_message_from_server(channel_->in())) break;
if (channel_ == nullptr) break;
}
2021-11-09 16:27:39 -05:00
world_to_asynchronous();
}
}
2021-11-11 16:23:11 -05:00
// Channel print statements.
if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) {
send_invocation(print_channeler_.invocation(actor_id_));
2021-11-11 16:23:11 -05:00
}
}
};
// NOTE(review): presumably makes LpxClient available to the driver under the
// name "lpxclient" — confirm against the DrivenEngineDefine definition.
DrivenEngineDefine("lpxclient", LpxClient);