refactor driver-linux to use a channel list

This commit is contained in:
2022-01-06 18:33:09 -05:00
parent 9c00853eeb
commit 860b6f60b1

View File

@@ -12,13 +12,15 @@
#include <termios.h> #include <termios.h>
#include <unistd.h> #include <unistd.h>
#include <sys/select.h> #include <sys/select.h>
#include <sys/poll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/types.h> #include <sys/types.h>
#include <netdb.h> #include <netdb.h>
using SOCKET=int; using SOCKET=int;
const int INVALID_SOCKET = -1; const int INVALID_SOCKET = -1;
using SocketVector = std::vector<SOCKET>;
using PollVector = std::vector<struct pollfd>;
struct termios orig_termios; struct termios orig_termios;
@@ -117,6 +119,27 @@ SOCKET listen_on_port(int port, std::string &err) {
return sock; return sock;
} }
SocketVector accept_on_socket(SOCKET listen_socket) {
SocketVector result;
while (true) {
SOCKET chsock = accept(listen_socket, nullptr, nullptr);
if (chsock >= 0) {
result.push_back(chsock);
} else {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// Normal completion - we're out of incoming sockets.
return result;
} else if (errno == ECONNABORTED) {
// The remote disconnected before we had a chance to accept.
// Just pretend it never happened.
} else {
// Unexpected error.
assert(false);
}
}
}
}
class MonoClock { class MonoClock {
private: private:
struct timespec base_; struct timespec base_;
@@ -151,23 +174,21 @@ public:
CHAN_OPEN, CHAN_OPEN,
}; };
struct ChanInfo { struct ChanInfo {
int chid;
ChanState state; ChanState state;
SOCKET socket; SOCKET socket;
}; };
static const int MAX_CHAN = DrivenEngine::MAX_CHAN;
DrivenEngine *driven_; DrivenEngine *driven_;
ChanInfo channels_[MAX_CHAN]; std::vector<ChanInfo> chans_;
bool any_inactive_;
bool short_sleep_; bool short_sleep_;
std::map<int, SOCKET> listen_sockets_; std::map<int, SOCKET> listen_sockets_;
std::unique_ptr<char[]> chbuf; std::unique_ptr<char[]> chbuf;
void init(DrivenEngine *de) { void init(DrivenEngine *de) {
driven_ = de; driven_ = de;
for (int i = 0; i < MAX_CHAN; i++) { any_inactive_ = false;
channels_[i].state = CHAN_INACTIVE;
channels_[i].socket = INVALID_SOCKET;
}
short_sleep_ = false; short_sleep_ = false;
chbuf.reset(new char[65536]); chbuf.reset(new char[65536]);
} }
@@ -193,39 +214,62 @@ public:
} }
} }
void handle_new_closed_sockets() { void close_channel(ChanInfo &chan, const std::string &err) {
for (int chid = 1; chid < MAX_CHAN; chid++) { assert(chan.state != CHAN_INACTIVE);
if (driven_->drv_get_channel_released(chid)) { assert(close(chan.socket) == 0);
if (channels_[chid].state != CHAN_INACTIVE) { driven_->drv_notify_close(chan.chid, err);
assert(close(channels_[chid].socket) == 0); chan.state = CHAN_INACTIVE;
channels_[chid].state = CHAN_INACTIVE; chan.socket = INVALID_SOCKET;
channels_[chid].socket = INVALID_SOCKET; chan.chid = -1;
any_inactive_ = true;
short_sleep_ = true;
} }
driven_->drv_notify_close(chid, "");
void cleanup_channels() {
if (any_inactive_) {
for (int i = 0; i < int(chans_.size()); ) {
if (chans_[i].state == CHAN_INACTIVE) {
chans_[i] = chans_.back();
chans_.pop_back();
} else {
i += 1;
} }
} }
any_inactive_ = false;
}
}
void handle_released_channels() {
for (ChanInfo &chan : chans_) {
if (driven_->drv_get_channel_released(chan.chid)) {
close_channel(chan, "");
}
}
cleanup_channels();
} }
void handle_new_outgoing_sockets() { void handle_new_outgoing_sockets() {
std::set<int> chans; std::set<int> chans;
driven_->drv_get_new_outgoing(chans); driven_->drv_get_new_outgoing(chans);
for (int chid : chans) { for (int chid : chans) {
assert(channels_[chid].state == CHAN_INACTIVE);
std::string err; std::string err;
SOCKET sock = open_connection(driven_->drv_get_target(chid), err); SOCKET sock = open_connection(driven_->drv_get_target(chid), err);
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
driven_->drv_notify_close(chid, err); driven_->drv_notify_close(chid, err);
short_sleep_ = true; short_sleep_ = true;
} else { } else {
channels_[chid].state = CHAN_CONNECTING; ChanInfo newchan;
channels_[chid].socket = sock; newchan.chid = chid;
newchan.state = CHAN_CONNECTING;
newchan.socket = sock;
chans_.push_back(newchan);
} }
} }
} }
void handle_console_output() { void handle_console_output() {
int nbytes; const char *bytes;
while (true) { while (true) {
int nbytes; const char *bytes;
driven_->drv_peek_outgoing(0, &nbytes, &bytes); driven_->drv_peek_outgoing(0, &nbytes, &bytes);
if (nbytes == 0) break; if (nbytes == 0) break;
int nwrote = write(1, bytes, nbytes); int nwrote = write(1, bytes, nbytes);
@@ -235,51 +279,25 @@ public:
} }
void handle_console_input() { void handle_console_input() {
char buffer[32]; char buffer[256];
while (true) { while (true) {
int nread = read(0, buffer, 32); int nread = read(0, buffer, 256);
if (nread == 0) break; if (nread == 0) break;
assert(nread > 0); assert(nread > 0);
driven_->drv_recv_incoming(0, nread, buffer); driven_->drv_recv_incoming(0, nread, buffer);
} }
} }
void handle_clock() {
driven_->drv_set_clock(monoclock.get());
}
void close_channel(int chid, const std::string err) {
assert(channels_[chid].state != CHAN_INACTIVE);
assert(close(channels_[chid].socket) == 0);
driven_->drv_notify_close(chid, err);
channels_[chid].state = CHAN_INACTIVE;
channels_[chid].socket = INVALID_SOCKET;
short_sleep_ = true;
}
void accept_connections(int port, SOCKET sock) { void accept_connections(int port, SOCKET sock) {
while (true) { SocketVector sockets = accept_on_socket(sock);
SOCKET chsock = accept(sock, nullptr, nullptr); for (SOCKET sock : sockets) {
if (chsock > 0) {
int chid = driven_->drv_notify_accept(port); int chid = driven_->drv_notify_accept(port);
assert(channels_[chid].state == CHAN_INACTIVE); ChanInfo newchan;
channels_[chid].state = CHAN_OPEN; newchan.chid = chid;
channels_[chid].socket = chsock; newchan.state = CHAN_OPEN;
newchan.socket = sock;
chans_.push_back(newchan);
short_sleep_ = true; short_sleep_ = true;
continue;
}
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
return;
}
if (errno == ECONNABORTED) {
// The remote disconnected before we had a chance to accept.
// Just pretend it never happened.
continue;
}
// If a listening port fails in a non-transient way,
// we don't really have any good way of handling
// that.
assert(false);
} }
} }
@@ -293,12 +311,12 @@ public:
FD_SET(p.second, &efds); FD_SET(p.second, &efds);
if (p.second > largest) largest = p.second; if (p.second > largest) largest = p.second;
} }
for (int chid = 1; chid < MAX_CHAN; chid++) { for (const ChanInfo &chan : chans_) {
SOCKET sock = channels_[chid].socket; SOCKET sock = chan.socket;
if (sock == INVALID_SOCKET) continue; if (sock == INVALID_SOCKET) continue;
FD_SET(sock, &rfds); FD_SET(sock, &rfds);
FD_SET(sock, &efds); FD_SET(sock, &efds);
if (!driven_->drv_outgoing_empty(chid)) { if (!driven_->drv_outgoing_empty(chan.chid)) {
FD_SET(sock, &wfds); FD_SET(sock, &wfds);
} }
if (sock > largest) largest = sock; if (sock > largest) largest = sock;
@@ -322,34 +340,35 @@ public:
} }
} }
for (int chid = 1; chid < MAX_CHAN; chid++) { for (ChanInfo &chan : chans_) {
ChanInfo *chan = channels_ + chid; SOCKET sock = chan.socket;
SOCKET sock = chan->socket;
if (sock == INVALID_SOCKET) continue; if (sock == INVALID_SOCKET) continue;
if (FD_ISSET(sock, &wfds)) { if (FD_ISSET(sock, &wfds)) {
chan->state = CHAN_OPEN; chan.state = CHAN_OPEN;
driven_->drv_peek_outgoing(chid, &nbytes, &bytes); driven_->drv_peek_outgoing(chan.chid, &nbytes, &bytes);
if (nbytes > 0) { if (nbytes > 0) {
int wbytes = send(sock, bytes, nbytes, 0); int wbytes = send(sock, bytes, nbytes, 0);
if (wbytes < 0) { if (wbytes < 0) {
close_channel(chid, "send failure"); close_channel(chan, "send failure");
continue; continue;
} else { } else {
driven_->drv_sent_outgoing(chid, wbytes); driven_->drv_sent_outgoing(chan.chid, wbytes);
} }
} }
} }
if (FD_ISSET(sock, &rfds) || FD_ISSET(sock, &efds)) { if (FD_ISSET(sock, &rfds) || FD_ISSET(sock, &efds)) {
// Someday, find a way to avoid this copy.
int nrecv = recv(sock, chbuf.get(), 65536, 0); int nrecv = recv(sock, chbuf.get(), 65536, 0);
if (nrecv <= 0) { if (nrecv <= 0) {
close_channel(chid, "recv failure"); close_channel(chan, "recv failure");
continue; continue;
} else { } else {
driven_->drv_recv_incoming(chid, nrecv, chbuf.get()); driven_->drv_recv_incoming(chan.chid, nrecv, chbuf.get());
short_sleep_ = true; short_sleep_ = true;
} }
} }
} }
cleanup_channels();
} }
void drive(DrivenEngine *de, int argc, char *argv[]) { void drive(DrivenEngine *de, int argc, char *argv[]) {
@@ -362,14 +381,14 @@ public:
while (!de->drv_get_stop_driver()) { while (!de->drv_get_stop_driver()) {
short_sleep_ = false; short_sleep_ = false;
handle_lua_source(); handle_lua_source();
handle_new_closed_sockets();
handle_new_outgoing_sockets();
handle_console_output(); handle_console_output();
handle_console_input(); handle_console_input();
handle_console_output(); handle_console_output();
handle_released_channels();
handle_new_outgoing_sockets();
int mstimeout = short_sleep_ ? 0 : 100; int mstimeout = short_sleep_ ? 0 : 100;
handle_socket_input_output(mstimeout); handle_socket_input_output(mstimeout);
handle_clock(); driven_->drv_set_clock(monoclock.get());
de->drv_invoke_event_update(); de->drv_invoke_event_update();
} }
DrivenEngine::set(nullptr); DrivenEngine::set(nullptr);