More work on client and server

This commit is contained in:
2021-11-11 16:23:11 -05:00
parent caa5bab9d3
commit c1c0b02926
9 changed files with 103 additions and 21 deletions

View File

@@ -149,7 +149,7 @@ public:
driven_->drv_get_new_closed(chans); driven_->drv_get_new_closed(chans);
for (int chid : chans) { for (int chid : chans) {
if (socket_[chid] != INVALID_SOCKET) { if (socket_[chid] != INVALID_SOCKET) {
assert(closesocket(socket_[chid] == 0)); assert(closesocket(socket_[chid]) == 0);
socket_[chid] = INVALID_SOCKET; socket_[chid] = INVALID_SOCKET;
connected_[chid] = false; connected_[chid] = false;
} }

View File

@@ -1,5 +1,6 @@
#include "invocation.hpp" #include "invocation.hpp"
#include <sstream>
const std::string &InvocationData::get(const std::string &key) const { const std::string &InvocationData::get(const std::string &key) const {
static std::string blank_; static std::string blank_;
@@ -51,3 +52,22 @@ void Invocation::deserialize(StreamBuffer *sb) {
data_.deserialize(sb); data_.deserialize(sb);
} }
std::string Invocation::debug_string() {
std::ostringstream oss;
oss << "inv[";
switch (kind_) {
case KIND_INVALID: oss << "invalid"; break;
case KIND_PLAN: oss << "plan"; break;
case KIND_LUA: oss << "lua"; break;
case KIND_FLUSH_PRINTS: oss << "flush_prints"; break;
default: oss << "UNKNOWN"; break;
}
oss << " a=" << actor_;
oss << " p=" << place_;
oss << " " << action_;
for (const auto &pair : data_) {
oss << " " << pair.first << "=" << pair.second;
}
oss << "]";
return oss.str();
}

View File

@@ -42,6 +42,8 @@ public:
void serialize(StreamBuffer *sb) const; void serialize(StreamBuffer *sb) const;
void deserialize(StreamBuffer *sb); void deserialize(StreamBuffer *sb);
std::string debug_string();
}; };
class InvocationQueue : public std::deque<Invocation> { class InvocationQueue : public std::deque<Invocation> {

View File

@@ -62,6 +62,8 @@ public:
} }
void abandon_server() { void abandon_server() {
stdostream() << "Abandoning server." << std::endl;
// Put the world model back into a known-good state. // Put the world model back into a known-good state.
set_initial_state(); set_initial_state();
@@ -189,6 +191,7 @@ public:
} }
void change_actor_id(int64_t actor_id) { void change_actor_id(int64_t actor_id) {
stdostream() << "Actor ID changing: " << actor_id << std::endl;
print_channeler_.reset(); print_channeler_.reset();
actor_id_ = actor_id; actor_id_ = actor_id;
} }
@@ -208,7 +211,7 @@ public:
void receive_diff_from_server(StreamBuffer *sb) { void receive_diff_from_server(StreamBuffer *sb) {
world_to_synchronous(); world_to_synchronous();
try { try {
int nactor = world_->patch_everything(sb); int64_t nactor = world_->patch_everything(sb);
if (nactor != actor_id_) change_actor_id(nactor); if (nactor != actor_id_) change_actor_id(nactor);
} catch (const StreamEof &seof) { } catch (const StreamEof &seof) {
abandon_server(); abandon_server();
@@ -260,10 +263,18 @@ public:
stdostream() << "Server closed connection " << channel_->error() << std::endl; stdostream() << "Server closed connection " << channel_->error() << std::endl;
abandon_server(); abandon_server();
} else { } else {
while (receive_message_from_server(channel_->in())); while (true) {
if (!receive_message_from_server(channel_->in())) break;
if (channel_ == nullptr) break;
}
world_to_asynchronous(); world_to_asynchronous();
} }
} }
// Channel print statements.
if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) {
send_invocation(print_channeler_.invocation(actor_id_));
}
} }
}; };

View File

@@ -7,20 +7,20 @@
#include "printbuffer.hpp" #include "printbuffer.hpp"
#include <memory> #include <memory>
class ServerClient { class Client {
public: public:
int64_t actor_id_; int64_t actor_id_;
UniqueChannel channel_; UniqueChannel channel_;
UniqueWorld sync_; UniqueWorld sync_;
}; };
using UniqueServerClient = std::unique_ptr<ServerClient>; using UniqueClient = std::unique_ptr<Client>;
using ServerClientVector = std::vector<UniqueServerClient>; using ClientVector = std::vector<UniqueClient>;
class LpxServer : public DrivenEngine { class LpxServer : public DrivenEngine {
public: public:
UniqueWorld master_; UniqueWorld master_;
LuaConsole console_; LuaConsole console_;
ServerClientVector clients_; ClientVector clients_;
PrintChanneler print_channeler_; PrintChanneler print_channeler_;
int64_t admin_id_; int64_t admin_id_;
@@ -79,8 +79,51 @@ public:
} }
} }
bool handle_invocation(ServerClient *sclient) { void delete_client(UniqueClient &client) {
return false; stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl;
client.reset();
}
void send_diffs(UniqueClient &client) {
StreamBuffer *sb = client->channel_->out();
sb->write_uint8(util::MSG_DIFF);
sb->write_uint32(0);
int64_t tw_1 = sb->total_writes();
stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl;
client->sync_->diff_everything(client->actor_id_, master_.get(), sb);
int64_t tw_2 = sb->total_writes();
sb->overwrite_int32(tw_1, tw_2 - tw_1);
}
bool handle_invocation(UniqueClient &client) {
StreamBuffer *sb = client->channel_->in();
int64_t tr_before = sb->total_reads();
Invocation inv;
try {
uint8_t msg_type = sb->read_uint8();
if (msg_type != util::MSG_INVOKE) {
delete_client(client);
return false;
}
inv.deserialize(sb);
} catch (const StreamEof &seof) {
sb->unread_to(tr_before);
return false;
} catch (const StreamCorruption &scorr) {
delete_client(client);
return false;
}
if (inv.actor() != client->actor_id_) {
stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl;
return true;
}
stdostream() << "Invoking: " << inv.debug_string() << std::endl;
master_->invoke(inv);
client->channel_->out()->write_uint8(util::MSG_ACK);
client->channel_->out()->write_uint32(0);
client->sync_->invoke(inv);
send_diffs(client);
return true;
} }
virtual void event_update() { virtual void event_update() {
@@ -102,28 +145,27 @@ public:
while (true) { while (true) {
UniqueChannel chan = new_incoming_channel(); UniqueChannel chan = new_incoming_channel();
if (chan == nullptr) break; if (chan == nullptr) break;
ServerClient *client = new ServerClient; Client *client = new Client;
client->actor_id_ = master_->create_login_actor(); client->actor_id_ = master_->create_login_actor();
client->channel_ = std::move(chan); client->channel_ = std::move(chan);
client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC)); client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC));
client->sync_->create_login_actor(); client->sync_->create_login_actor();
clients_.emplace_back(client); clients_.emplace_back(client);
stdostream() << "New client: actor id=" << client->actor_id_ << std::endl; stdostream() << "New client: actor id=" << client->actor_id_ << std::endl;
send_diffs(clients_.back());
} }
// Traverse all existing channels, process any communication. // Traverse all existing channels, process any communication.
ServerClientVector keep; for (UniqueClient &client : clients_) {
for (UniqueServerClient &client : clients_) {
if (client->channel_->closed()) { if (client->channel_->closed()) {
stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl; delete_client(client);
continue; continue;
} }
// Check for received invocations. // Check for received invocations.
while (handle_invocation(client.get())); while (handle_invocation(client));
// Transfer the client to the keep vector.
keep.emplace_back(std::move(client));
} }
clients_ = std::move(keep); util::remove_nullptrs(clients_);
} }
}; };

View File

@@ -103,7 +103,7 @@ bool PrintChanneler::channel(const PrintBuffer *printbuffer, std::ostream &ostre
line_ = printbuffer->first_line(); line_ = printbuffer->first_line();
} }
while (line_ < printbuffer->first_unchecked()) { while (line_ < printbuffer->first_unchecked()) {
ostream << printbuffer->nth(line_) << std::endl; ostream << "[" << printbuffer->nth(line_) << "]" << std::endl;
line_ += 1; line_ += 1;
} }
return line_ > printbuffer->first_line(); return line_ > printbuffer->first_line();

View File

@@ -27,6 +27,7 @@ StreamBuffer::StreamBuffer(int64_t size, bool fixed) {
StreamBuffer::StreamBuffer(const char *s, int64_t size) { StreamBuffer::StreamBuffer(const char *s, int64_t size) {
assert(size >= 0); assert(size >= 0);
init(true, false, const_cast<char *>(s), size); init(true, false, const_cast<char *>(s), size);
write_cursor_ = buf_hi_;
} }
StreamBuffer::~StreamBuffer() { StreamBuffer::~StreamBuffer() {

View File

@@ -94,6 +94,14 @@ std::string trim(std::string s);
// Calculate distance between two points // Calculate distance between two points
double distance_squared(double x1, double y1, double x2, double y2); double distance_squared(double x1, double y1, double x2, double y2);
// Remove nullptrs from a vector of unique pointers.
template<class T>
void remove_nullptrs(std::vector<std::unique_ptr<T>> &vec) {
std::unique_ptr<T> nullp;
auto iter = std::remove(vec.begin(), vec.end(), nullp);
vec.erase(iter, vec.end());
}
// Read the lua source code from the specified directory. // Read the lua source code from the specified directory.
LuaSourcePtr read_lua_source(const std::string &directory); LuaSourcePtr read_lua_source(const std::string &directory);

View File

@@ -665,8 +665,6 @@ void World::close_lthread_state() {
} }
if (pbuffer != nullptr) { if (pbuffer != nullptr) {
pbuffer->add_string(output); pbuffer->add_string(output);
} else {
std::cerr << output;
} }
} }
// Now clean up everything. // Now clean up everything.