diff --git a/luprex/core/cpp/drivenengine.cpp b/luprex/core/cpp/drivenengine.cpp index 1fdde0e4..1aa30f98 100644 --- a/luprex/core/cpp/drivenengine.cpp +++ b/luprex/core/cpp/drivenengine.cpp @@ -16,17 +16,18 @@ DrivenEngineReg::DrivenEngineReg(const char *n, DrivenEngineMaker fn) { All = this; } -void DrivenEngine::print_usage(std::ostream &strm, const char *progname) { +void DrivenEngine::print_usage(std::ostream &strm, std::string_view progname) { strm << "Usage: " << progname << " " << std::endl; for (auto reg = DrivenEngineReg::All; reg != nullptr; reg=reg->next) { strm << " Mode can be: " << reg->name << std::endl; } } -UniqueDrivenEngine DrivenEngine::make(const char *kind) { +UniqueDrivenEngine DrivenEngine::make(std::string_view kind) { for (auto reg = DrivenEngineReg::All; reg != nullptr; reg=reg->next) { - if (strcmp(reg->name, kind) == 0) { - return reg->maker(); + if (kind == std::string_view(reg->name)) { + UniqueDrivenEngine result = reg->maker(); + return result; } } return nullptr; diff --git a/luprex/core/cpp/drivenengine.hpp b/luprex/core/cpp/drivenengine.hpp index 9fd72361..5942913c 100644 --- a/luprex/core/cpp/drivenengine.hpp +++ b/luprex/core/cpp/drivenengine.hpp @@ -198,9 +198,9 @@ public: // ////////////////////////////////////////////////////////////// - static UniqueDrivenEngine make(const char *name); + static UniqueDrivenEngine make(std::string_view name); - static void print_usage(std::ostream &strm, const char *progname); + static void print_usage(std::ostream &strm, std::string_view progname); ////////////////////////////////////////////////////////////// // diff --git a/luprex/core/cpp/driver-common.cpp b/luprex/core/cpp/driver-common.cpp index c1160926..f642e851 100644 --- a/luprex/core/cpp/driver-common.cpp +++ b/luprex/core/cpp/driver-common.cpp @@ -2,9 +2,6 @@ #define CHBUF_SIZE (256*1024) #define POLLVEC_SIZE (DrivenEngine::MAX_CHAN+1) -static std::unique_ptr chbuf; -static std::unique_ptr pollvec; - static MonoClock monoclock; namespace util { @@ -13,11 +10,6 @@ namespace util { } } -static void allocate_buffers() { - chbuf.reset(new char[CHBUF_SIZE]); - pollvec.reset(new struct pollfd[POLLVEC_SIZE]); -} - static void if_error_print_and_exit(const std::string &str) { if (!str.empty()) { std::cerr << std::endl << "error: " << str << std::endl; @@ -120,10 +112,12 @@ public: int last_write_nbytes; }; - UniqueDrivenEngine driven_; std::vector chans_; std::map listen_sockets_; bool read_console_recently_; + std::unique_ptr chbuf_; + std::unique_ptr pollvec_; + drv::ReplayRecorder recorder_; SSL_CTX *ssl_ctx_with_root_certs_; SSL_CTX *ssl_ctx_with_server_certs_; @@ -131,7 +125,7 @@ public: void handle_listen_ports() { - const auto &listenports = driven_->drv_get_listen_ports(); + const auto &listenports = recorder_.drv_get_listen_ports(); for (int port : listenports) { if (listen_sockets_.find(port) == listen_sockets_.end()) { std::string err; @@ -144,17 +138,17 @@ public: } void handle_lua_source() { - if (driven_->drv_get_rescan_lua_source()) { + if (recorder_.drv_get_rescan_lua_source()) { std::string err; - std::string_view ctrl = read_file("lua/control.lst", chbuf.get(), CHBUF_SIZE, err); + std::string_view ctrl = read_file("lua/control.lst", chbuf_.get(), CHBUF_SIZE, err); if_error_print_and_exit(err); std::vector names = drv::parse_control_lst(ctrl); - driven_->drv_clear_lua_source(); + recorder_.drv_clear_lua_source(); for (const std::string &str : names) { std::string lfn = std::string("lua/") + str; - std::string_view data = read_file(lfn.c_str(), chbuf.get(), CHBUF_SIZE, err); + std::string_view data = read_file(lfn.c_str(), chbuf_.get(), CHBUF_SIZE, err); if_error_print_and_exit(err); - driven_->drv_add_lua_source(str, data); + recorder_.drv_add_lua_source(str, data); } } } @@ -172,7 +166,7 @@ public: assert(socket_close(chan.socket) == 0); chan.socket = INVALID_SOCKET; // Close everything else. - driven_->drv_notify_close(chan.chid, err); + recorder_.drv_notify_close(chan.chid, err); chan.state = CHAN_INACTIVE; chan.chid = -1; chan.nbytes = 0; @@ -199,11 +193,11 @@ public: void handle_console_output() { while (true) { - std::string_view s = driven_->drv_peek_outgoing(0); + std::string_view s = recorder_.drv_peek_outgoing(0); if (s.size() == 0) break; int nwrote = console_write(s.data(), s.size()); if (nwrote <= 0) break; - driven_->drv_sent_outgoing(0, nwrote); + recorder_.drv_sent_outgoing(0, nwrote); } } @@ -214,7 +208,7 @@ public: int nread = console_read(buffer, 256); if (nread <= 0) break; read_console_recently_ = true; - driven_->drv_recv_incoming(0, std::string_view(buffer, nread)); + recorder_.drv_recv_incoming(0, std::string_view(buffer, nread)); } } @@ -240,19 +234,19 @@ public: } void handle_new_outgoing_sockets() { - const auto &chans = driven_->drv_get_new_outgoing(); + const auto &chans = recorder_.drv_get_new_outgoing(); for (int chid : chans) { std::string err; - SOCKET sock = open_connection(driven_->drv_get_target(chid), err); + SOCKET sock = open_connection(recorder_.drv_get_target(chid), err); if (sock == INVALID_SOCKET) { - driven_->drv_notify_close(chid, err); + recorder_.drv_notify_close(chid, err); } else { //std::cerr << "Opening channel " << chid << std::endl; make_channel(sock, chid, ssl_ctx_with_no_certs_, CHAN_SSL_CONNECTING); } } if (!chans.empty()) { - driven_->drv_clear_new_outgoing(); + recorder_.drv_clear_new_outgoing(); } } @@ -261,7 +255,7 @@ public: SOCKET socket = accept_on_socket(sock, err); if_error_print_and_exit(err); if (socket != INVALID_SOCKET) { - int chid = driven_->drv_notify_accept(port); + int chid = recorder_.drv_notify_accept(port); // std::cerr << "Accepted channel " << chid << std::endl; make_channel(socket, chid, ssl_ctx_with_server_certs_, CHAN_SSL_ACCEPTING); } @@ -278,7 +272,7 @@ public: } // Try to write plaintext to the channel. - std::string_view s = driven_->drv_peek_outgoing(chan.chid); + std::string_view s = recorder_.drv_peek_outgoing(chan.chid); if (s.size() > 0) { int sbytes = s.size(); if (sbytes > 65536) sbytes = 65536; @@ -286,17 +280,17 @@ public: if (wbytes < 0) { close_channel(chan, err); } else { - driven_->drv_sent_outgoing(chan.chid, wbytes); + recorder_.drv_sent_outgoing(chan.chid, wbytes); } } // Try to read plaintext from the channel. // Someday, find a way to avoid this copy. - int nrecv = socket_recv(chan.socket, chbuf.get(), 65536, err); + int nrecv = socket_recv(chan.socket, chbuf_.get(), 65536, err); if (nrecv < 0) { close_channel(chan, err); } else { - driven_->drv_recv_incoming(chan.chid, std::string_view(chbuf.get(), nrecv)); + recorder_.drv_recv_incoming(chan.chid, std::string_view(chbuf_.get(), nrecv)); } // Update the ready-flags for next time. @@ -344,9 +338,9 @@ public: void advance_ssl_readwrite(ChanInfo &chan) { // std::cerr << "In advance_ssl_readwrite" << std::endl; // Try to read data. - int read_result = SSL_read(chan.ssl, chbuf.get(), 65536); + int read_result = SSL_read(chan.ssl, chbuf_.get(), 65536); if (read_result > 0) { - driven_->drv_recv_incoming(chan.chid, std::string_view(chbuf.get(), read_result)); + recorder_.drv_recv_incoming(chan.chid, std::string_view(chbuf_.get(), read_result)); chan.ready_now = true; } else { process_ssl_error(chan, read_result); @@ -365,7 +359,7 @@ public: if (wbytes > 0) { int write_result = SSL_write(chan.ssl, chan.bytes, wbytes); if (write_result > 0) { - driven_->drv_sent_outgoing(chan.chid, write_result); + recorder_.drv_sent_outgoing(chan.chid, write_result); chan.last_write_nbytes = 0; chan.ready_on_outgoing = true; } else { @@ -407,12 +401,12 @@ public: // Peek output buffers and determine channel release flags. for (ChanInfo &chan : chans_) { - std::string_view s = driven_->drv_peek_outgoing(chan.chid); + std::string_view s = recorder_.drv_peek_outgoing(chan.chid); chan.nbytes = s.size(); chan.bytes = s.data(); chan.just_released = false; if ((chan.nbytes == 0)&&(!chan.released)) { - chan.released = driven_->drv_get_channel_released(chan.chid); + chan.released = recorder_.drv_get_channel_released(chan.chid); chan.just_released = chan.released; } } @@ -420,13 +414,13 @@ public: // Construct the struct pollfd vector. int pollsize = 0; for (const auto &p : listen_sockets_) { - struct pollfd &pfd = pollvec[pollsize++]; + struct pollfd &pfd = pollvec_[pollsize++]; pfd.fd = p.second; pfd.events = POLLIN; pfd.revents = 0; } for (const ChanInfo &chan : chans_) { - struct pollfd &pfd = pollvec[pollsize++]; + struct pollfd &pfd = pollvec_[pollsize++]; assert(chan.socket != INVALID_SOCKET); pfd.fd = chan.socket; pfd.events = 0; @@ -440,13 +434,13 @@ public: } // Do the poll. - socket_poll(pollvec.get(), pollsize, mstimeout, err); + socket_poll(pollvec_.get(), pollsize, mstimeout, err); if_error_print_and_exit(err); // Check listening sockets. int index = 0; for (auto &p : listen_sockets_) { - struct pollfd &pfd = pollvec[index++]; + struct pollfd &pfd = pollvec_[index++]; if (pfd.revents & (POLLIN | POLLERR)) { accept_connection(p.first, p.second); } @@ -454,7 +448,7 @@ public: // Advance channels where possible. for (ChanInfo &chan : chans_) { - struct pollfd &pfd = pollvec[index++]; + struct pollfd &pfd = pollvec_[index++]; bool pollin = ((pfd.revents & POLLIN) != 0); bool pollout = ((pfd.revents & POLLOUT) != 0); bool pollerr = ((pfd.revents & (POLLERR | POLLHUP)) != 0); @@ -476,46 +470,94 @@ public: cleanup_channels(); } - void drive(int argc, char *argv[]) { - if (argc < 2) { - DrivenEngine::print_usage(std::cerr, argv[0]); + int replay_logfile(const char *fn) { + drv::ReplayPlayer player; + player.open_logfile(fn); + while (true) { + drv::ReplayPlayer::Error err = player.step(); + if (err != drv::ReplayPlayer::ERR_NONE) { + player.print_error(std::cerr); + return (err != drv::ReplayPlayer::ERR_LOGFILE_EOF) ? 1 : 0; + } + } + } + + int drive(int argc, char *argv[]) { + // Remove the program name from argv. + if (argc < 1) { + DrivenEngine::print_usage(std::cerr, ""); exit(1); } - driven_ = DrivenEngine::make(argv[1]); - if (driven_ == nullptr) { - DrivenEngine::print_usage(std::cerr, argv[0]); - exit(1); + std::string program = argv[0]; + argc -= 1; argv += 1; + + // If argv contains "replay ", do a replay, + // and then skip everything else. + if ((argc >= 1) && (strcmp(argv[0], "replay") == 0)) { + if (argc != 2) { + std::cerr << "usage: " << program << " replay " << std::endl; + exit(1); + } + return replay_logfile(argv[1]); + } + + // If argv contains "record ", start recording, + // and remove the "record " from argv. + if ((argc >= 1) && (strcmp(argv[0], "record") == 0)) { + if (argc < 2) { + DrivenEngine::print_usage(std::cerr, program); + return 1; + } + bool ok = recorder_.open_logfile(argv[1]); + if (!ok) { + std::cerr << "Could not open logfile: " << argv[1] << std::endl; + return 1; + } + argc -= 2; argv += 2; + } + + // Create the engine. + if (argc < 1) { + DrivenEngine::print_usage(std::cerr, program); + return 1; + } + bool engine_made = recorder_.create_engine(argv[0]); + if (!engine_made) { + DrivenEngine::print_usage(std::cerr, program); + return 1; } - DrivenEngine::set(driven_.get()); read_console_recently_ = false; + chbuf_.reset(new char[CHBUF_SIZE]); + pollvec_.reset(new struct pollfd[POLLVEC_SIZE]); + ssl_ctx_with_root_certs_ = new_ssl_context(false, true, ""); ssl_ctx_with_server_certs_ = new_ssl_context(true, false, ""); ssl_ctx_with_no_certs_ = new_ssl_context(false, false, ""); if (ssl_ctx_use_certificate_str(ssl_ctx_with_server_certs_, dummycert::certificate) <= 0) { ERR_print_errors_fp(stderr); - exit(1); + return 1; } if (ssl_ctx_use_privatekey_str(ssl_ctx_with_server_certs_, dummycert::privatekey) <= 0 ) { ERR_print_errors_fp(stderr); - exit(1); + return 1; } handle_lua_source(); - driven_->drv_invoke_event_init(argc, argv); + recorder_.drv_invoke_event_init(argc, argv); handle_listen_ports(); - while (!driven_->drv_get_stop_driver()) { + while (!recorder_.drv_get_stop_driver()) { handle_lua_source(); handle_console_output(); handle_new_outgoing_sockets(); handle_socket_input_output(); handle_console_input(); handle_console_output(); - driven_->drv_invoke_event_update(monoclock.get()); + recorder_.drv_invoke_event_update(monoclock.get()); } for (ChanInfo &chan : chans_) { @@ -525,6 +567,7 @@ public: SSL_CTX_free(ssl_ctx_with_root_certs_); SSL_CTX_free(ssl_ctx_with_server_certs_); DrivenEngine::set(nullptr); + return 0; } }; diff --git a/luprex/core/cpp/driver-linux.cpp b/luprex/core/cpp/driver-linux.cpp index 0e4debf2..152fab5d 100644 --- a/luprex/core/cpp/driver-linux.cpp +++ b/luprex/core/cpp/driver-linux.cpp @@ -264,11 +264,10 @@ public: int main(int argc, char **argv) { disable_randomization(argc, argv); - allocate_buffers(); enable_tty_raw(); OPENSSL_init_ssl(0, NULL); SourceDB::register_lua_builtins(); Driver driver; - driver.drive(argc, argv); + return driver.drive(argc, argv); } diff --git a/luprex/core/cpp/driver-mingw.cpp b/luprex/core/cpp/driver-mingw.cpp index 17c7d4bd..1fa476b4 100644 --- a/luprex/core/cpp/driver-mingw.cpp +++ b/luprex/core/cpp/driver-mingw.cpp @@ -269,11 +269,10 @@ public: int main(int argc, char **argv) { - allocate_buffers(); init_winsock(); OPENSSL_init_ssl(0, NULL); SourceDB::register_lua_builtins(); Driver driver; - driver.drive(argc, argv); + return driver.drive(argc, argv); } diff --git a/luprex/core/cpp/driver-util.cpp b/luprex/core/cpp/driver-util.cpp index 8bf3d8fa..dd85278e 100644 --- a/luprex/core/cpp/driver-util.cpp +++ b/luprex/core/cpp/driver-util.cpp @@ -3,11 +3,18 @@ #include "wrap-vector.hpp" #include +#include +#include +#include +#include "spookyv2.hpp" #include "driver-util.hpp" #include "luastack.hpp" #include "util.hpp" +#define RLOG_BUFSIZE (1024 * 1024) +#define MAX_ARGC 1024 + namespace drv { void split_host_port(std::string_view target, std::string &host, std::string &port) { @@ -35,6 +42,442 @@ std::vector parse_control_lst(std::string_view ctrl) { } +enum DrvAction { + CREATE_ENGINE, + DRV_CLEAR_NEW_OUTGOING, + DRV_SENT_OUTGOING, + DRV_RECV_INCOMING, + DRV_NOTIFY_CLOSE, + DRV_NOTIFY_ACCEPT, + DRV_CLEAR_LUA_SOURCE, + DRV_ADD_LUA_SOURCE, + DRV_INVOKE_EVENT_INIT, + DRV_INVOKE_EVENT_UPDATE +}; + +static const char *action_string(DrvAction act) { + switch(act) { + case CREATE_ENGINE: return "CREATE_ENGINE"; + case DRV_CLEAR_NEW_OUTGOING: return "DRV_CLEAR_NEW_OUTGOING"; + case DRV_SENT_OUTGOING: return "DRV_SENT_OUTGOING"; + case DRV_RECV_INCOMING: return "DRV_RECV_INCOMING"; + case DRV_NOTIFY_CLOSE: return "DRV_NOTIFY_CLOSE"; + case DRV_NOTIFY_ACCEPT: return "DRV_NOTIFY_ACCEPT"; + case DRV_CLEAR_LUA_SOURCE: return "DRV_CLEAR_LUA_SOURCE"; + case DRV_ADD_LUA_SOURCE: return "DRV_ADD_LUA_SOURCE"; + case DRV_INVOKE_EVENT_INIT: return "DRV_INVOKE_EVENT_INIT"; + case DRV_INVOKE_EVENT_UPDATE: return "DRV_INVOKE_EVENT_UPDATE"; + default: return "unknown"; + } +} + + +static void wlog_uint8(std::ofstream &s, uint8_t v) { + s.put((char)v); +} + +static void wlog_uint16(std::ofstream &s, uint16_t v) { + s.write((const char *)&v, 2); +} + +static void wlog_uint32(std::ofstream &s, uint32_t v) { + s.write((const char *)&v, 4); +} + +static void wlog_uint64(std::ofstream &s, uint64_t v) { + s.write((const char *)&v, 8); +} + +static void wlog_double(std::ofstream &s, double v) { + s.write((const char *)&v, 8); +} + +static void wlog_string(std::ofstream &s, std::string_view v) { + assert(v.size() < RLOG_BUFSIZE); + if (v.size() >= 255) { + wlog_uint8(s, 0xFF); + wlog_uint32(s, v.size()); + } else { + wlog_uint8(s, v.size()); + } + s.write(v.data(), v.size()); +} + +static void wlog_cmd_hash(std::ofstream &s, DrvAction act, uint32_t hash) { + std::cerr << "Logging " << action_string(act) << " " << hash << std::endl; + wlog_uint8(s, act); + wlog_uint32(s, hash); +} + +// After doing an rlog operation, you should check the stream +// for "s.good()" to find out if there was any error. +static uint8_t rlog_uint8(std::ifstream &s) { + uint8_t result; + s.read((char *)&result, 1); + if (!s.good()) return 0; + return result; +} + +static uint16_t rlog_uint16(std::ifstream &s) { + uint16_t result; + s.read((char *)&result, 2); + if (!s.good()) return 0; + return result; +} + +static uint32_t rlog_uint32(std::ifstream &s) { + uint32_t result; + s.read((char *)&result, 4); + if (!s.good()) return 0; + return result; +} + +static uint64_t rlog_uint64(std::ifstream &s) { + uint64_t result; + s.read((char *)&result, 8); + if (!s.good()) return 0; + return result; +} + +static double rlog_double(std::ifstream &s) { + double result; + s.read((char *)&result, 8); + if (!s.good()) return 0.0; + return result; +} + +std::string_view rlog_string(std::ifstream &s, char *rlog_buf) { + uint32_t len = rlog_uint8(s); + if (len == 255) { + len = rlog_uint32(s); + } + assert(len <= RLOG_BUFSIZE); + if (len > 0) s.read(rlog_buf, len); + if (!s.good()) return std::string_view(); + return std::string_view(rlog_buf, len); +} + +ReplayPlayer::ReplayPlayer() { + error_ = ERR_NONE; + buf_.reset(new char[RLOG_BUFSIZE]); +} + +void ReplayRecorder::flush() { + f_.flush(); + if (!f_.good()) { + std::cerr << "Logfile write failed, replay logging abandoned." << std::endl; + f_.close(); + f_.clear(); + logging_ = false; + } +} + +bool ReplayRecorder::open_logfile(const char *fn) { + f_.open(fn, std::ios_base::out | std::ios_base::binary | std::ios_base::trunc); + if (f_.good()) { + logging_ = true; + return true; + } else { + f_.close(); + f_.clear(); + return false; + } +} + + +ReplayPlayer::Error ReplayPlayer::step() { + if (error_ != ERR_NONE) return error_; + uint8_t code = rlog_uint8(f_); + if (f_.eof()) { + set_error(ERR_LOGFILE_EOF); + return error_; + } + int hash = rlog_uint32(f_); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return error_; + } + std::cerr << "Executing: " << action_string(DrvAction(code)) << " " << eng::memhash() << std::endl; + if (hash != eng::memhash()) { + set_error(ERR_NONDERMINISTIC); + return error_; + } + switch (code) { + case CREATE_ENGINE: create_engine(); break; + case DRV_CLEAR_NEW_OUTGOING: drv_clear_new_outgoing(); break; + case DRV_SENT_OUTGOING: drv_sent_outgoing(); break; + case DRV_RECV_INCOMING: drv_recv_incoming(); break; + case DRV_NOTIFY_CLOSE: drv_notify_close(); break; + case DRV_NOTIFY_ACCEPT: drv_notify_accept(); break; + case DRV_CLEAR_LUA_SOURCE: drv_clear_lua_source(); break; + case DRV_ADD_LUA_SOURCE: drv_add_lua_source(); break; + case DRV_INVOKE_EVENT_INIT: drv_invoke_event_init(); break; + case DRV_INVOKE_EVENT_UPDATE: drv_invoke_event_update(); break; + default: + assert(false && "Replay Log contains invalid command."); + } + return error_; +} + +void ReplayPlayer::set_error(Error e) { + error_ = e; + f_.close(); + f_.clear(); +} + +void ReplayPlayer::print_error(std::ostream &s) { + switch (error_) { + case ERR_NONE: + s << "No errors detected: " << logfn_ << std::endl; + return; + case ERR_OPEN_LOGFILE: + s << "Could not open logfile: " << logfn_ << std::endl; + return; + case ERR_LOGFILE_EOF: + s << "Logfile reached end-of-file: " << logfn_ << std::endl; + return; + case ERR_LOGFILE_CORRUPT: + s << "Logfile corrupt: " << logfn_ << std::endl; + return; + case ERR_NONDERMINISTIC: + s << "Nondeterminism detected: " << logfn_ << std::endl; + return; + case ERR_CREATE_ENGINE: + s << "Could not create engine: " << logfn_ << " " << engine_ << std::endl; + return; + } +} + +bool ReplayPlayer::open_logfile(const char *fn) { + logfn_ = fn; + f_.clear(); + f_.open(fn, std::ios_base::in | std::ios_base::binary); + if (!f_.good()) { + set_error(ERR_OPEN_LOGFILE); + return false; + } + return true; +} + +bool ReplayRecorder::create_engine(const char *kind) { + if (logging_) { + wlog_cmd_hash(f_, CREATE_ENGINE, eng::memhash()); + wlog_string(f_, kind); + flush(); + } + e_ = DrivenEngine::make(kind); + DrivenEngine::set(e_.get()); + return e_ != nullptr; +} + +void ReplayPlayer::create_engine() { + std::string_view kind = rlog_string(f_, buf_.get()); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + engine_ = std::string(kind); + e_ = DrivenEngine::make(kind); + DrivenEngine::set(e_.get()); + if (e_ == nullptr) { + set_error(ERR_CREATE_ENGINE); + return; + } +} + +void ReplayRecorder::drv_clear_new_outgoing() { + if (logging_) { + wlog_cmd_hash(f_, DRV_CLEAR_NEW_OUTGOING, eng::memhash()); + flush(); + } + e_->drv_clear_new_outgoing(); +} + +void ReplayPlayer::drv_clear_new_outgoing() { + e_->drv_clear_new_outgoing(); +} + +void ReplayRecorder::drv_sent_outgoing(int chid, int nbytes) { + assert ((nbytes >= 0) && (nbytes <= 65535)); + if (logging_) { + std::string_view data = e_->drv_peek_outgoing(chid); + assert(nbytes <= int(data.size())); + wlog_cmd_hash(f_, DRV_SENT_OUTGOING, eng::memhash()); + wlog_uint16(f_, chid); + wlog_uint16(f_, nbytes); + wlog_uint64(f_, SpookyHash::QkHash64(data.data(), nbytes)); + flush(); + } + e_->drv_sent_outgoing(chid, nbytes); +} + +void ReplayPlayer::drv_sent_outgoing() { + int chid = rlog_uint16(f_); + int nbytes = rlog_uint16(f_); + uint64_t hash = rlog_uint64(f_); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + std::string_view data = e_->drv_peek_outgoing(chid); + if (nbytes > int(data.size())) { + set_error(ERR_NONDERMINISTIC); + return; + } + if (hash != SpookyHash::QkHash64(data.data(), nbytes)) { + set_error(ERR_NONDERMINISTIC); + return; + } + e_->drv_sent_outgoing(chid, nbytes); +} + +void ReplayRecorder::drv_recv_incoming(int chid, std::string_view data) { + if (logging_) { + wlog_cmd_hash(f_, DRV_RECV_INCOMING, eng::memhash()); + wlog_uint16(f_, chid); + wlog_string(f_, data); + flush(); + } + e_->drv_recv_incoming(chid, data); +} + +void ReplayPlayer::drv_recv_incoming() { + int chid = rlog_uint16(f_); + std::string_view data = rlog_string(f_, buf_.get()); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + e_->drv_recv_incoming(chid, data); +} + +void ReplayRecorder::drv_notify_close(int chid, std::string_view err) { + if (logging_) { + wlog_cmd_hash(f_, DRV_NOTIFY_CLOSE, eng::memhash()); + wlog_uint16(f_, chid); + wlog_string(f_, err); + flush(); + } + e_->drv_notify_close(chid, err); +} + +void ReplayPlayer::drv_notify_close() { + int chid = rlog_uint16(f_); + std::string_view err = rlog_string(f_, buf_.get()); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + e_->drv_notify_close(chid, err); +} + +int ReplayRecorder::drv_notify_accept(int port) { + if (logging_) { + wlog_cmd_hash(f_, DRV_NOTIFY_ACCEPT, eng::memhash()); + wlog_uint16(f_, port); + flush(); + } + return e_->drv_notify_accept(port); +} + +void ReplayPlayer::drv_notify_accept() { + int port = rlog_uint16(f_); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + e_->drv_notify_accept(port); +} + +void ReplayRecorder::drv_clear_lua_source() { + if (logging_) { + wlog_cmd_hash(f_, DRV_CLEAR_LUA_SOURCE, eng::memhash()); + flush(); + } + e_->drv_clear_lua_source(); +} + +void ReplayPlayer::drv_clear_lua_source() { + e_->drv_clear_lua_source(); +} + +void ReplayRecorder::drv_add_lua_source(std::string_view fn, std::string_view data) { + if (logging_) { + wlog_cmd_hash(f_, DRV_ADD_LUA_SOURCE, eng::memhash()); + wlog_string(f_, fn); + wlog_string(f_, data); + flush(); + } + e_->drv_add_lua_source(fn, data); +} + +void ReplayPlayer::drv_add_lua_source() { + std::string fn(rlog_string(f_, buf_.get())); + std::string_view data = rlog_string(f_, buf_.get()); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + e_->drv_add_lua_source(fn, data); +} + +void ReplayRecorder::drv_invoke_event_init(int argc, char *argv[]) { + assert(argc <= MAX_ARGC); + if (logging_) { + wlog_cmd_hash(f_, DRV_INVOKE_EVENT_INIT, eng::memhash()); + wlog_uint16(f_, argc); + for (int i = 0; i < argc; i++) { + wlog_string(f_, argv[i]); + } + flush(); + } + e_->drv_invoke_event_init(argc, argv); +} + +void ReplayPlayer::drv_invoke_event_init() { + std::vector argv; + int argc = rlog_uint16(f_); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + if (argc > MAX_ARGC) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + for (int i = 0; i < argc; i++) { + std::string_view arg = rlog_string(f_, buf_.get()); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + argv.emplace_back(arg); + } + std::vector cargv; + for (int i = 0; i < argc; i++) { + cargv.emplace_back(const_cast(argv[i].c_str())); + } + e_->drv_invoke_event_init(argc, &(cargv[0])); +} + +void ReplayRecorder::drv_invoke_event_update(double clock) { + if (logging_) { + wlog_cmd_hash(f_, DRV_INVOKE_EVENT_UPDATE, eng::memhash()); + wlog_double(f_, clock); + flush(); + } + e_->drv_invoke_event_update(clock); +} + +void ReplayPlayer::drv_invoke_event_update() { + double clock = rlog_double(f_); + if (!f_.good()) { + set_error(ERR_LOGFILE_CORRUPT); + return; + } + e_->drv_invoke_event_update(clock); +} + } // namespace drv LuaDefine(unittests_driverutil, "", "some unit tests") { diff --git a/luprex/core/cpp/driver-util.hpp b/luprex/core/cpp/driver-util.hpp index 458d8276..1494941c 100644 --- a/luprex/core/cpp/driver-util.hpp +++ b/luprex/core/cpp/driver-util.hpp @@ -4,10 +4,10 @@ #include "wrap-string.hpp" #include "wrap-vector.hpp" - -#include -#include #include +#include +#include +#include "drivenengine.hpp" namespace drv { @@ -16,17 +16,116 @@ void split_host_port(std::string_view target, std::string &host, std::string &po std::vector parse_control_lst(std::string_view ctrl); +class ReplayRecorder { +private: + std::ofstream f_; + UniqueDrivenEngine e_; + bool logging_; + + void flush(); +public: + // Initialization consists of three steps: + // + // 1. The constructor, which creates an empty ReplayRecorder. + // 2. Open the logfile for writing, if desired, using open_logfile. + // 3. Make the engine, using create_engine. + // + // After that, you can use drv_xxx methods to send messages to the + // engine. These messages will get logged if the replay log is open. + // + ReplayRecorder() : logging_(false) {} -// Write encoded data to a replay log. -void wlog_byte(std::ostream &s, uint8_t byte); -void wlog_uint16(std::ostream &s, uint16_t v); -void wlog_string(std::ostream &s, std::string_view v); + // Open the logfile. + // + // If you're going to open a logfile, you must do so before + // creating the engine. Returns false if the logfile couldn't be + // opened. + // + bool open_logfile(const char *fn); -// Read encoded data from a file. -uint8_t rlog_byte(std::istream &s); -uint16_t rlog_uint16(std::istream &s); -std::string_view rlog_string(std::istream &s, char *buf, size_t buflen); + // Create the DrivenEngine. + // + // Returns false if the DrivenEngine couldn't be created. + // + bool create_engine(const char *kind); -} + // These don't need to be logged. + // + const eng::vector &drv_get_listen_ports() const { return e_->drv_get_listen_ports(); } + const eng::vector &drv_get_new_outgoing() const { return e_->drv_get_new_outgoing(); } + const eng::string &drv_get_target(int chid) const { return e_->drv_get_target(chid); } + bool drv_outgoing_empty(int chid) const { return e_->drv_outgoing_empty(chid); } + bool drv_get_channel_released(int chid) const { return e_->drv_get_channel_released(chid); } + std::string_view drv_peek_outgoing(int chid) const { return e_->drv_peek_outgoing(chid); } + bool drv_get_rescan_lua_source() const { return e_->drv_get_rescan_lua_source(); } + bool drv_get_stop_driver() const { return e_->drv_get_stop_driver(); } + + // These operations do need to be logged. + // + void drv_clear_new_outgoing(); + void drv_sent_outgoing(int chid, int nbytes); + void drv_recv_incoming(int chid, std::string_view data); + void drv_notify_close(int chid, std::string_view err); + int drv_notify_accept(int port); + void drv_clear_lua_source(); + void drv_add_lua_source(std::string_view fn, std::string_view data); + void drv_invoke_event_init(int argc, char *argv[]); + void drv_invoke_event_update(double clock); +}; + +class ReplayPlayer { +public: + enum Error { + ERR_NONE, + ERR_OPEN_LOGFILE, + ERR_LOGFILE_EOF, + ERR_LOGFILE_CORRUPT, + ERR_NONDERMINISTIC, + ERR_CREATE_ENGINE, + }; +private: + std::ifstream f_; + UniqueDrivenEngine e_; + std::unique_ptr buf_; + Error error_; + std::string logfn_; + std::string engine_; + + void set_error(Error e); + + void create_engine(); + void drv_clear_new_outgoing(); + void drv_sent_outgoing(); + void drv_recv_incoming(); + void drv_notify_close(); + void drv_notify_accept(); + void drv_clear_lua_source(); + void drv_add_lua_source(); + void drv_invoke_event_init(); + void drv_invoke_event_update(); +public: + ReplayPlayer(); + + // Open the logfile for reading. + // + // Returns false if the logfile can't be opened. + // + bool open_logfile(const char *fn); + + // Execute a single step from the replay log. + // + // Returns an error code, which is usually ERR_NONE. + // If it's anything else, display an error and stop. + // + Error step(); + + // Print an error message. + // + // Print a message associated with the most recent error. + // + void print_error(std::ostream &s); +}; + +} // namespace drv #endif // DRIVER_UTIL_HPP diff --git a/luprex/core/cpp/eng-malloc.cpp b/luprex/core/cpp/eng-malloc.cpp index bccc6c38..51d13b2d 100644 --- a/luprex/core/cpp/eng-malloc.cpp +++ b/luprex/core/cpp/eng-malloc.cpp @@ -113,7 +113,7 @@ namespace eng { dlfree(p); } int memhash() { - return hash; + return (hash & 0x3FFFFFFF) | 0x40000000; } } // namespace eng diff --git a/luprex/core/cpp/spookyv2.cpp b/luprex/core/cpp/spookyv2.cpp index fbb9d11f..c6f587c8 100644 --- a/luprex/core/cpp/spookyv2.cpp +++ b/luprex/core/cpp/spookyv2.cpp @@ -292,7 +292,7 @@ static void Short( // do the whole hash in one call -void SpookyHash::Hash128( +void SpookyHash::ChainHash128( const void *message, size_t length, uint64_t *hash1, diff --git a/luprex/core/cpp/spookyv2.hpp b/luprex/core/cpp/spookyv2.hpp index 735fcc33..2b6d7997 100644 --- a/luprex/core/cpp/spookyv2.hpp +++ b/luprex/core/cpp/spookyv2.hpp @@ -31,18 +31,53 @@ #include #include +#include +#include class SpookyHash { public: + // A hash is two uint64's. + // + using HashValue = std::pair; + // // SpookyHash: hash a single message in one call, produce 128-bit output // - static void Hash128( + static void ChainHash128( const void *message, // message to hash size_t length, // length of message in bytes uint64_t *hash1, // input seed0, output hash0 uint64_t *hash2); // input seed1, output hash1 + + + static inline HashValue QkHash128(const void *message, size_t len) { + uint64_t hash1 = 0; + uint64_t hash2 = 0; + ChainHash128(message, len, &hash1, &hash2); + return std::make_pair(hash1, hash2); + } + + static inline HashValue QkHash128(std::string_view v) { + uint64_t hash1 = 0; + uint64_t hash2 = 0; + ChainHash128(v.data(), v.size(), &hash1, &hash2); + return std::make_pair(hash1, hash2); + } + + static inline uint64_t QkHash64(const void *message, size_t len) { + uint64_t hash1 = 0; + uint64_t hash2 = 0; + ChainHash128(message, len, &hash1, &hash2); + return hash1; + } + + static inline uint64_t QkHash64(std::string_view v) { + uint64_t hash1 = 0; + uint64_t hash2 = 0; + ChainHash128(v.data(), v.size(), &hash1, &hash2); + return hash1; + } }; #endif // SPOOKYV2_HPP diff --git a/luprex/core/cpp/streambuffer.cpp b/luprex/core/cpp/streambuffer.cpp index e7835fd4..63ac3bea 100644 --- a/luprex/core/cpp/streambuffer.cpp +++ b/luprex/core/cpp/streambuffer.cpp @@ -464,7 +464,7 @@ bool StreamBuffer::contents_equal(const StreamBuffer *other) const { util::HashValue StreamBuffer::hash() const { uint64_t hash1 = 0x82A7912E7893AC87; uint64_t hash2 = 0x81D402740DE458F3; - SpookyHash::Hash128(read_cursor_, write_cursor_ - read_cursor_, &hash1, &hash2); + SpookyHash::ChainHash128(read_cursor_, write_cursor_ - read_cursor_, &hash1, &hash2); return std::make_pair(hash1, hash2); } diff --git a/luprex/core/cpp/util.cpp b/luprex/core/cpp/util.cpp index d5728fb9..412c7c39 100644 --- a/luprex/core/cpp/util.cpp +++ b/luprex/core/cpp/util.cpp @@ -114,14 +114,14 @@ IdVector sort_union_id_vectors(const IdVector &v1, const IdVector &v2) { HashValue hash_string(const eng::string &s) { uint64_t hash1 = 0; uint64_t hash2 = 0; - SpookyHash::Hash128(s.c_str(), s.size(), &hash1, &hash2); + SpookyHash::ChainHash128(s.c_str(), s.size(), &hash1, &hash2); return util::HashValue(hash1, hash2); } HashValue hash_id_vector(const IdVector &idv) { uint64_t hash1 = 0; uint64_t hash2 = 0; - SpookyHash::Hash128(&idv[0], idv.size() * sizeof(int64_t), &hash1, &hash2); + SpookyHash::ChainHash128(&idv[0], idv.size() * sizeof(int64_t), &hash1, &hash2); return util::HashValue(hash1, hash2); }