Code for logging and replay (doesn't work because of nondet in lua)

This commit is contained in:
2022-03-11 18:35:51 -05:00
parent cfeeb2eaf3
commit 377f913aa7
12 changed files with 698 additions and 79 deletions

View File

@@ -2,9 +2,6 @@
#define CHBUF_SIZE (256*1024)
#define POLLVEC_SIZE (DrivenEngine::MAX_CHAN+1)
static std::unique_ptr<char[]> chbuf;
static std::unique_ptr<struct pollfd[]> pollvec;
static MonoClock monoclock;
namespace util {
@@ -13,11 +10,6 @@ namespace util {
}
}
static void allocate_buffers() {
chbuf.reset(new char[CHBUF_SIZE]);
pollvec.reset(new struct pollfd[POLLVEC_SIZE]);
}
static void if_error_print_and_exit(const std::string &str) {
if (!str.empty()) {
std::cerr << std::endl << "error: " << str << std::endl;
@@ -120,10 +112,12 @@ public:
int last_write_nbytes;
};
UniqueDrivenEngine driven_;
std::vector<ChanInfo> chans_;
std::map<int, SOCKET> listen_sockets_;
bool read_console_recently_;
std::unique_ptr<char[]> chbuf_;
std::unique_ptr<struct pollfd[]> pollvec_;
drv::ReplayRecorder recorder_;
SSL_CTX *ssl_ctx_with_root_certs_;
SSL_CTX *ssl_ctx_with_server_certs_;
@@ -131,7 +125,7 @@ public:
void handle_listen_ports() {
const auto &listenports = driven_->drv_get_listen_ports();
const auto &listenports = recorder_.drv_get_listen_ports();
for (int port : listenports) {
if (listen_sockets_.find(port) == listen_sockets_.end()) {
std::string err;
@@ -144,17 +138,17 @@ public:
}
void handle_lua_source() {
if (driven_->drv_get_rescan_lua_source()) {
if (recorder_.drv_get_rescan_lua_source()) {
std::string err;
std::string_view ctrl = read_file("lua/control.lst", chbuf.get(), CHBUF_SIZE, err);
std::string_view ctrl = read_file("lua/control.lst", chbuf_.get(), CHBUF_SIZE, err);
if_error_print_and_exit(err);
std::vector<std::string> names = drv::parse_control_lst(ctrl);
driven_->drv_clear_lua_source();
recorder_.drv_clear_lua_source();
for (const std::string &str : names) {
std::string lfn = std::string("lua/") + str;
std::string_view data = read_file(lfn.c_str(), chbuf.get(), CHBUF_SIZE, err);
std::string_view data = read_file(lfn.c_str(), chbuf_.get(), CHBUF_SIZE, err);
if_error_print_and_exit(err);
driven_->drv_add_lua_source(str, data);
recorder_.drv_add_lua_source(str, data);
}
}
}
@@ -172,7 +166,7 @@ public:
assert(socket_close(chan.socket) == 0);
chan.socket = INVALID_SOCKET;
// Close everything else.
driven_->drv_notify_close(chan.chid, err);
recorder_.drv_notify_close(chan.chid, err);
chan.state = CHAN_INACTIVE;
chan.chid = -1;
chan.nbytes = 0;
@@ -199,11 +193,11 @@ public:
void handle_console_output() {
while (true) {
std::string_view s = driven_->drv_peek_outgoing(0);
std::string_view s = recorder_.drv_peek_outgoing(0);
if (s.size() == 0) break;
int nwrote = console_write(s.data(), s.size());
if (nwrote <= 0) break;
driven_->drv_sent_outgoing(0, nwrote);
recorder_.drv_sent_outgoing(0, nwrote);
}
}
@@ -214,7 +208,7 @@ public:
int nread = console_read(buffer, 256);
if (nread <= 0) break;
read_console_recently_ = true;
driven_->drv_recv_incoming(0, std::string_view(buffer, nread));
recorder_.drv_recv_incoming(0, std::string_view(buffer, nread));
}
}
@@ -240,19 +234,19 @@ public:
}
void handle_new_outgoing_sockets() {
const auto &chans = driven_->drv_get_new_outgoing();
const auto &chans = recorder_.drv_get_new_outgoing();
for (int chid : chans) {
std::string err;
SOCKET sock = open_connection(driven_->drv_get_target(chid), err);
SOCKET sock = open_connection(recorder_.drv_get_target(chid), err);
if (sock == INVALID_SOCKET) {
driven_->drv_notify_close(chid, err);
recorder_.drv_notify_close(chid, err);
} else {
//std::cerr << "Opening channel " << chid << std::endl;
make_channel(sock, chid, ssl_ctx_with_no_certs_, CHAN_SSL_CONNECTING);
}
}
if (!chans.empty()) {
driven_->drv_clear_new_outgoing();
recorder_.drv_clear_new_outgoing();
}
}
@@ -261,7 +255,7 @@ public:
SOCKET socket = accept_on_socket(sock, err);
if_error_print_and_exit(err);
if (socket != INVALID_SOCKET) {
int chid = driven_->drv_notify_accept(port);
int chid = recorder_.drv_notify_accept(port);
// std::cerr << "Accepted channel " << chid << std::endl;
make_channel(socket, chid, ssl_ctx_with_server_certs_, CHAN_SSL_ACCEPTING);
}
@@ -278,7 +272,7 @@ public:
}
// Try to write plaintext to the channel.
std::string_view s = driven_->drv_peek_outgoing(chan.chid);
std::string_view s = recorder_.drv_peek_outgoing(chan.chid);
if (s.size() > 0) {
int sbytes = s.size();
if (sbytes > 65536) sbytes = 65536;
@@ -286,17 +280,17 @@ public:
if (wbytes < 0) {
close_channel(chan, err);
} else {
driven_->drv_sent_outgoing(chan.chid, wbytes);
recorder_.drv_sent_outgoing(chan.chid, wbytes);
}
}
// Try to read plaintext from the channel.
// Someday, find a way to avoid this copy.
int nrecv = socket_recv(chan.socket, chbuf.get(), 65536, err);
int nrecv = socket_recv(chan.socket, chbuf_.get(), 65536, err);
if (nrecv < 0) {
close_channel(chan, err);
} else {
driven_->drv_recv_incoming(chan.chid, std::string_view(chbuf.get(), nrecv));
recorder_.drv_recv_incoming(chan.chid, std::string_view(chbuf_.get(), nrecv));
}
// Update the ready-flags for next time.
@@ -344,9 +338,9 @@ public:
void advance_ssl_readwrite(ChanInfo &chan) {
// std::cerr << "In advance_ssl_readwrite" << std::endl;
// Try to read data.
int read_result = SSL_read(chan.ssl, chbuf.get(), 65536);
int read_result = SSL_read(chan.ssl, chbuf_.get(), 65536);
if (read_result > 0) {
driven_->drv_recv_incoming(chan.chid, std::string_view(chbuf.get(), read_result));
recorder_.drv_recv_incoming(chan.chid, std::string_view(chbuf_.get(), read_result));
chan.ready_now = true;
} else {
process_ssl_error(chan, read_result);
@@ -365,7 +359,7 @@ public:
if (wbytes > 0) {
int write_result = SSL_write(chan.ssl, chan.bytes, wbytes);
if (write_result > 0) {
driven_->drv_sent_outgoing(chan.chid, write_result);
recorder_.drv_sent_outgoing(chan.chid, write_result);
chan.last_write_nbytes = 0;
chan.ready_on_outgoing = true;
} else {
@@ -407,12 +401,12 @@ public:
// Peek output buffers and determine channel release flags.
for (ChanInfo &chan : chans_) {
std::string_view s = driven_->drv_peek_outgoing(chan.chid);
std::string_view s = recorder_.drv_peek_outgoing(chan.chid);
chan.nbytes = s.size();
chan.bytes = s.data();
chan.just_released = false;
if ((chan.nbytes == 0)&&(!chan.released)) {
chan.released = driven_->drv_get_channel_released(chan.chid);
chan.released = recorder_.drv_get_channel_released(chan.chid);
chan.just_released = chan.released;
}
}
@@ -420,13 +414,13 @@ public:
// Construct the struct pollfd vector.
int pollsize = 0;
for (const auto &p : listen_sockets_) {
struct pollfd &pfd = pollvec[pollsize++];
struct pollfd &pfd = pollvec_[pollsize++];
pfd.fd = p.second;
pfd.events = POLLIN;
pfd.revents = 0;
}
for (const ChanInfo &chan : chans_) {
struct pollfd &pfd = pollvec[pollsize++];
struct pollfd &pfd = pollvec_[pollsize++];
assert(chan.socket != INVALID_SOCKET);
pfd.fd = chan.socket;
pfd.events = 0;
@@ -440,13 +434,13 @@ public:
}
// Do the poll.
socket_poll(pollvec.get(), pollsize, mstimeout, err);
socket_poll(pollvec_.get(), pollsize, mstimeout, err);
if_error_print_and_exit(err);
// Check listening sockets.
int index = 0;
for (auto &p : listen_sockets_) {
struct pollfd &pfd = pollvec[index++];
struct pollfd &pfd = pollvec_[index++];
if (pfd.revents & (POLLIN | POLLERR)) {
accept_connection(p.first, p.second);
}
@@ -454,7 +448,7 @@ public:
// Advance channels where possible.
for (ChanInfo &chan : chans_) {
struct pollfd &pfd = pollvec[index++];
struct pollfd &pfd = pollvec_[index++];
bool pollin = ((pfd.revents & POLLIN) != 0);
bool pollout = ((pfd.revents & POLLOUT) != 0);
bool pollerr = ((pfd.revents & (POLLERR | POLLHUP)) != 0);
@@ -476,46 +470,94 @@ public:
cleanup_channels();
}
void drive(int argc, char *argv[]) {
if (argc < 2) {
DrivenEngine::print_usage(std::cerr, argv[0]);
int replay_logfile(const char *fn) {
drv::ReplayPlayer player;
player.open_logfile(fn);
while (true) {
drv::ReplayPlayer::Error err = player.step();
if (err != drv::ReplayPlayer::ERR_NONE) {
player.print_error(std::cerr);
return (err != drv::ReplayPlayer::ERR_LOGFILE_EOF) ? 1 : 0;
}
}
}
int drive(int argc, char *argv[]) {
// Remove the program name from argv.
if (argc < 1) {
DrivenEngine::print_usage(std::cerr, "<unknown>");
exit(1);
}
driven_ = DrivenEngine::make(argv[1]);
if (driven_ == nullptr) {
DrivenEngine::print_usage(std::cerr, argv[0]);
exit(1);
std::string program = argv[0];
argc -= 1; argv += 1;
// If argv contains "replay <filename>", do a replay,
// and then skip everything else.
if ((argc >= 1) && (strcmp(argv[0], "replay") == 0)) {
if (argc != 2) {
std::cerr << "usage: " << program << " replay <filename>" << std::endl;
exit(1);
}
return replay_logfile(argv[1]);
}
// If argv contains "record <filename>", start recording,
// and remove the "record <filename>" from argv.
if ((argc >= 1) && (strcmp(argv[0], "record") == 0)) {
if (argc < 2) {
DrivenEngine::print_usage(std::cerr, program);
return 1;
}
bool ok = recorder_.open_logfile(argv[1]);
if (!ok) {
std::cerr << "Could not open logfile: " << argv[1] << std::endl;
return 1;
}
argc -= 2; argv += 2;
}
// Create the engine.
if (argc < 1) {
DrivenEngine::print_usage(std::cerr, program);
return 1;
}
bool engine_made = recorder_.create_engine(argv[0]);
if (!engine_made) {
DrivenEngine::print_usage(std::cerr, program);
return 1;
}
DrivenEngine::set(driven_.get());
read_console_recently_ = false;
chbuf_.reset(new char[CHBUF_SIZE]);
pollvec_.reset(new struct pollfd[POLLVEC_SIZE]);
ssl_ctx_with_root_certs_ = new_ssl_context(false, true, "");
ssl_ctx_with_server_certs_ = new_ssl_context(true, false, "");
ssl_ctx_with_no_certs_ = new_ssl_context(false, false, "");
if (ssl_ctx_use_certificate_str(ssl_ctx_with_server_certs_, dummycert::certificate) <= 0) {
ERR_print_errors_fp(stderr);
exit(1);
return 1;
}
if (ssl_ctx_use_privatekey_str(ssl_ctx_with_server_certs_, dummycert::privatekey) <= 0 ) {
ERR_print_errors_fp(stderr);
exit(1);
return 1;
}
handle_lua_source();
driven_->drv_invoke_event_init(argc, argv);
recorder_.drv_invoke_event_init(argc, argv);
handle_listen_ports();
while (!driven_->drv_get_stop_driver()) {
while (!recorder_.drv_get_stop_driver()) {
handle_lua_source();
handle_console_output();
handle_new_outgoing_sockets();
handle_socket_input_output();
handle_console_input();
handle_console_output();
driven_->drv_invoke_event_update(monoclock.get());
recorder_.drv_invoke_event_update(monoclock.get());
}
for (ChanInfo &chan : chans_) {
@@ -525,6 +567,7 @@ public:
SSL_CTX_free(ssl_ctx_with_root_certs_);
SSL_CTX_free(ssl_ctx_with_server_certs_);
DrivenEngine::set(nullptr);
return 0;
}
};