HTTP server networking side, also, fix the releasing of channels in driver

This commit is contained in:
2022-05-17 15:00:20 -04:00
parent 1ac1ab9420
commit cd3064eb05
4 changed files with 80 additions and 56 deletions

View File

@@ -17,7 +17,8 @@ static void if_error_print_and_exit(const std::string &str) {
} }
} }
static std::string_view read_file(const char *fn, char *buf, int bufsize, std::string &err) { static std::string_view read_file(const char *fn, char *buf, int bufsize,
std::string &err) {
FILE *f = fopen(fn, "r"); FILE *f = fopen(fn, "r");
if (f == 0) { if (f == 0) {
err = std::string("cannot read file") + fn; err = std::string("cannot read file") + fn;
@@ -55,13 +56,13 @@ class Driver {
ChanState state; ChanState state;
int nbytes; int nbytes;
const char *bytes; const char *bytes;
bool released;
bool just_released;
bool ready_now; bool ready_now;
bool ready_on_pollin; bool ready_on_pollin;
bool ready_on_pollout; bool ready_on_pollout;
bool ready_on_outgoing; bool ready_on_outgoing;
int last_write_nbytes; int last_write_nbytes;
bool marked_for_deletion() const { return state == CHAN_INACTIVE; }
}; };
std::vector<ChanInfo> chans_; std::vector<ChanInfo> chans_;
@@ -124,8 +125,6 @@ class Driver {
chan.chid = -1; chan.chid = -1;
chan.nbytes = 0; chan.nbytes = 0;
chan.bytes = 0; chan.bytes = 0;
chan.released = false;
chan.just_released = false;
chan.ready_now = false; chan.ready_now = false;
chan.ready_on_pollin = false; chan.ready_on_pollin = false;
chan.ready_on_pollout = false; chan.ready_on_pollout = false;
@@ -133,17 +132,6 @@ class Driver {
chan.last_write_nbytes = 0; chan.last_write_nbytes = 0;
} }
void cleanup_channels() {
for (int i = 0; i < int(chans_.size());) {
if (chans_[i].state == CHAN_INACTIVE) {
chans_[i] = chans_.back();
chans_.pop_back();
} else {
i += 1;
}
}
}
void handle_console_output() { void handle_console_output() {
while (true) { while (true) {
std::string_view s = recorder_.drv_peek_outgoing(0); std::string_view s = recorder_.drv_peek_outgoing(0);
@@ -173,8 +161,6 @@ class Driver {
newchan.state = state; newchan.state = state;
newchan.nbytes = 0; newchan.nbytes = 0;
newchan.bytes = 0; newchan.bytes = 0;
newchan.released = false;
newchan.just_released = false;
newchan.ready_now = false; newchan.ready_now = false;
newchan.ready_on_pollin = false; newchan.ready_on_pollin = false;
newchan.ready_on_pollout = true; newchan.ready_on_pollout = true;
@@ -235,13 +221,6 @@ class Driver {
void advance_plaintext(ChanInfo &chan) { void advance_plaintext(ChanInfo &chan) {
std::string err; std::string err;
// If the channel has no outgoing bytes and has been released,
// just close it.
if (chan.released) {
close_channel(chan, "");
return;
}
// Try to write plaintext to the channel. // Try to write plaintext to the channel.
std::string_view s = recorder_.drv_peek_outgoing(chan.chid); std::string_view s = recorder_.drv_peek_outgoing(chan.chid);
if (s.size() > 0) { if (s.size() > 0) {
@@ -376,17 +355,24 @@ class Driver {
int mstimeout = read_console_recently_ ? 100 : 1000; int mstimeout = read_console_recently_ ? 100 : 1000;
// Peek output buffers and determine channel release flags. // Peek output buffers and determine channel release flags.
bool any_released = false;
for (ChanInfo &chan : chans_) { for (ChanInfo &chan : chans_) {
std::string_view s = recorder_.drv_peek_outgoing(chan.chid); std::string_view s = recorder_.drv_peek_outgoing(chan.chid);
chan.nbytes = s.size(); chan.nbytes = s.size();
chan.bytes = s.data(); chan.bytes = s.data();
chan.just_released = false; if (chan.nbytes == 0) {
if ((chan.nbytes == 0) && (!chan.released)) { if (recorder_.drv_get_channel_released(chan.chid)) {
chan.released = recorder_.drv_get_channel_released(chan.chid); close_channel(chan, "");
chan.just_released = chan.released; any_released = true;
}
} }
} }
// Delete any released channels
if (any_released) {
util::remove_marked_items(chans_);
}
// Construct the struct pollfd vector. // Construct the struct pollfd vector.
int pollsize = 0; int pollsize = 0;
for (const auto &p : listen_sockets_) { for (const auto &p : listen_sockets_) {
@@ -402,13 +388,12 @@ class Driver {
pfd.events = 0; pfd.events = 0;
pfd.revents = 0; pfd.revents = 0;
if (chan.ready_now) mstimeout = 0; if (chan.ready_now) mstimeout = 0;
if (chan.just_released) mstimeout = 0;
if (chan.ready_on_pollin) pfd.events |= POLLIN; if (chan.ready_on_pollin) pfd.events |= POLLIN;
if (chan.ready_on_pollout) pfd.events |= POLLOUT; if (chan.ready_on_pollout) pfd.events |= POLLOUT;
if (chan.ready_on_outgoing && (chan.nbytes > 0)) if (chan.ready_on_outgoing && (chan.nbytes > 0))
pfd.events |= POLLOUT; pfd.events |= POLLOUT;
// std::cerr << "evt=" << pfd.events << ".nb=" << chan.nbytes << " // std::cerr << "evt=" << pfd.events << ".nb=" << chan.nbytes <<
// "; // std::endl;
} }
// Do the poll. // Do the poll.
@@ -430,7 +415,7 @@ class Driver {
bool pollin = ((pfd.revents & POLLIN) != 0); bool pollin = ((pfd.revents & POLLIN) != 0);
bool pollout = ((pfd.revents & POLLOUT) != 0); bool pollout = ((pfd.revents & POLLOUT) != 0);
bool pollerr = ((pfd.revents & (POLLERR | POLLHUP)) != 0); bool pollerr = ((pfd.revents & (POLLERR | POLLHUP)) != 0);
if (chan.ready_now || pollerr || chan.just_released || if (chan.ready_now || pollerr ||
(chan.ready_on_pollin && pollin) || (chan.ready_on_pollin && pollin) ||
(chan.ready_on_pollout && pollout) || (chan.ready_on_pollout && pollout) ||
(chan.ready_on_outgoing && (chan.nbytes > 0) && pollout)) { (chan.ready_on_outgoing && (chan.nbytes > 0) && pollout)) {
@@ -445,7 +430,7 @@ class Driver {
} }
// Delete any newly-inactive channels // Delete any newly-inactive channels
cleanup_channels(); util::remove_marked_items(chans_);
} }
int replay_logfile(const char *fn, bool verbose) { int replay_logfile(const char *fn, bool verbose) {

View File

@@ -269,6 +269,10 @@ public:
// //
HttpParser(); HttpParser();
// Get the parsed status.
//
int status() const { return status_; }
// Store the parsed fields into a lua table. // Store the parsed fields into a lua table.
// //
void store(LuaStack &LS, LuaSlot tab) const; void store(LuaStack &LS, LuaSlot tab) const;
@@ -318,17 +322,19 @@ public:
using HttpParserVec = eng::vector<HttpParser>; using HttpParserVec = eng::vector<HttpParser>;
// This class associates an HTTP request to an actual communication // This class is used by LpxServer to store the
// channel that is executing that request. // incoming and outgoing http channels.
//
class HttpClientChannel { class HttpChannel {
public: public:
SharedChannel channel_; SharedChannel channel_;
int64_t parsed_bytes_; int64_t parsed_bytes_;
HttpClientChannel() : parsed_bytes_(0) {} bool marked_for_deletion() const { return channel_ == nullptr; }
HttpChannel() : parsed_bytes_(0) {}
}; };
using HttpClientChannelMap = eng::map<int64_t, HttpClientChannel>; using HttpChannelMap = eng::map<int64_t, HttpChannel>;
using HttpChannelVec = eng::vector<HttpChannel>;
#endif // HTTP_HPP #endif // HTTP_HPP

View File

@@ -25,7 +25,8 @@ public:
LuaConsole console_; LuaConsole console_;
ClientVector clients_; ClientVector clients_;
PrintChanneler print_channeler_; PrintChanneler print_channeler_;
HttpClientChannelMap http_client_channels_; HttpChannelMap http_client_channels_;
HttpChannelVec http_server_channels_;
int64_t admin_id_; int64_t admin_id_;
Gui gui_; Gui gui_;
@@ -43,9 +44,12 @@ public:
// Print out admin ID for debugging purposes. // Print out admin ID for debugging purposes.
stdostream() << "Admin actor id = " << admin_id_ << std::endl; stdostream() << "Admin actor id = " << admin_id_ << std::endl;
// Enable listening on port 8085. // Enable listening on port 8085 (client connections)
listen_port(8085); listen_port(8085);
// Enable listening on port 8080 (http server connections)
listen_port(8080);
// Set the console prompt. // Set the console prompt.
get_stdio_channel()->set_prompt(console_.get_prompt()); get_stdio_channel()->set_prompt(console_.get_prompt());
} }
@@ -181,14 +185,21 @@ public:
while (true) { while (true) {
SharedChannel chan = new_incoming_channel(); SharedChannel chan = new_incoming_channel();
if (chan == nullptr) break; if (chan == nullptr) break;
Client *client = new Client; if (chan->port() == 8085) {
client->actor_id_ = master_->create_login_actor(); Client *client = new Client;
client->channel_ = std::move(chan); client->actor_id_ = master_->create_login_actor();
client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC)); client->channel_ = std::move(chan);
client->sync_->create_login_actor(); client->sync_.reset(new World(util::WORLD_TYPE_S_SYNC));
clients_.emplace_back(client); client->sync_->create_login_actor();
stdostream() << "New client: actor id=" << client->actor_id_ << std::endl; clients_.emplace_back(client);
send_diffs(clients_.back()); stdostream() << "New client: actor id=" << client->actor_id_ << std::endl;
send_diffs(clients_.back());
} else if (chan->port() == 8080) {
HttpChannel htchan;
htchan.channel_ = chan;
http_server_channels_.push_back(htchan);
stdostream() << "Http Server got new client " << chan->chid() << std::endl;
}
} }
// Traverse all existing channels, process any communication. // Traverse all existing channels, process any communication.
@@ -206,7 +217,7 @@ public:
// Look for new outgoing HTTP client requests. // Look for new outgoing HTTP client requests.
for (const auto &pair : master_->http_requests()) { for (const auto &pair : master_->http_requests()) {
const HttpClientRequest &request = pair.second; const HttpClientRequest &request = pair.second;
HttpClientChannel &channel = http_client_channels_[request.request_id()]; HttpChannel &channel = http_client_channels_[request.request_id()];
if (channel.channel_ == nullptr) { if (channel.channel_ == nullptr) {
channel.channel_ = new_outgoing_channel(request.target()); channel.channel_ = new_outgoing_channel(request.target());
channel.parsed_bytes_ = 0; channel.parsed_bytes_ = 0;
@@ -217,7 +228,7 @@ public:
// Maintain existing outgoing HTTP client requests. // Maintain existing outgoing HTTP client requests.
HttpParserVec http_responses; HttpParserVec http_responses;
for (auto &pair : http_client_channels_) { for (auto &pair : http_client_channels_) {
HttpClientChannel &htchan = pair.second; HttpChannel &htchan = pair.second;
Channel &channel = *htchan.channel_; Channel &channel = *htchan.channel_;
if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) { if (channel.closed() || (channel.in()->fill() > htchan.parsed_bytes_)) {
HttpParser response; HttpParser response;
@@ -237,6 +248,22 @@ public:
http_client_channels_.erase(response.request_id()); http_client_channels_.erase(response.request_id());
} }
master_->http_responses(http_responses); master_->http_responses(http_responses);
// Maintain incoming HTTP server channels.
for (HttpChannel &htchan : http_server_channels_) {
SharedChannel &chan = htchan.channel_;
if (chan->in()->fill() > htchan.parsed_bytes_) {
HttpParser parser;
parser.parse_request(chan->in()->view(), chan->closed());
htchan.parsed_bytes_ = chan->in()->fill();
if (parser.complete()) {
StreamBuffer *sb = chan->out();
sb->ostream() << "HTTP/1.1 200 OK\n\n";
htchan.channel_ = nullptr;
}
}
}
util::remove_marked_items(http_server_channels_);
} }
}; };

View File

@@ -221,11 +221,17 @@ double distance_squared(double x1, double y1, double x2, double y2);
// Make a LuaSourceVec with one element, for unit testing. // Make a LuaSourceVec with one element, for unit testing.
LuaSourcePtr make_lua_source(const eng::string &code); LuaSourcePtr make_lua_source(const eng::string &code);
// Remove nullptrs from a vector of unique pointers. // Remove items from a vector that are nullptr.
template<class T> template<class T>
void remove_nullptrs(eng::vector<std::unique_ptr<T>> &vec) { void remove_nullptrs(T &vec) {
std::unique_ptr<T> nullp; auto iter = std::partition(vec.begin(), vec.end(), [] (const auto &x) { return x != nullptr; });
auto iter = std::remove(vec.begin(), vec.end(), nullp); vec.erase(iter, vec.end());
}
// Remove items from a vector that are marked for deletion.
template<class T>
void remove_marked_items(T &vec) {
auto iter = std::partition(vec.begin(), vec.end(), [] (const auto &x) { return !x.marked_for_deletion(); });
vec.erase(iter, vec.end()); vec.erase(iter, vec.end());
} }