Cleaning up lpxclient and lpxserver in preparation for redirect implementation

This commit is contained in:
2026-06-01 22:49:25 -04:00
parent 1b0d96934f
commit bfaf161d30
6 changed files with 45 additions and 18 deletions

View File

@@ -8,11 +8,7 @@
//
// The actual contents of the datapack depends on the type of invocation:
//
// AccessKind::INVALID:
//
// Nothing.
//
// AccessKind::LUA_CALL:
// AccessKind::INVOKE_LUA_CALL:
//
// A lua function call. The class name, the function name, and then
// the function arguments.
@@ -25,14 +21,14 @@
//
// Line number in ascii.
//
// AccessKind::INVOKE_TICK:
//
// Nothing.
//
// AccessKind::INVOKE_LUA_SOURCE:
//
// Packaged lua sourcecode. See drvutil::package_lua_source.
//
// AccessKind::INVOKE_TICK:
//
// Nothing.
//
//////////////////////////////////////////////////////////////////////////
#pragma once
@@ -64,6 +60,14 @@ public:
void serialize(StreamBuffer *sb) const;
void deserialize(StreamBuffer *sb);
// Return true if the kind is a valid kind of invocation
// to go over the network.
static bool is_valid_network_kind(AccessKind k)
{ return k == AccessKind::INVOKE_FLUSH_PRINTS ||
k == AccessKind::INVOKE_LUA_CALL ||
k == AccessKind::INVOKE_LUA_EXPR ||
k == AccessKind::INVOKE_LUA_SOURCE; }
eng::string debug_string() const;
};

View File

@@ -222,7 +222,9 @@ public:
// If there's nothing new in the printbuffer, this is very fast.
world_to_asynchronous();
if (print_channeler_.channel(world_->get_printbuffer(actor_id_), retpk)) {
send_invocation(print_channeler_.invocation(actor_id_));
// TODO: Don't necessarily need to do this on every line of output.
// Can send this every 30 seconds or so.
delayed_invocations_.emplace_back(print_channeler_.invocation(actor_id_));
}
set_have_prints(false);
break;

View File

@@ -91,6 +91,7 @@ public:
bool handle_invocation(UniqueClient &client) {
StreamBuffer *sb = client->channel_->in();
if (sb->empty()) return false;
int64_t tr_before = sb->total_reads();
Invocation inv;
try {
@@ -107,18 +108,29 @@ public:
delete_client(client);
return false;
}
// If the user sent an invalid kind, log them out.
if (!Invocation::is_valid_network_kind(inv.kind())) {
delete_client(client);
return false;
}
// Acknowledge the invocation.
client->channel_->out()->write_uint8(util::MSG_ACK);
client->channel_->out()->write_uint32(0);
// Process the invocation.
// Execute the invocation with the sync model.
client->sync_->invoke(inv);
client->async_diff_ = true;
// Process the invocation in the master model.
//
// An invoke with the wrong actor_id is quietly a noop. This is
// to make leeway for clients who have recently been redirected, and
// who may not know their new actor_id yet.
//
if (inv.actor() == client->actor_id_) {
world_->invoke(inv);
client->sync_->invoke(inv);
client->async_diff_ = true;
}
return true;
}
@@ -150,7 +162,9 @@ public:
case AccessKind::CHANNEL_PRINTS: {
// If there's nothing new in the printbuffer, this is very fast.
if (print_channeler_.channel(world_->get_printbuffer(actor_id_), retpk)) {
world_->invoke(print_channeler_.invocation(actor_id_));
// TODO: Don't necessarily need to do this on every line of output.
// Can send this every 30 seconds or so.
delayed_invocations_.emplace_back(print_channeler_.invocation(actor_id_));
}
set_have_prints(false);
break;

View File

@@ -191,12 +191,14 @@ bool PrintChanneler::channel(const PrintBuffer *printbuffer, StreamBuffer *sb) {
if (printbuffer->first_line() > line_) {
line_ = printbuffer->first_line();
}
bool any = false;
while (line_ < printbuffer->first_unchecked()) {
sb->write_bytes(printbuffer->nth(line_));
sb->write_bytes("\n");
line_ += 1;
any = true;
}
return line_ > printbuffer->first_line();
return any;
}
Invocation PrintChanneler::invocation(int64_t actor_id) {

View File

@@ -173,12 +173,13 @@ public:
bool have_prints(const PrintBuffer *pb) const;
// Copy any new lines from the printbuffer to the stream buffer.
// Update the current line number. Return true if the printbuffer
// contains any lines that have already been channeled.
// Update the current line number. Return true if any data
// was channeled.
bool channel(const PrintBuffer *pb, StreamBuffer *sb);
// Generate an invocation that removes unnecessary lines from the
// printbuffer.
// printbuffer. You should only do this if you just channeled
// some new output.
Invocation invocation(int64_t actor_id);
};