#include "driver.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using SOCKET=int; const int INVALID_SOCKET = -1; struct termios orig_termios; void set_nonblocking(int fd) { int flags = fcntl(fd, F_GETFL, 0); assert(flags != -1); int status = fcntl(fd, F_SETFL, flags | O_NONBLOCK); assert(status != -1); } static void disableRawMode() { tcsetattr(0, TCSAFLUSH, &orig_termios); } static void enableRawMode() { int status = tcgetattr(0, &orig_termios); assert(status >= 0); atexit(disableRawMode); struct termios raw = orig_termios; raw.c_iflag &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON); raw.c_lflag &= ~(ECHO | ICANON); raw.c_oflag |= OPOST; raw.c_cc[VMIN] = 0; raw.c_cc[VTIME] = 0; status = tcsetattr(0, TCSAFLUSH, &raw); assert(status >= 0); } SOCKET open_connection(const std::string &target, std::string &err) { struct addrinfo *addrs = nullptr; struct addrinfo *goodaddr = nullptr; struct addrinfo hints; SOCKET sock = INVALID_SOCKET; std::string host, port; char errbuf[1024]; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_NUMERICSERV; err = ""; util::split_host_port(target, host, port); int status = getaddrinfo(host.c_str(), port.c_str(), &hints, &addrs); if (status != 0) { err = gai_strerror(status); goto error; } if (addrs == nullptr) { err = "no such host found"; goto error; } goodaddr = addrs; assert(goodaddr->ai_family == AF_INET); assert(goodaddr->ai_socktype == SOCK_STREAM); assert(goodaddr->ai_protocol == IPPROTO_TCP); sock = socket(goodaddr->ai_family, goodaddr->ai_socktype, goodaddr->ai_protocol); assert(sock > 0); set_nonblocking(sock); status = connect(sock, goodaddr->ai_addr, goodaddr->ai_addrlen); if ((status != 0) && (errno != EINPROGRESS)) { goto error_errno; } freeaddrinfo(addrs); return sock; error_errno: err = strerror_r(errno, errbuf, 1024); error: if (sock != INVALID_SOCKET) close(sock); if (addrs != nullptr) freeaddrinfo(addrs); return INVALID_SOCKET; } SOCKET listen_on_port(int port, std::string &err) { int status; err = ""; SOCKET sock = socket(AF_INET, SOCK_STREAM, 0); assert(sock > 0); int enable = 1; status = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)); assert(status == 0); struct sockaddr_in server; server.sin_family = AF_INET; server.sin_addr.s_addr = INADDR_ANY; server.sin_port = htons(port); status = bind(sock, (struct sockaddr *)&server, sizeof(server)); assert(status == 0); status = listen(sock, 10); assert(status == 0); set_nonblocking(sock); return sock; } class Driver { public: enum ChanState { CHAN_INACTIVE, CHAN_CONNECTING, CHAN_OPEN, }; struct ChanInfo { ChanState state; SOCKET socket; }; static const int MAX_CHAN = DrivenEngine::MAX_CHAN; DrivenEngine *driven_; ChanInfo channels_[MAX_CHAN]; bool short_sleep_; std::map listen_sockets_; std::unique_ptr chbuf; int64_t basetime_; int64_t get_now() { struct timeval tv; gettimeofday(&tv, nullptr); int64_t tv_sec = tv.tv_sec; int64_t tv_usec = tv.tv_usec; return tv_sec * 1000000 + tv_usec; } void init(DrivenEngine *de) { driven_ = de; for (int i = 0; i < MAX_CHAN; i++) { channels_[i].state = CHAN_INACTIVE; channels_[i].socket = INVALID_SOCKET; } short_sleep_ = false; chbuf.reset(new char[65536]); } void handle_listen_ports() { std::set listenports; driven_->drv_get_listen_ports(listenports); for (int port : listenports) { if (listen_sockets_.find(port) == listen_sockets_.end()) { std::string err; SOCKET sock = listen_on_port(port, err); if (sock != INVALID_SOCKET) { listen_sockets_[port] = sock; } } } } void handle_lua_source() { if (driven_->drv_get_rescan_lua_source()) { driven_->drv_set_lua_source(util::read_lua_source("lua")); short_sleep_ = true; } } void handle_new_closed_sockets() { for (int chid = 1; chid < MAX_CHAN; chid++) { if (driven_->drv_get_channel_released(chid)) { if (channels_[chid].state != CHAN_INACTIVE) { assert(close(channels_[chid].socket) == 0); channels_[chid].state = CHAN_INACTIVE; channels_[chid].socket = INVALID_SOCKET; } driven_->drv_notify_close(chid, ""); } } } void handle_new_outgoing_sockets() { std::set chans; driven_->drv_get_new_outgoing(chans); for (int chid : chans) { assert(channels_[chid].state == CHAN_INACTIVE); std::string err; SOCKET sock = open_connection(driven_->drv_get_target(chid), err); if (sock == INVALID_SOCKET) { driven_->drv_notify_close(chid, err); short_sleep_ = true; } else { channels_[chid].state = CHAN_CONNECTING; channels_[chid].socket = sock; } } } void handle_console_output() { int nbytes; const char *bytes; while (true) { driven_->drv_peek_outgoing(0, &nbytes, &bytes); if (nbytes == 0) break; int nwrote = write(1, bytes, nbytes); assert(nwrote > 0); driven_->drv_sent_outgoing(0, nwrote); } } void handle_console_input() { char buffer[32]; while (true) { int nread = read(0, buffer, 32); if (nread == 0) break; assert(nread > 0); driven_->drv_recv_incoming(0, nread, buffer); } } void handle_clock() { int64_t now = get_now() - basetime_; driven_->drv_set_clock(double(now) / 1000000.0); } void close_channel(int chid, const std::string err) { assert(channels_[chid].state != CHAN_INACTIVE); assert(close(channels_[chid].socket) == 0); driven_->drv_notify_close(chid, err); channels_[chid].state = CHAN_INACTIVE; channels_[chid].socket = INVALID_SOCKET; short_sleep_ = true; } void accept_connections(int port, SOCKET sock) { while (true) { SOCKET chsock = accept(sock, nullptr, nullptr); if (chsock > 0) { int chid = driven_->drv_notify_accept(port); assert(channels_[chid].state == CHAN_INACTIVE); channels_[chid].state = CHAN_OPEN; channels_[chid].socket = chsock; short_sleep_ = true; continue; } if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { return; } if (errno == ECONNABORTED) { // The remote disconnected before we had a chance to accept. // Just pretend it never happened. continue; } // If a listening port fails in a non-transient way, // we don't really have any good way of handling // that. assert(false); } } int calc_select_sets(fd_set &rfds, fd_set &wfds, fd_set &efds) const { FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); int largest = -1; for (const auto &p : listen_sockets_) { FD_SET(p.second, &rfds); FD_SET(p.second, &efds); if (p.second > largest) largest = p.second; } for (int chid = 1; chid < MAX_CHAN; chid++) { SOCKET sock = channels_[chid].socket; if (sock == INVALID_SOCKET) continue; FD_SET(sock, &rfds); FD_SET(sock, &efds); if (!driven_->drv_outgoing_empty(chid)) { FD_SET(sock, &wfds); } if (sock > largest) largest = sock; } return largest + 1; } void handle_socket_input_output(int mstimeout) { fd_set rfds, wfds, efds; int nbytes; const char *bytes; int nfds = calc_select_sets(rfds, wfds, efds); struct timeval timeout; timeout.tv_sec = mstimeout / 1000; timeout.tv_usec = (mstimeout - (timeout.tv_sec*1000)) * 1000; int status = select(nfds, &rfds, &wfds, &efds, &timeout); assert(status >= 0); for (auto &p : listen_sockets_) { if (FD_ISSET(p.second, &rfds) || FD_ISSET(p.second, &efds)) { accept_connections(p.first, p.second); } } for (int chid = 1; chid < MAX_CHAN; chid++) { ChanInfo *chan = channels_ + chid; SOCKET sock = chan->socket; if (sock == INVALID_SOCKET) continue; if (FD_ISSET(sock, &wfds)) { chan->state = CHAN_OPEN; driven_->drv_peek_outgoing(chid, &nbytes, &bytes); if (nbytes > 0) { int wbytes = send(sock, bytes, nbytes, 0); if (wbytes < 0) { close_channel(chid, "send failure"); continue; } else { driven_->drv_sent_outgoing(chid, wbytes); } } } if (FD_ISSET(sock, &rfds) || FD_ISSET(sock, &efds)) { int nrecv = recv(sock, chbuf.get(), 65536, 0); if (nrecv <= 0) { close_channel(chid, "recv failure"); continue; } else { driven_->drv_recv_incoming(chid, nrecv, chbuf.get()); short_sleep_ = true; } } } } void drive(DrivenEngine *de, int argc, char *argv[]) { enableRawMode(); init(de); DrivenEngine::set(de); basetime_ = get_now(); driven_->drv_set_lua_source(util::read_lua_source("lua")); driven_->drv_invoke_event_init(argc, argv); handle_listen_ports(); while (!de->drv_get_stop_driver()) { short_sleep_ = false; handle_lua_source(); handle_new_closed_sockets(); handle_new_outgoing_sockets(); handle_console_output(); handle_console_input(); handle_console_output(); int mstimeout = short_sleep_ ? 0 : 100; handle_socket_input_output(mstimeout); handle_clock(); de->drv_invoke_event_update(); } DrivenEngine::set(nullptr); } }; void driver_drive(DrivenEngine *de, int argc, char *argv[]) { Driver driver; driver.drive(de, argc, argv); }