From 860b6f60b10403f4a78d3acff87278114fb0814e Mon Sep 17 00:00:00 2001 From: jyelon Date: Thu, 6 Jan 2022 18:33:09 -0500 Subject: [PATCH] refactor driver-linux to use a channel list --- luprex/core/cpp/driver-linux.cpp | 161 +++++++++++++++++-------------- 1 file changed, 90 insertions(+), 71 deletions(-) diff --git a/luprex/core/cpp/driver-linux.cpp b/luprex/core/cpp/driver-linux.cpp index d11e9ac8..b6701174 100644 --- a/luprex/core/cpp/driver-linux.cpp +++ b/luprex/core/cpp/driver-linux.cpp @@ -12,13 +12,15 @@ #include #include #include +#include #include #include #include #include using SOCKET=int; const int INVALID_SOCKET = -1; - +using SocketVector = std::vector; +using PollVector = std::vector; struct termios orig_termios; @@ -117,6 +119,27 @@ SOCKET listen_on_port(int port, std::string &err) { return sock; } +SocketVector accept_on_socket(SOCKET listen_socket) { + SocketVector result; + while (true) { + SOCKET chsock = accept(listen_socket, nullptr, nullptr); + if (chsock >= 0) { + result.push_back(chsock); + } else { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + // Normal completion - we're out of incoming sockets. + return result; + } else if (errno == ECONNABORTED) { + // The remote disconnected before we had a chance to accept. + // Just pretend it never happened. + } else { + // Unexpected error. + assert(false); + } + } + } +} + class MonoClock { private: struct timespec base_; @@ -151,23 +174,21 @@ public: CHAN_OPEN, }; struct ChanInfo { + int chid; ChanState state; SOCKET socket; }; - static const int MAX_CHAN = DrivenEngine::MAX_CHAN; DrivenEngine *driven_; - ChanInfo channels_[MAX_CHAN]; + std::vector chans_; + bool any_inactive_; bool short_sleep_; std::map listen_sockets_; std::unique_ptr chbuf; void init(DrivenEngine *de) { driven_ = de; - for (int i = 0; i < MAX_CHAN; i++) { - channels_[i].state = CHAN_INACTIVE; - channels_[i].socket = INVALID_SOCKET; - } + any_inactive_ = false; short_sleep_ = false; chbuf.reset(new char[65536]); } @@ -193,39 +214,62 @@ public: } } - void handle_new_closed_sockets() { - for (int chid = 1; chid < MAX_CHAN; chid++) { - if (driven_->drv_get_channel_released(chid)) { - if (channels_[chid].state != CHAN_INACTIVE) { - assert(close(channels_[chid].socket) == 0); - channels_[chid].state = CHAN_INACTIVE; - channels_[chid].socket = INVALID_SOCKET; + void close_channel(ChanInfo &chan, const std::string &err) { + assert(chan.state != CHAN_INACTIVE); + assert(close(chan.socket) == 0); + driven_->drv_notify_close(chan.chid, err); + chan.state = CHAN_INACTIVE; + chan.socket = INVALID_SOCKET; + chan.chid = -1; + any_inactive_ = true; + short_sleep_ = true; + } + + void cleanup_channels() { + if (any_inactive_) { + for (int i = 0; i < int(chans_.size()); ) { + if (chans_[i].state == CHAN_INACTIVE) { + chans_[i] = chans_.back(); + chans_.pop_back(); + } else { + i += 1; } - driven_->drv_notify_close(chid, ""); + } + any_inactive_ = false; + } + } + + void handle_released_channels() { + for (ChanInfo &chan : chans_) { + if (driven_->drv_get_channel_released(chan.chid)) { + close_channel(chan, ""); } } + cleanup_channels(); } void handle_new_outgoing_sockets() { std::set chans; driven_->drv_get_new_outgoing(chans); for (int chid : chans) { - assert(channels_[chid].state == CHAN_INACTIVE); std::string err; SOCKET sock = open_connection(driven_->drv_get_target(chid), err); if (sock == INVALID_SOCKET) { driven_->drv_notify_close(chid, err); short_sleep_ = true; } else { - channels_[chid].state = CHAN_CONNECTING; - channels_[chid].socket = sock; + ChanInfo newchan; + newchan.chid = chid; + newchan.state = CHAN_CONNECTING; + newchan.socket = sock; + chans_.push_back(newchan); } } } void handle_console_output() { - int nbytes; const char *bytes; while (true) { + int nbytes; const char *bytes; driven_->drv_peek_outgoing(0, &nbytes, &bytes); if (nbytes == 0) break; int nwrote = write(1, bytes, nbytes); @@ -235,51 +279,25 @@ public: } void handle_console_input() { - char buffer[32]; + char buffer[256]; while (true) { - int nread = read(0, buffer, 32); + int nread = read(0, buffer, 256); if (nread == 0) break; assert(nread > 0); driven_->drv_recv_incoming(0, nread, buffer); } } - void handle_clock() { - driven_->drv_set_clock(monoclock.get()); - } - - void close_channel(int chid, const std::string err) { - assert(channels_[chid].state != CHAN_INACTIVE); - assert(close(channels_[chid].socket) == 0); - driven_->drv_notify_close(chid, err); - channels_[chid].state = CHAN_INACTIVE; - channels_[chid].socket = INVALID_SOCKET; - short_sleep_ = true; - } - void accept_connections(int port, SOCKET sock) { - while (true) { - SOCKET chsock = accept(sock, nullptr, nullptr); - if (chsock > 0) { - int chid = driven_->drv_notify_accept(port); - assert(channels_[chid].state == CHAN_INACTIVE); - channels_[chid].state = CHAN_OPEN; - channels_[chid].socket = chsock; - short_sleep_ = true; - continue; - } - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - return; - } - if (errno == ECONNABORTED) { - // The remote disconnected before we had a chance to accept. - // Just pretend it never happened. - continue; - } - // If a listening port fails in a non-transient way, - // we don't really have any good way of handling - // that. - assert(false); + SocketVector sockets = accept_on_socket(sock); + for (SOCKET sock : sockets) { + int chid = driven_->drv_notify_accept(port); + ChanInfo newchan; + newchan.chid = chid; + newchan.state = CHAN_OPEN; + newchan.socket = sock; + chans_.push_back(newchan); + short_sleep_ = true; } } @@ -293,12 +311,12 @@ public: FD_SET(p.second, &efds); if (p.second > largest) largest = p.second; } - for (int chid = 1; chid < MAX_CHAN; chid++) { - SOCKET sock = channels_[chid].socket; + for (const ChanInfo &chan : chans_) { + SOCKET sock = chan.socket; if (sock == INVALID_SOCKET) continue; FD_SET(sock, &rfds); FD_SET(sock, &efds); - if (!driven_->drv_outgoing_empty(chid)) { + if (!driven_->drv_outgoing_empty(chan.chid)) { FD_SET(sock, &wfds); } if (sock > largest) largest = sock; @@ -322,34 +340,35 @@ public: } } - for (int chid = 1; chid < MAX_CHAN; chid++) { - ChanInfo *chan = channels_ + chid; - SOCKET sock = chan->socket; + for (ChanInfo &chan : chans_) { + SOCKET sock = chan.socket; if (sock == INVALID_SOCKET) continue; if (FD_ISSET(sock, &wfds)) { - chan->state = CHAN_OPEN; - driven_->drv_peek_outgoing(chid, &nbytes, &bytes); + chan.state = CHAN_OPEN; + driven_->drv_peek_outgoing(chan.chid, &nbytes, &bytes); if (nbytes > 0) { int wbytes = send(sock, bytes, nbytes, 0); if (wbytes < 0) { - close_channel(chid, "send failure"); + close_channel(chan, "send failure"); continue; } else { - driven_->drv_sent_outgoing(chid, wbytes); + driven_->drv_sent_outgoing(chan.chid, wbytes); } } } if (FD_ISSET(sock, &rfds) || FD_ISSET(sock, &efds)) { + // Someday, find a way to avoid this copy. int nrecv = recv(sock, chbuf.get(), 65536, 0); if (nrecv <= 0) { - close_channel(chid, "recv failure"); + close_channel(chan, "recv failure"); continue; } else { - driven_->drv_recv_incoming(chid, nrecv, chbuf.get()); + driven_->drv_recv_incoming(chan.chid, nrecv, chbuf.get()); short_sleep_ = true; } } } + cleanup_channels(); } void drive(DrivenEngine *de, int argc, char *argv[]) { @@ -362,14 +381,14 @@ public: while (!de->drv_get_stop_driver()) { short_sleep_ = false; handle_lua_source(); - handle_new_closed_sockets(); - handle_new_outgoing_sockets(); handle_console_output(); handle_console_input(); handle_console_output(); + handle_released_channels(); + handle_new_outgoing_sockets(); int mstimeout = short_sleep_ ? 0 : 100; handle_socket_input_output(mstimeout); - handle_clock(); + driven_->drv_set_clock(monoclock.get()); de->drv_invoke_event_update(); } DrivenEngine::set(nullptr);