#include "wrap-string.hpp" #include "wrap-vector.hpp" #include #include #include #include #include "spookyv2.hpp" #include "driver-util.hpp" #include "luastack.hpp" #include "util.hpp" #define RLOG_BUFSIZE (1024 * 1024) #define MAX_ARGC 1024 namespace drv { void split_host_port(std::string_view target, std::string &host, std::string &port) { size_t lastcolon = target.rfind(':'); if (lastcolon == std::string_view::npos) { host = ""; port = ""; return; } host = target.substr(0, lastcolon); port = target.substr(lastcolon + 1); if ((host == "") || (port == "")) { host = ""; port = ""; return; } } std::vector parse_control_lst(std::string_view ctrl) { std::vector result; while (!ctrl.empty()) { std::string_view line = util::sv_read_line(ctrl); std::string_view trimmed = util::sv_trim(line); if ((trimmed.size() > 0) && (trimmed[0] != '#')) { result.emplace_back(trimmed); } } return result; } enum DrvAction { CREATE_ENGINE, CLEAN_EXIT, DRV_CLEAR_NEW_OUTGOING, DRV_SENT_OUTGOING, DRV_RECV_INCOMING, DRV_NOTIFY_CLOSE, DRV_NOTIFY_ACCEPT, DRV_CLEAR_LUA_SOURCE, DRV_ADD_LUA_SOURCE, DRV_INVOKE_EVENT_INIT, DRV_INVOKE_EVENT_UPDATE }; inline static const char *action_string(DrvAction act) { switch(act) { case CREATE_ENGINE: return "CREATE_ENGINE"; case CLEAN_EXIT: return "CLEAN_EXIT"; case DRV_CLEAR_NEW_OUTGOING: return "DRV_CLEAR_NEW_OUTGOING"; case DRV_SENT_OUTGOING: return "DRV_SENT_OUTGOING"; case DRV_RECV_INCOMING: return "DRV_RECV_INCOMING"; case DRV_NOTIFY_CLOSE: return "DRV_NOTIFY_CLOSE"; case DRV_NOTIFY_ACCEPT: return "DRV_NOTIFY_ACCEPT"; case DRV_CLEAR_LUA_SOURCE: return "DRV_CLEAR_LUA_SOURCE"; case DRV_ADD_LUA_SOURCE: return "DRV_ADD_LUA_SOURCE"; case DRV_INVOKE_EVENT_INIT: return "DRV_INVOKE_EVENT_INIT"; case DRV_INVOKE_EVENT_UPDATE: return "DRV_INVOKE_EVENT_UPDATE"; default: return "unknown"; } } static void wlog_uint8(std::ofstream &s, uint8_t v) { s.put((char)v); } static void wlog_uint16(std::ofstream &s, uint16_t v) { s.write((const char *)&v, 2); } static void wlog_uint32(std::ofstream &s, uint32_t v) { s.write((const char *)&v, 4); } static void wlog_uint64(std::ofstream &s, uint64_t v) { s.write((const char *)&v, 8); } static void wlog_double(std::ofstream &s, double v) { s.write((const char *)&v, 8); } static void wlog_string(std::ofstream &s, std::string_view v) { assert(v.size() < RLOG_BUFSIZE); if (v.size() >= 255) { wlog_uint8(s, 0xFF); wlog_uint32(s, v.size()); } else { wlog_uint8(s, v.size()); } s.write(v.data(), v.size()); } static void wlog_cmd_hash(std::ofstream &s, DrvAction act, uint32_t hash) { // std::cerr << "Logging " << action_string(act) << " " << hash << std::endl; wlog_uint8(s, act); wlog_uint32(s, hash); } // After doing an rlog operation, you should check the stream // for "s.good()" to find out if there was any error. static uint8_t rlog_uint8(std::ifstream &s) { uint8_t result; s.read((char *)&result, 1); if (!s.good()) return 0; return result; } static uint16_t rlog_uint16(std::ifstream &s) { uint16_t result; s.read((char *)&result, 2); if (!s.good()) return 0; return result; } static uint32_t rlog_uint32(std::ifstream &s) { uint32_t result; s.read((char *)&result, 4); if (!s.good()) return 0; return result; } static uint64_t rlog_uint64(std::ifstream &s) { uint64_t result; s.read((char *)&result, 8); if (!s.good()) return 0; return result; } static double rlog_double(std::ifstream &s) { double result; s.read((char *)&result, 8); if (!s.good()) return 0.0; return result; } std::string_view rlog_string(std::ifstream &s, char *rlog_buf) { uint32_t len = rlog_uint8(s); if (len == 255) { len = rlog_uint32(s); } assert(len <= RLOG_BUFSIZE); if (len > 0) s.read(rlog_buf, len); if (!s.good()) return std::string_view(); return std::string_view(rlog_buf, len); } ReplayPlayer::ReplayPlayer() { status_ = ST_REPLAYING; enable_stdout_ = false; buf_.reset(new char[RLOG_BUFSIZE]); } void ReplayRecorder::flush() { f_.flush(); if (!f_.good()) { std::cerr << "Logfile write failed, replay logging abandoned." << std::endl; f_.close(); f_.clear(); logging_ = false; } } bool ReplayRecorder::open_logfile(const char *fn) { f_.open(fn, std::ios_base::out | std::ios_base::binary | std::ios_base::trunc); if (f_.good()) { logging_ = true; return true; } else { f_.close(); f_.clear(); return false; } } ReplayPlayer::Status ReplayPlayer::step() { if (status_ != ST_REPLAYING) return status_; uint8_t code = rlog_uint8(f_); if (f_.eof()) { set_status(ST_LOGFILE_ENDS_ABRUPTLY); return status_; } int hash = rlog_uint32(f_); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return status_; } // std::cerr << "Executing: " << action_string(DrvAction(code)) << " " << eng::memhash() << std::endl; if (hash != eng::memhash()) { set_status(ST_NONDERMINISTIC); return status_; } switch (code) { case CREATE_ENGINE: create_engine(); break; case CLEAN_EXIT: clean_exit(); break; case DRV_CLEAR_NEW_OUTGOING: drv_clear_new_outgoing(); break; case DRV_SENT_OUTGOING: drv_sent_outgoing(); break; case DRV_RECV_INCOMING: drv_recv_incoming(); break; case DRV_NOTIFY_CLOSE: drv_notify_close(); break; case DRV_NOTIFY_ACCEPT: drv_notify_accept(); break; case DRV_CLEAR_LUA_SOURCE: drv_clear_lua_source(); break; case DRV_ADD_LUA_SOURCE: drv_add_lua_source(); break; case DRV_INVOKE_EVENT_INIT: drv_invoke_event_init(); break; case DRV_INVOKE_EVENT_UPDATE: drv_invoke_event_update(); break; default: assert(false && "Replay Log contains invalid command."); } return status_; } void ReplayPlayer::set_status(Status e) { status_ = e; f_.close(); f_.clear(); } void ReplayPlayer::print_status(std::ostream &s) { switch (status_) { case ST_REPLAYING: s << "No errors detected: " << logfn_ << std::endl; return; case ST_CLEAN_EXIT: s << "Engine exited cleanly without errors: " << logfn_ << std::endl; return; case ST_ERR_OPENING_LOGFILE: s << "Could not open logfile: " << logfn_ << std::endl; return; case ST_LOGFILE_ENDS_ABRUPTLY: s << "Logfile reached end-of-file: " << logfn_ << std::endl; return; case ST_LOGFILE_CORRUPT: s << "Logfile corrupt: " << logfn_ << std::endl; return; case ST_NONDERMINISTIC: s << "Nondeterminism detected: " << logfn_ << std::endl; return; case ST_COULDNT_CREATE_ENGINE: s << "Could not create engine: " << logfn_ << " " << engine_ << std::endl; return; } } bool ReplayPlayer::open_logfile(const char *fn) { logfn_ = fn; f_.clear(); f_.open(fn, std::ios_base::in | std::ios_base::binary); if (!f_.good()) { set_status(ST_ERR_OPENING_LOGFILE); return false; } return true; } bool ReplayRecorder::create_engine(const char *kind) { if (logging_) { wlog_cmd_hash(f_, CREATE_ENGINE, eng::memhash()); wlog_string(f_, kind); flush(); } e_ = DrivenEngine::make(kind); DrivenEngine::set(e_.get()); return e_ != nullptr; } void ReplayPlayer::create_engine() { std::string_view kind = rlog_string(f_, buf_.get()); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } engine_ = std::string(kind); e_ = DrivenEngine::make(kind); DrivenEngine::set(e_.get()); if (e_ == nullptr) { set_status(ST_COULDNT_CREATE_ENGINE); return; } } void ReplayRecorder::clean_exit() { if (logging_) { wlog_cmd_hash(f_, CLEAN_EXIT, eng::memhash()); flush(); } } void ReplayPlayer::clean_exit() { set_status(ST_CLEAN_EXIT); } void ReplayRecorder::drv_clear_new_outgoing() { if (logging_) { wlog_cmd_hash(f_, DRV_CLEAR_NEW_OUTGOING, eng::memhash()); flush(); } e_->drv_clear_new_outgoing(); } void ReplayPlayer::drv_clear_new_outgoing() { e_->drv_clear_new_outgoing(); } void ReplayRecorder::drv_sent_outgoing(int chid, int nbytes) { assert ((nbytes >= 0) && (nbytes <= 65535)); if (logging_) { std::string_view data = e_->drv_peek_outgoing(chid); assert(nbytes <= int(data.size())); wlog_cmd_hash(f_, DRV_SENT_OUTGOING, eng::memhash()); wlog_uint16(f_, chid); wlog_uint16(f_, nbytes); wlog_uint64(f_, SpookyHash::QkHash64(data.data(), nbytes)); flush(); } e_->drv_sent_outgoing(chid, nbytes); } void ReplayPlayer::drv_sent_outgoing() { int chid = rlog_uint16(f_); int nbytes = rlog_uint16(f_); uint64_t hash = rlog_uint64(f_); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } std::string_view data = e_->drv_peek_outgoing(chid); if (nbytes > int(data.size())) { set_status(ST_NONDERMINISTIC); return; } if (hash != SpookyHash::QkHash64(data.data(), nbytes)) { set_status(ST_NONDERMINISTIC); return; } if ((chid == 0) && (enable_stdout_)) { std::string_view sub = data.substr(0, nbytes); std::cout << sub; } e_->drv_sent_outgoing(chid, nbytes); } void ReplayRecorder::drv_recv_incoming(int chid, std::string_view data) { if (logging_) { wlog_cmd_hash(f_, DRV_RECV_INCOMING, eng::memhash()); wlog_uint16(f_, chid); wlog_string(f_, data); flush(); } e_->drv_recv_incoming(chid, data); } void ReplayPlayer::drv_recv_incoming() { int chid = rlog_uint16(f_); std::string_view data = rlog_string(f_, buf_.get()); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } e_->drv_recv_incoming(chid, data); } void ReplayRecorder::drv_notify_close(int chid, std::string_view err) { if (logging_) { wlog_cmd_hash(f_, DRV_NOTIFY_CLOSE, eng::memhash()); wlog_uint16(f_, chid); wlog_string(f_, err); flush(); } e_->drv_notify_close(chid, err); } void ReplayPlayer::drv_notify_close() { int chid = rlog_uint16(f_); std::string_view err = rlog_string(f_, buf_.get()); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } e_->drv_notify_close(chid, err); } int ReplayRecorder::drv_notify_accept(int port) { if (logging_) { wlog_cmd_hash(f_, DRV_NOTIFY_ACCEPT, eng::memhash()); wlog_uint16(f_, port); flush(); } return e_->drv_notify_accept(port); } void ReplayPlayer::drv_notify_accept() { int port = rlog_uint16(f_); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } e_->drv_notify_accept(port); } void ReplayRecorder::drv_clear_lua_source() { if (logging_) { wlog_cmd_hash(f_, DRV_CLEAR_LUA_SOURCE, eng::memhash()); flush(); } e_->drv_clear_lua_source(); } void ReplayPlayer::drv_clear_lua_source() { e_->drv_clear_lua_source(); } void ReplayRecorder::drv_add_lua_source(std::string_view fn, std::string_view data) { if (logging_) { wlog_cmd_hash(f_, DRV_ADD_LUA_SOURCE, eng::memhash()); wlog_string(f_, fn); wlog_string(f_, data); flush(); } e_->drv_add_lua_source(fn, data); } void ReplayPlayer::drv_add_lua_source() { std::string fn(rlog_string(f_, buf_.get())); std::string_view data = rlog_string(f_, buf_.get()); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } e_->drv_add_lua_source(fn, data); } void ReplayRecorder::drv_invoke_event_init(int argc, char *argv[]) { assert(argc <= MAX_ARGC); if (logging_) { wlog_cmd_hash(f_, DRV_INVOKE_EVENT_INIT, eng::memhash()); wlog_uint16(f_, argc); for (int i = 0; i < argc; i++) { wlog_string(f_, argv[i]); } flush(); } e_->drv_invoke_event_init(argc, argv); } void ReplayPlayer::drv_invoke_event_init() { std::vector argv; int argc = rlog_uint16(f_); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } if (argc > MAX_ARGC) { set_status(ST_LOGFILE_CORRUPT); return; } for (int i = 0; i < argc; i++) { std::string_view arg = rlog_string(f_, buf_.get()); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } argv.emplace_back(arg); } std::vector cargv; for (int i = 0; i < argc; i++) { cargv.emplace_back(const_cast(argv[i].c_str())); } e_->drv_invoke_event_init(argc, &(cargv[0])); } void ReplayRecorder::drv_invoke_event_update(double clock) { if (logging_) { wlog_cmd_hash(f_, DRV_INVOKE_EVENT_UPDATE, eng::memhash()); wlog_double(f_, clock); flush(); } e_->drv_invoke_event_update(clock); } void ReplayPlayer::drv_invoke_event_update() { double clock = rlog_double(f_); if (!f_.good()) { set_status(ST_LOGFILE_CORRUPT); return; } e_->drv_invoke_event_update(clock); } } // namespace drv LuaDefine(unittests_driverutil, "", "some unit tests") { // Test split_host_port std::string host, port; drv::split_host_port("stanford.edu:80", host, port); LuaAssertStrEq(L, host, "stanford.edu"); LuaAssertStrEq(L, port, "80"); return 0; }