diff --git a/luprex/core/cpp/driver-mingw.cpp b/luprex/core/cpp/driver-mingw.cpp index c2ea0423..64ab144b 100644 --- a/luprex/core/cpp/driver-mingw.cpp +++ b/luprex/core/cpp/driver-mingw.cpp @@ -149,7 +149,7 @@ public: driven_->drv_get_new_closed(chans); for (int chid : chans) { if (socket_[chid] != INVALID_SOCKET) { - assert(closesocket(socket_[chid] == 0)); + assert(closesocket(socket_[chid]) == 0); socket_[chid] = INVALID_SOCKET; connected_[chid] = false; } diff --git a/luprex/core/cpp/invocation.cpp b/luprex/core/cpp/invocation.cpp index efba1c7e..75388681 100644 --- a/luprex/core/cpp/invocation.cpp +++ b/luprex/core/cpp/invocation.cpp @@ -1,5 +1,6 @@ #include "invocation.hpp" +#include const std::string &InvocationData::get(const std::string &key) const { static std::string blank_; @@ -51,3 +52,22 @@ void Invocation::deserialize(StreamBuffer *sb) { data_.deserialize(sb); } +std::string Invocation::debug_string() { + std::ostringstream oss; + oss << "inv["; + switch (kind_) { + case KIND_INVALID: oss << "invalid"; break; + case KIND_PLAN: oss << "plan"; break; + case KIND_LUA: oss << "lua"; break; + case KIND_FLUSH_PRINTS: oss << "flush_prints"; break; + default: oss << "UNKNOWN"; break; + } + oss << " a=" << actor_; + oss << " p=" << place_; + oss << " " << action_; + for (const auto &pair : data_) { + oss << " " << pair.first << "=" << pair.second; + } + oss << "]"; + return oss.str(); +} diff --git a/luprex/core/cpp/invocation.hpp b/luprex/core/cpp/invocation.hpp index a006a3ef..0f6cc3f5 100644 --- a/luprex/core/cpp/invocation.hpp +++ b/luprex/core/cpp/invocation.hpp @@ -42,6 +42,8 @@ public: void serialize(StreamBuffer *sb) const; void deserialize(StreamBuffer *sb); + + std::string debug_string(); }; class InvocationQueue : public std::deque { diff --git a/luprex/core/cpp/lpxclient.cpp b/luprex/core/cpp/lpxclient.cpp index 06427d6a..f420345a 100644 --- a/luprex/core/cpp/lpxclient.cpp +++ b/luprex/core/cpp/lpxclient.cpp @@ -62,6 +62,8 @@ public: } void abandon_server() { + stdostream() << "Abandoning server." << std::endl; + // Put the world model back into a known-good state. set_initial_state(); @@ -189,10 +191,11 @@ public: } void change_actor_id(int64_t actor_id) { + stdostream() << "Actor ID changing: " << actor_id << std::endl; print_channeler_.reset(); actor_id_ = actor_id; } - + void receive_ack_from_server(StreamBuffer *sb) { // An ack is just a single byte, so there's nothing left to read. if (unack_.empty()) { @@ -208,7 +211,7 @@ public: void receive_diff_from_server(StreamBuffer *sb) { world_to_synchronous(); try { - int nactor = world_->patch_everything(sb); + int64_t nactor = world_->patch_everything(sb); if (nactor != actor_id_) change_actor_id(nactor); } catch (const StreamEof &seof) { abandon_server(); @@ -260,10 +263,18 @@ public: stdostream() << "Server closed connection " << channel_->error() << std::endl; abandon_server(); } else { - while (receive_message_from_server(channel_->in())); + while (true) { + if (!receive_message_from_server(channel_->in())) break; + if (channel_ == nullptr) break; + } world_to_asynchronous(); } } + + // Channel print statements. + if (print_channeler_.channel(world_->get_printbuffer(actor_id_), stdostream())) { + send_invocation(print_channeler_.invocation(actor_id_)); + } } }; diff --git a/luprex/core/cpp/lpxserver.cpp b/luprex/core/cpp/lpxserver.cpp index d9d04878..7bc9314a 100644 --- a/luprex/core/cpp/lpxserver.cpp +++ b/luprex/core/cpp/lpxserver.cpp @@ -7,20 +7,20 @@ #include "printbuffer.hpp" #include -class ServerClient { +class Client { public: int64_t actor_id_; UniqueChannel channel_; UniqueWorld sync_; }; -using UniqueServerClient = std::unique_ptr; -using ServerClientVector = std::vector; +using UniqueClient = std::unique_ptr; +using ClientVector = std::vector; class LpxServer : public DrivenEngine { public: UniqueWorld master_; LuaConsole console_; - ServerClientVector clients_; + ClientVector clients_; PrintChanneler print_channeler_; int64_t admin_id_; @@ -79,10 +79,53 @@ public: } } - bool handle_invocation(ServerClient *sclient) { - return false; + void delete_client(UniqueClient &client) { + stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl; + client.reset(); } + void send_diffs(UniqueClient &client) { + StreamBuffer *sb = client->channel_->out(); + sb->write_uint8(util::MSG_DIFF); + sb->write_uint32(0); + int64_t tw_1 = sb->total_writes(); + stdostream() << "Sending diffs to client " << client->actor_id_ << std::endl; + client->sync_->diff_everything(client->actor_id_, master_.get(), sb); + int64_t tw_2 = sb->total_writes(); + sb->overwrite_int32(tw_1, tw_2 - tw_1); + } + + bool handle_invocation(UniqueClient &client) { + StreamBuffer *sb = client->channel_->in(); + int64_t tr_before = sb->total_reads(); + Invocation inv; + try { + uint8_t msg_type = sb->read_uint8(); + if (msg_type != util::MSG_INVOKE) { + delete_client(client); + return false; + } + inv.deserialize(sb); + } catch (const StreamEof &seof) { + sb->unread_to(tr_before); + return false; + } catch (const StreamCorruption &scorr) { + delete_client(client); + return false; + } + if (inv.actor() != client->actor_id_) { + stdostream() << "Ignoring invoke with wrong actor ID " << inv.actor() << std::endl; + return true; + } + stdostream() << "Invoking: " << inv.debug_string() << std::endl; + master_->invoke(inv); + client->channel_->out()->write_uint8(util::MSG_ACK); + client->channel_->out()->write_uint32(0); + client->sync_->invoke(inv); + send_diffs(client); + return true; + } + virtual void event_update() { // Check for keyboard input on stdin. while (true) { @@ -102,28 +145,27 @@ public: while (true) { UniqueChannel chan = new_incoming_channel(); if (chan == nullptr) break; - ServerClient *client = new ServerClient; + Client *client = new Client; client->actor_id_ = master_->create_login_actor(); client->channel_ = std::move(chan); client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC)); client->sync_->create_login_actor(); clients_.emplace_back(client); stdostream() << "New client: actor id=" << client->actor_id_ << std::endl; + send_diffs(clients_.back()); } // Traverse all existing channels, process any communication. - ServerClientVector keep; - for (UniqueServerClient &client : clients_) { + for (UniqueClient &client : clients_) { if (client->channel_->closed()) { - stdostream() << "Client closed: actor id=" << client->actor_id_ << std::endl; + delete_client(client); continue; } + // Check for received invocations. - while (handle_invocation(client.get())); - // Transfer the client to the keep vector. - keep.emplace_back(std::move(client)); + while (handle_invocation(client)); } - clients_ = std::move(keep); + util::remove_nullptrs(clients_); } }; diff --git a/luprex/core/cpp/printbuffer.cpp b/luprex/core/cpp/printbuffer.cpp index 52ef15e6..d1c69662 100644 --- a/luprex/core/cpp/printbuffer.cpp +++ b/luprex/core/cpp/printbuffer.cpp @@ -103,7 +103,7 @@ bool PrintChanneler::channel(const PrintBuffer *printbuffer, std::ostream &ostre line_ = printbuffer->first_line(); } while (line_ < printbuffer->first_unchecked()) { - ostream << printbuffer->nth(line_) << std::endl; + ostream << "[" << printbuffer->nth(line_) << "]" << std::endl; line_ += 1; } return line_ > printbuffer->first_line(); diff --git a/luprex/core/cpp/streambuffer.cpp b/luprex/core/cpp/streambuffer.cpp index 2bd1fd21..a467a0bc 100644 --- a/luprex/core/cpp/streambuffer.cpp +++ b/luprex/core/cpp/streambuffer.cpp @@ -27,6 +27,7 @@ StreamBuffer::StreamBuffer(int64_t size, bool fixed) { StreamBuffer::StreamBuffer(const char *s, int64_t size) { assert(size >= 0); init(true, false, const_cast(s), size); + write_cursor_ = buf_hi_; } StreamBuffer::~StreamBuffer() { diff --git a/luprex/core/cpp/util.hpp b/luprex/core/cpp/util.hpp index 92846b9e..9466b51a 100644 --- a/luprex/core/cpp/util.hpp +++ b/luprex/core/cpp/util.hpp @@ -94,6 +94,14 @@ std::string trim(std::string s); // Calculate distance between two points double distance_squared(double x1, double y1, double x2, double y2); +// Remove nullptrs from a vector of unique pointers. +template +void remove_nullptrs(std::vector> &vec) { + std::unique_ptr nullp; + auto iter = std::remove(vec.begin(), vec.end(), nullp); + vec.erase(iter, vec.end()); +} + // Read the lua source code from the specified directory. LuaSourcePtr read_lua_source(const std::string &directory); diff --git a/luprex/core/cpp/world-core.cpp b/luprex/core/cpp/world-core.cpp index b4c910a7..90f82416 100644 --- a/luprex/core/cpp/world-core.cpp +++ b/luprex/core/cpp/world-core.cpp @@ -665,8 +665,6 @@ void World::close_lthread_state() { } if (pbuffer != nullptr) { pbuffer->add_string(output); - } else { - std::cerr << output; } } // Now clean up everything.