diff --git a/luprex/core/cpp/driver-common.cpp b/luprex/core/cpp/driver-common.cpp index 87c61565..32adb630 100644 --- a/luprex/core/cpp/driver-common.cpp +++ b/luprex/core/cpp/driver-common.cpp @@ -17,7 +17,8 @@ static void if_error_print_and_exit(const std::string &str) { } } -static std::string_view read_file(const char *fn, char *buf, int bufsize, std::string &err) { +static std::string_view read_file(const char *fn, char *buf, int bufsize, + std::string &err) { FILE *f = fopen(fn, "r"); if (f == 0) { err = std::string("cannot read file") + fn; @@ -55,13 +56,13 @@ class Driver { ChanState state; int nbytes; const char *bytes; - bool released; - bool just_released; bool ready_now; bool ready_on_pollin; bool ready_on_pollout; bool ready_on_outgoing; int last_write_nbytes; + + bool marked_for_deletion() const { return state == CHAN_INACTIVE; } }; std::vector chans_; @@ -124,8 +125,6 @@ class Driver { chan.chid = -1; chan.nbytes = 0; chan.bytes = 0; - chan.released = false; - chan.just_released = false; chan.ready_now = false; chan.ready_on_pollin = false; chan.ready_on_pollout = false; @@ -133,17 +132,6 @@ class Driver { chan.last_write_nbytes = 0; } - void cleanup_channels() { - for (int i = 0; i < int(chans_.size());) { - if (chans_[i].state == CHAN_INACTIVE) { - chans_[i] = chans_.back(); - chans_.pop_back(); - } else { - i += 1; - } - } - } - void handle_console_output() { while (true) { std::string_view s = recorder_.drv_peek_outgoing(0); @@ -173,8 +161,6 @@ class Driver { newchan.state = state; newchan.nbytes = 0; newchan.bytes = 0; - newchan.released = false; - newchan.just_released = false; newchan.ready_now = false; newchan.ready_on_pollin = false; newchan.ready_on_pollout = true; @@ -235,13 +221,6 @@ class Driver { void advance_plaintext(ChanInfo &chan) { std::string err; - // If the channel has no outgoing bytes and has been released, - // just close it. - if (chan.released) { - close_channel(chan, ""); - return; - } - // Try to write plaintext to the channel. std::string_view s = recorder_.drv_peek_outgoing(chan.chid); if (s.size() > 0) { @@ -376,17 +355,24 @@ class Driver { int mstimeout = read_console_recently_ ? 100 : 1000; // Peek output buffers and determine channel release flags. + bool any_released = false; for (ChanInfo &chan : chans_) { std::string_view s = recorder_.drv_peek_outgoing(chan.chid); chan.nbytes = s.size(); chan.bytes = s.data(); - chan.just_released = false; - if ((chan.nbytes == 0) && (!chan.released)) { - chan.released = recorder_.drv_get_channel_released(chan.chid); - chan.just_released = chan.released; + if (chan.nbytes == 0) { + if (recorder_.drv_get_channel_released(chan.chid)) { + close_channel(chan, ""); + any_released = true; + } } } + // Delete any released channels + if (any_released) { + util::remove_marked_items(chans_); + } + // Construct the struct pollfd vector. int pollsize = 0; for (const auto &p : listen_sockets_) { @@ -402,13 +388,12 @@ class Driver { pfd.events = 0; pfd.revents = 0; if (chan.ready_now) mstimeout = 0; - if (chan.just_released) mstimeout = 0; if (chan.ready_on_pollin) pfd.events |= POLLIN; if (chan.ready_on_pollout) pfd.events |= POLLOUT; if (chan.ready_on_outgoing && (chan.nbytes > 0)) pfd.events |= POLLOUT; - // std::cerr << "evt=" << pfd.events << ".nb=" << chan.nbytes << " - // "; + // std::cerr << "evt=" << pfd.events << ".nb=" << chan.nbytes << + // std::endl; } // Do the poll. @@ -430,7 +415,7 @@ class Driver { bool pollin = ((pfd.revents & POLLIN) != 0); bool pollout = ((pfd.revents & POLLOUT) != 0); bool pollerr = ((pfd.revents & (POLLERR | POLLHUP)) != 0); - if (chan.ready_now || pollerr || chan.just_released || + if (chan.ready_now || pollerr || (chan.ready_on_pollin && pollin) || (chan.ready_on_pollout && pollout) || (chan.ready_on_outgoing && (chan.nbytes > 0) && pollout)) { @@ -445,7 +430,7 @@ class Driver { } // Delete any newly-inactive channels - cleanup_channels(); + util::remove_marked_items(chans_); } int replay_logfile(const char *fn, bool verbose) { diff --git a/luprex/core/cpp/http.hpp b/luprex/core/cpp/http.hpp index b39cdbde..dc46e5a9 100644 --- a/luprex/core/cpp/http.hpp +++ b/luprex/core/cpp/http.hpp @@ -269,6 +269,10 @@ public: // HttpParser(); + // Get the parsed status. + // + int status() const { return status_; } + // Store the parsed fields into a lua table. // void store(LuaStack &LS, LuaSlot tab) const; @@ -318,17 +322,19 @@ public: using HttpParserVec = eng::vector; -// This class associates an HTTP request to an actual communication -// channel that is executing that request. - -class HttpClientChannel { +// This class is used by LpxServer to store the +// incoming and outgoing http channels. +// +class HttpChannel { public: SharedChannel channel_; int64_t parsed_bytes_; - HttpClientChannel() : parsed_bytes_(0) {} + bool marked_for_deletion() const { return channel_ == nullptr; } + HttpChannel() : parsed_bytes_(0) {} }; -using HttpClientChannelMap = eng::map; +using HttpChannelMap = eng::map; +using HttpChannelVec = eng::vector; #endif // HTTP_HPP diff --git a/luprex/core/cpp/lpxserver.cpp b/luprex/core/cpp/lpxserver.cpp index 8867222c..4d555f80 100644 --- a/luprex/core/cpp/lpxserver.cpp +++ b/luprex/core/cpp/lpxserver.cpp @@ -25,7 +25,8 @@ public: LuaConsole console_; ClientVector clients_; PrintChanneler print_channeler_; - HttpClientChannelMap http_client_channels_; + HttpChannelMap http_client_channels_; + HttpChannelVec http_server_channels_; int64_t admin_id_; Gui gui_; @@ -43,9 +44,12 @@ public: // Print out admin ID for debugging purposes. stdostream() << "Admin actor id = " << admin_id_ << std::endl; - // Enable listening on port 8085. + // Enable listening on port 8085 (client connections) listen_port(8085); + // Enable listening on port 8080 (http server connections) + listen_port(8080); + // Set the console prompt. get_stdio_channel()->set_prompt(console_.get_prompt()); } @@ -181,14 +185,21 @@ public: while (true) { SharedChannel chan = new_incoming_channel(); if (chan == nullptr) break; - Client *client = new Client; - client->actor_id_ = master_->create_login_actor(); - client->channel_ = std::move(chan); - client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC)); - client->sync_->create_login_actor(); - clients_.emplace_back(client); - stdostream() << "New client: actor id=" << client->actor_id_ << std::endl; - send_diffs(clients_.back()); + if (chan->port() == 8085) { + Client *client = new Client; + client->actor_id_ = master_->create_login_actor(); + client->channel_ = std::move(chan); + client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC)); + client->sync_->create_login_actor(); + clients_.emplace_back(client); + stdostream() << "New client: actor id=" << client->actor_id_ << std::endl; + send_diffs(clients_.back()); + } else if (chan->port() == 8080) { + HttpChannel htchan; + htchan.channel_ = chan; + http_server_channels_.push_back(htchan); + stdostream() << "Http Server got new client " << chan->chid() << std::endl; + } } // Traverse all existing channels, process any communication. @@ -206,7 +217,7 @@ public: // Look for new outgoing HTTP client requests. for (const auto &pair : master_->http_requests()) { const HttpClientRequest &request = pair.second; - HttpClientChannel &channel = http_client_channels_[request.request_id()]; + HttpChannel &channel = http_client_channels_[request.request_id()]; if (channel.channel_ == nullptr) { channel.channel_ = new_outgoing_channel(request.target()); channel.parsed_bytes_ = 0; @@ -217,7 +228,7 @@ public: // Maintain existing outgoing HTTP client requests. HttpParserVec http_responses; for (auto &pair : http_client_channels_) { - HttpClientChannel &htchan = pair.second; + HttpChannel &htchan = pair.second; Channel &channel = *htchan.channel_; if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) { HttpParser response; @@ -237,6 +248,22 @@ public: http_client_channels_.erase(response.request_id()); } master_->http_responses(http_responses); + + // Maintain incoming HTTP server channels. + for (HttpChannel &htchan : http_server_channels_) { + SharedChannel &chan = htchan.channel_; + if (chan->in()->fill() > htchan.parsed_bytes_) { + HttpParser parser; + parser.parse_request(chan->in()->view(), chan->closed()); + htchan.parsed_bytes_ = chan->in()->fill(); + if (parser.complete()) { + StreamBuffer *sb = chan->out(); + sb->ostream() << "HTTP/1.1 200 OK\n\n"; + htchan.channel_ = nullptr; + } + } + } + util::remove_marked_items(http_server_channels_); } }; diff --git a/luprex/core/cpp/util.hpp b/luprex/core/cpp/util.hpp index 64c42ca5..bbf57a51 100644 --- a/luprex/core/cpp/util.hpp +++ b/luprex/core/cpp/util.hpp @@ -221,11 +221,17 @@ double distance_squared(double x1, double y1, double x2, double y2); // Make a LuaSourceVec with one element, for unit testing. LuaSourcePtr make_lua_source(const eng::string &code); -// Remove nullptrs from a vector of unique pointers. +// Remove items from a vector that are nullptr. template -void remove_nullptrs(eng::vector> &vec) { - std::unique_ptr nullp; - auto iter = std::remove(vec.begin(), vec.end(), nullp); +void remove_nullptrs(T &vec) { + auto iter = std::partition(vec.begin(), vec.end(), [] (const auto &x) { return x != nullptr; }); + vec.erase(iter, vec.end()); +} + +// Remove items from a vector that are marked for deletion. +template +void remove_marked_items(T &vec) { + auto iter = std::partition(vec.begin(), vec.end(), [] (const auto &x) { return !x.marked_for_deletion(); }); vec.erase(iter, vec.end()); }