diff --git a/luprex/core/cpp/drivenengine.cpp b/luprex/core/cpp/drivenengine.cpp index 0440800c..43f11e75 100644 --- a/luprex/core/cpp/drivenengine.cpp +++ b/luprex/core/cpp/drivenengine.cpp @@ -1,8 +1,7 @@ #include "drivenengine.hpp" -Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target) { - driven_ = de; +Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target, bool stop) { chid_ = chid; sb_in_.reset(new StreamBuffer); sb_out_.reset(new StreamBuffer); @@ -12,20 +11,12 @@ Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target readline_enabled_ = (chid == 0); readline_lastc_ = 0; desired_prompt_ = ""; - assert(driven_->channels_[chid_] == nullptr); - driven_->channels_[chid_] = this; -} - -Channel::~Channel() { - assert(driven_->channels_[chid_] == this); - driven_->new_closed_.insert(chid_); - driven_->new_outgoing_.erase(chid_); - driven_->channels_[chid_] = nullptr; + stop_driver_ = stop; } void Channel::show_or_hide_command(bool ignore_sb_out) { bool sb_out_empty = (sb_out_->fill() == 0) || ignore_sb_out; - if (!sb_out_empty || (!readline_enabled_) || (driven_->stop_driver_)) { + if (!sb_out_empty || (!readline_enabled_) || (stop_driver_)) { int ccsize = current_prompt_.size() + current_command_.size(); readline_echo_ += util::repeat_string("\b \b", ccsize); current_prompt_ = ""; @@ -140,8 +131,8 @@ int DrivenEngine::find_unused_chid() { Channel *DrivenEngine::get_chid(int chid) { assert(unsigned(chid) < MAX_CHAN); - assert(channels_[chid] != nullptr); - return channels_[chid]; + assert(channels_[chid].get() != nullptr); + return channels_[chid].get(); } void DrivenEngine::listen_port(int port) { @@ -152,24 +143,26 @@ double DrivenEngine::get_clock() { return clock_; } -UniqueChannel DrivenEngine::new_outgoing_channel(const std::string &target) { +SharedChannel DrivenEngine::new_outgoing_channel(const std::string &target) { int chid = find_unused_chid(); new_outgoing_.insert(chid); - return UniqueChannel(new Channel(this, chid, 0, target)); + SharedChannel result = std::make_shared(this, chid, 0, target, stop_driver_); + channels_[chid] = result; + return result; } -UniqueChannel DrivenEngine::new_incoming_channel() { +SharedChannel DrivenEngine::new_incoming_channel() { if (accepted_channels_.empty()) { return nullptr; } else { - UniqueChannel result = std::move(accepted_channels_.back()); + SharedChannel result = std::move(accepted_channels_.back()); accepted_channels_.pop_back(); return result; } } -Channel *DrivenEngine::get_stdio_channel() { - return stdio_channel_.get(); +SharedChannel DrivenEngine::get_stdio_channel() { + return stdio_channel_; } util::LuaSourcePtr DrivenEngine::get_lua_source() { @@ -182,17 +175,17 @@ void DrivenEngine::rescan_lua_source() { void DrivenEngine::stop_driver() { stop_driver_ = true; + for (int i = 0; i < MAX_CHAN; i++) { + if (channels_[i] != nullptr) { + channels_[i]->stop_driver_ = true; + } + } } void DrivenEngine::drv_get_listen_ports(std::set &ports) { ports = listen_ports_; } -void DrivenEngine::drv_get_new_closed(std::set &channels) { - channels = std::move(new_closed_); - new_closed_.clear(); -} - void DrivenEngine::drv_get_new_outgoing(std::set &channels) { channels = std::move(new_outgoing_); new_outgoing_.clear(); @@ -206,6 +199,10 @@ bool DrivenEngine::drv_outgoing_empty(int chid) { return get_chid(chid)->sb_out_->empty(); } +bool DrivenEngine::drv_get_channel_released(int chid) { + return channels_[chid].use_count() == 1; +} + void DrivenEngine::drv_peek_outgoing(int chid, int *nbytes, const char **bytes) { return get_chid(chid)->peek_outgoing(nbytes, bytes); } @@ -229,11 +226,13 @@ void DrivenEngine::drv_notify_close(int chid, const std::string &err) { Channel *ch = get_chid(chid); ch->closed_ = true; ch->error_ = err; + channels_[chid].reset(); } int DrivenEngine::drv_notify_accept(int port) { int chid = find_unused_chid(); - accepted_channels_.emplace_back(new Channel(this, chid, port, "")); + channels_[chid] = std::make_shared(this, chid, port, "", stop_driver_); + accepted_channels_.push_back(channels_[chid]); return chid; } @@ -263,25 +262,15 @@ bool DrivenEngine::drv_get_stop_driver() { } DrivenEngine::DrivenEngine() { - for (int i = 0; i < MAX_CHAN; i++) { - channels_[i] = nullptr; - } next_unused_chid_ = 1; - stdio_channel_.reset(new Channel(this, 0, 0, "")); + stdio_channel_ = std::make_shared(this, 0, 0, "", false); + channels_[0] = stdio_channel_; rescan_lua_source_ = true; clock_ = 0.0; stop_driver_ = false; } -DrivenEngine::~DrivenEngine() { - // Delete the channels that we own. - stdio_channel_.reset(); - accepted_channels_.clear(); - // At this point, all channels should be gone. - for (int i = 0; i < MAX_CHAN; i++) { - assert(channels_[i] == nullptr); - } -} +DrivenEngine::~DrivenEngine() {} static DrivenEngine *engine_; diff --git a/luprex/core/cpp/drivenengine.hpp b/luprex/core/cpp/drivenengine.hpp index c403df68..dae1eaee 100644 --- a/luprex/core/cpp/drivenengine.hpp +++ b/luprex/core/cpp/drivenengine.hpp @@ -148,16 +148,18 @@ public: // void set_prompt(const std::string &prompt); - // You may delete any channel except for stdio. This closes - // the channel. + // Do not construct your own Channels. Instead, + // use methods of class DrivenEngine like new_outgoing_channel. + // Channels are referenced by shared_ptr. You can + // release your shared_ptr at any time. // - ~Channel(); + Channel(DrivenEngine *de, int chid, int port, const std::string &target, bool stop); + ~Channel() {}; private: // Constructor is deliberately private. Use // DrivenEngine::new_outgoing_channel to create outgoing socket channels. // - Channel(DrivenEngine *de, int chid, int port, const std::string &target); void feed_readline(int nbytes, const char *bytes); void peek_outgoing(int *nbytes, const char **bytes); @@ -166,7 +168,6 @@ private: private: static const int READLINE_MAX=512; - DrivenEngine *driven_; int chid_; std::unique_ptr sb_in_; std::unique_ptr sb_out_; @@ -174,6 +175,7 @@ private: bool closed_; std::string error_; std::string target_; + bool stop_driver_; // Readline stuff. std::string desired_command_; @@ -187,7 +189,7 @@ private: friend class DrivenEngine; }; -using UniqueChannel = std::unique_ptr; +using SharedChannel = std::shared_ptr; class DrivenEngine { public: @@ -229,7 +231,7 @@ public: // actually opening the connection and relaying data into the channel using // drv_get_target, drv_peek_outgoing, drv_sent_outgoing, drv_recv_incoming. // - UniqueChannel new_outgoing_channel(const std::string &target); + SharedChannel new_outgoing_channel(const std::string &target); // Create a new channel from any pending incoming connection. If there is no // incoming connection, returns nullptr. @@ -242,17 +244,16 @@ public: // using drv_get_target, drv_peek_outgoing, drv_sent_outgoing, // drv_recv_incoming. // - UniqueChannel new_incoming_channel(); + SharedChannel new_incoming_channel(); - // Obtain the stdio channel. There is only one stdio channel. It is owned - // by the DrivenEngine. It is an error to delete the stdio channel. + // Obtain the stdio channel. There is only one stdio channel. // // DRIVER: the stdio channel is created automatically when the DrivenEngine // is created. The driver is responsible for relaying data into the channel // using drv_get_target, drv_peek_outgoing, drv_sent_outgoing, // drv_recv_incoming. // - Channel *get_stdio_channel(); + SharedChannel get_stdio_channel(); // Obtain the output buffer of the stdio channel as an ostream. // @@ -292,13 +293,6 @@ public: // void drv_get_listen_ports(std::set &ports); - // Get a list of all recently-closed channels. The driver should - // discard all socket information associated with these channels. - // Caution: this may contain channels that the driver has never - // heard of. In that case, just ignore the close-request. - // - void drv_get_new_closed(std::set &opened); - // Get a list of all recently-opened channels that were created using // drv_new_outgoing_channel. The driver should initiate outgoing // connections for these channels. @@ -316,6 +310,12 @@ public: // bool drv_outgoing_empty(int chid); + // Return true if the user has released all references to this channel. + // In this case, the driver should initiate shutdown of the channel, + // and the driver should eventually call drv_notify_close. + // + bool drv_get_channel_released(int chid); + // Get a pointer to the bytes in the outgoing buffer. The pointer returned // here is naturally only valid until the buffer is changed. This function // is used for all channels, including sockets and stdio. @@ -415,11 +415,10 @@ private: Channel *get_chid(int chid); private: - Channel *channels_[MAX_CHAN]; + SharedChannel channels_[MAX_CHAN]; int next_unused_chid_; - UniqueChannel stdio_channel_; - std::vector accepted_channels_; - std::set new_closed_; + SharedChannel stdio_channel_; + std::vector accepted_channels_; std::set new_outgoing_; util::LuaSourcePtr lua_source_; std::set listen_ports_; diff --git a/luprex/core/cpp/driver-linux.cpp b/luprex/core/cpp/driver-linux.cpp index f8164f7d..16e52bf2 100644 --- a/luprex/core/cpp/driver-linux.cpp +++ b/luprex/core/cpp/driver-linux.cpp @@ -169,14 +169,15 @@ public: } } - void handle_new_closed_sockets() { - std::set chans; - driven_->drv_get_new_closed(chans); - for (int chid : chans) { - if (socket_[chid] != INVALID_SOCKET) { - assert(close(socket_[chid]) == 0); - socket_[chid] = INVALID_SOCKET; - connected_[chid] = false; + void handle_new_closed_sockets() { + for (int chid = 1; chid < MAX_CHAN; chid++) { + if (driven_->drv_get_channel_released(chid)) { + if (socket_[chid] != INVALID_SOCKET) { + assert(close(socket_[chid]) == 0); + socket_[chid] = INVALID_SOCKET; + connected_[chid] = false; + } + driven_->drv_notify_close(chid, ""); } } } diff --git a/luprex/core/cpp/driver-mingw.cpp b/luprex/core/cpp/driver-mingw.cpp index ddfdff31..5c5694a9 100644 --- a/luprex/core/cpp/driver-mingw.cpp +++ b/luprex/core/cpp/driver-mingw.cpp @@ -12,6 +12,33 @@ +class monotonic_clock { +public: + double freq_; + + monotonic_clock() { + LARGE_INTEGER x; + BOOL status = QueryPerformanceFrequency(&x); + assert(status != 0); + freq_ = 1.0 / double(x.QuadPart); + } + + double get() { + LARGE_INTEGER x; + BOOL status = QueryPerformanceCounter(&x); + assert(status != 0); + return double(x.QuadPart) * freq_; + } +); +#endif + +static monotonic_clock monoclock; + +namespace util { + double profiling_clock() { + return monoclock.get(); + } +} class Driver { public: diff --git a/luprex/core/cpp/drivertests.cpp b/luprex/core/cpp/drivertests.cpp index 5b152ee4..3518ca62 100644 --- a/luprex/core/cpp/drivertests.cpp +++ b/luprex/core/cpp/drivertests.cpp @@ -23,23 +23,23 @@ static void dump_lines(StreamBuffer *in, StreamBuffer *out, int chid) { // You can type lines and see them echoed. class DriverListenTest : public DrivenEngine { public: - std::vector channels_; + std::vector channels_; virtual void event_init(int argc, char *argv[]) { listen_port(8085); } virtual void event_update() { while (true) { - UniqueChannel ch = new_incoming_channel(); + SharedChannel ch = new_incoming_channel(); if (ch == nullptr) break; ch->set_readline(true); channels_.emplace_back(std::move(ch)); } - Channel *stdioch = get_stdio_channel(); + SharedChannel stdioch = get_stdio_channel(); dump_lines(stdioch->in(), stdioch->out(), 0); - std::vector keep; - for (UniqueChannel &ch : channels_) { + std::vector keep; + for (SharedChannel &ch : channels_) { dump_lines(ch->in(), stdioch->out(), ch->chid()); if (ch->closed()) { write_closed_message(ch.get(), stdioch->out()); @@ -55,18 +55,18 @@ public: // the output from the server. class DriverWebServerTest : public DrivenEngine { public: - std::vector channels_; + std::vector channels_; virtual void event_init(int argc, char *argv[]) { - UniqueChannel ch = new_outgoing_channel("stanford.edu:80"); + SharedChannel ch = new_outgoing_channel("stanford.edu:80"); ch->out()->write_bytes("GET http://stanford.edu/index.html HTTP/1.1\n\n"); channels_.emplace_back(std::move(ch)); } virtual void event_update() { - Channel *stdioch = get_stdio_channel(); + SharedChannel stdioch = get_stdio_channel(); dump_lines(stdioch->in(), stdioch->out(), 0); - std::vector keep; - for (UniqueChannel &ch : channels_) { + std::vector keep; + for (SharedChannel &ch : channels_) { dump_lines(ch->in(), stdioch->out(), ch->chid()); if (ch->closed()) { write_closed_message(ch.get(), stdioch->out()); @@ -81,18 +81,18 @@ public: // This test produces a DNS resolution failure. class DriverDNSFailTest : public DrivenEngine { public: - std::vector channels_; + std::vector channels_; virtual void event_init(int argc, char *argv[]) { - UniqueChannel ch = new_outgoing_channel("akjsdkajshdakjshd.alk:80"); + SharedChannel ch = new_outgoing_channel("akjsdkajshdakjshd.alk:80"); ch->out()->write_bytes("GET http://stanford.edu/index.html HTTP/1.1\n\n"); channels_.emplace_back(std::move(ch)); } virtual void event_update() { - Channel *stdioch = get_stdio_channel(); + SharedChannel stdioch = get_stdio_channel(); dump_lines(stdioch->in(), stdioch->out(), 0); - std::vector keep; - for (UniqueChannel &ch : channels_) { + std::vector keep; + for (SharedChannel &ch : channels_) { dump_lines(ch->in(), stdioch->out(), ch->chid()); if (ch->closed()) { write_closed_message(ch.get(), stdioch->out()); diff --git a/luprex/core/cpp/lpxclient.cpp b/luprex/core/cpp/lpxclient.cpp index e54353d7..3a45cd0a 100644 --- a/luprex/core/cpp/lpxclient.cpp +++ b/luprex/core/cpp/lpxclient.cpp @@ -14,7 +14,7 @@ public: UniqueWorld world_; int64_t actor_id_; InvocationQueue unack_; - UniqueChannel channel_; + SharedChannel channel_; LuaConsole console_; PrintChanneler print_channeler_; Gui gui_; diff --git a/luprex/core/cpp/lpxserver.cpp b/luprex/core/cpp/lpxserver.cpp index 5c2dc79d..c9896ee1 100644 --- a/luprex/core/cpp/lpxserver.cpp +++ b/luprex/core/cpp/lpxserver.cpp @@ -10,7 +10,7 @@ class Client { public: int64_t actor_id_; - UniqueChannel channel_; + SharedChannel channel_; UniqueWorld sync_; }; using UniqueClient = std::unique_ptr; @@ -151,7 +151,7 @@ public: // Check for new incoming channels, set up client structures. while (true) { - UniqueChannel chan = new_incoming_channel(); + SharedChannel chan = new_incoming_channel(); if (chan == nullptr) break; Client *client = new Client; client->actor_id_ = master_->create_login_actor();