Files
integration/luprex/core/cpp/driver-util.cpp

529 lines
14 KiB
C++

#include "wrap-string.hpp"
#include "wrap-vector.hpp"
#include "util.hpp"
#include "spookyv2.hpp"
#include "driver-util.hpp"
#include "luastack.hpp"
#include <string_view>
#include <fstream>
#include <ios>
#include <iostream>
#define RLOG_BUFSIZE (1024 * 1024)
#define MAX_ARGC 1024
namespace drv {
// Splits `v` at every occurrence of `sep` and returns the pieces as views
// into the caller's buffer (no copies are made). Empty pieces are kept, so
// the result always has exactly (number of separators + 1) elements; an
// empty input yields a single empty view.
std::vector<std::string_view> split_view(std::string_view v, char sep) {
    std::vector<std::string_view> pieces;
    size_t start = 0;  // offset of the piece currently being scanned
    for (size_t hit = v.find(sep); hit != std::string_view::npos; hit = v.find(sep, start)) {
        pieces.push_back(v.substr(start, hit - start));
        start = hit + 1;
    }
    // Whatever follows the last separator (possibly nothing) is the final piece.
    pieces.push_back(v.substr(start));
    return pieces;
}
// Parses a "cert:host:port" target into its three components.
// The target is valid only if it has exactly three ':'-separated fields and
// none of them is empty. On success the fields are copied into the output
// strings; otherwise all three outputs are cleared.
void split_target(std::string_view target, std::string &cert, std::string &host, std::string &port) {
    const std::vector<std::string_view> parts = split_view(target, ':');
    bool valid = (parts.size() == 3);
    for (size_t i = 0; valid && (i < parts.size()); ++i) {
        valid = !parts[i].empty();
    }
    if (!valid) {
        cert.clear();
        host.clear();
        port.clear();
        return;
    }
    cert = std::string(parts[0]);
    host = std::string(parts[1]);
    port = std::string(parts[2]);
}
// Extracts the meaningful entries from a control list: each line is trimmed,
// and lines that are empty after trimming or whose first character is '#'
// are dropped. The surviving lines are returned as owned strings, in order.
std::vector<std::string> parse_control_lst(std::string_view ctrl) {
    std::vector<std::string> entries;
    while (!ctrl.empty()) {
        // NOTE(review): the loop relies on sv::read_to_line consuming the
        // line it returns from `ctrl` -- confirm against its definition.
        std::string_view raw = sv::read_to_line(ctrl);
        const std::string_view entry = sv::trim(raw);
        if (entry.empty() || (entry[0] == '#')) continue;
        entries.emplace_back(entry);
    }
    return entries;
}
// Command codes stored in the replay log. wlog_cmd_hash() writes the code as
// a single byte and ReplayPlayer::step() dispatches on it, so the numeric
// values are part of the log format: append new codes, never renumber.
enum DrvAction {
    CREATE_ENGINE = 0,
    CLEAN_EXIT = 1,
    DRV_CLEAR_NEW_OUTGOING = 2,
    DRV_SENT_OUTGOING = 3,
    DRV_RECV_INCOMING = 4,
    DRV_NOTIFY_CLOSE = 5,
    DRV_NOTIFY_ACCEPT = 6,
    DRV_CLEAR_LUA_SOURCE = 7,
    DRV_ADD_LUA_SOURCE = 8,
    DRV_INVOKE_EVENT_INIT = 9,
    DRV_INVOKE_EVENT_UPDATE = 10
};
// Returns the printable name of a log command code, or "unknown" when the
// value is not one of the DrvAction enumerators. Intended for debug output.
inline static const char *action_string(DrvAction act) {
    struct Entry {
        DrvAction action;
        const char *name;
    };
    static const Entry kNames[] = {
        {CREATE_ENGINE, "CREATE_ENGINE"},
        {CLEAN_EXIT, "CLEAN_EXIT"},
        {DRV_CLEAR_NEW_OUTGOING, "DRV_CLEAR_NEW_OUTGOING"},
        {DRV_SENT_OUTGOING, "DRV_SENT_OUTGOING"},
        {DRV_RECV_INCOMING, "DRV_RECV_INCOMING"},
        {DRV_NOTIFY_CLOSE, "DRV_NOTIFY_CLOSE"},
        {DRV_NOTIFY_ACCEPT, "DRV_NOTIFY_ACCEPT"},
        {DRV_CLEAR_LUA_SOURCE, "DRV_CLEAR_LUA_SOURCE"},
        {DRV_ADD_LUA_SOURCE, "DRV_ADD_LUA_SOURCE"},
        {DRV_INVOKE_EVENT_INIT, "DRV_INVOKE_EVENT_INIT"},
        {DRV_INVOKE_EVENT_UPDATE, "DRV_INVOKE_EVENT_UPDATE"},
    };
    // Search by value so the table does not depend on enumerator order.
    for (const Entry &entry : kNames) {
        if (entry.action == act) return entry.name;
    }
    return "unknown";
}
// Appends one byte to the log stream.
static void wlog_uint8(std::ofstream &s, uint8_t v) {
    const char byte = static_cast<char>(v);
    s.put(byte);
}
// Appends a 16-bit value to the log stream as its raw in-memory bytes
// (host byte order).
static void wlog_uint16(std::ofstream &s, uint16_t v) {
    s.write(reinterpret_cast<const char *>(&v), sizeof(v));
}
// Appends a 32-bit value to the log stream as its raw in-memory bytes
// (host byte order).
static void wlog_uint32(std::ofstream &s, uint32_t v) {
    s.write(reinterpret_cast<const char *>(&v), sizeof(v));
}
// Appends a 64-bit value to the log stream as its raw in-memory bytes
// (host byte order).
static void wlog_uint64(std::ofstream &s, uint64_t v) {
    s.write(reinterpret_cast<const char *>(&v), sizeof(v));
}
// Appends a double to the log stream as its raw 8-byte in-memory
// representation.
static void wlog_double(std::ofstream &s, double v) {
    s.write(reinterpret_cast<const char *>(&v), sizeof(v));
}
// Appends a length-prefixed string to the log stream.
// Lengths below 255 are stored in a single byte; longer strings are stored
// as the marker byte 0xFF followed by a 32-bit length. The payload bytes
// follow the prefix. Strings must be shorter than RLOG_BUFSIZE (asserted).
static void wlog_string(std::ofstream &s, std::string_view v) {
    const size_t len = v.size();
    assert(len < RLOG_BUFSIZE);
    if (len < 255) {
        wlog_uint8(s, len);
    } else {
        // 0xFF is reserved as the escape for the 32-bit length form.
        wlog_uint8(s, 0xFF);
        wlog_uint32(s, len);
    }
    s.write(v.data(), len);
}
// Writes the header of a log record: the one-byte command code followed by
// the 32-bit hash supplied by the caller.
static void wlog_cmd_hash(std::ofstream &s, DrvAction act, uint32_t hash) {
    // Debug aid:
    // std::cerr << "Logging " << action_string(act) << " " << hash << std::endl;
    const uint8_t code = act;
    wlog_uint8(s, code);
    wlog_uint32(s, hash);
}
// After any rlog_* read, check "s.good()" on the stream to find out whether
// an error occurred; on failure the readers return zero or an empty view.
// Reads one byte from the log stream. Returns 0 if the stream is not good
// after the read; the caller must inspect the stream to detect that.
static uint8_t rlog_uint8(std::ifstream &s) {
    uint8_t value;
    s.read(reinterpret_cast<char *>(&value), sizeof(value));
    return s.good() ? value : uint8_t(0);
}
// Reads a raw 16-bit value (host byte order) from the log stream. Returns 0
// if the stream is not good after the read.
static uint16_t rlog_uint16(std::ifstream &s) {
    uint16_t value;
    s.read(reinterpret_cast<char *>(&value), sizeof(value));
    return s.good() ? value : uint16_t(0);
}
// Reads a raw 32-bit value (host byte order) from the log stream. Returns 0
// if the stream is not good after the read.
static uint32_t rlog_uint32(std::ifstream &s) {
    uint32_t value;
    s.read(reinterpret_cast<char *>(&value), sizeof(value));
    return s.good() ? value : uint32_t(0);
}
// Reads a raw 64-bit value (host byte order) from the log stream. Returns 0
// if the stream is not good after the read.
static uint64_t rlog_uint64(std::ifstream &s) {
    uint64_t value;
    s.read(reinterpret_cast<char *>(&value), sizeof(value));
    return s.good() ? value : uint64_t(0);
}
// Reads a raw 8-byte double from the log stream. Returns 0.0 if the stream
// is not good after the read.
static double rlog_double(std::ifstream &s) {
    double value;
    s.read(reinterpret_cast<char *>(&value), sizeof(value));
    return s.good() ? value : 0.0;
}
// Reads a length-prefixed string (the format written by wlog_string) into
// `rlog_buf` and returns a view of it.
//
//   s         log stream to read from.
//   rlog_buf  scratch buffer with room for at least RLOG_BUFSIZE bytes; the
//             returned view points into it and is overwritten by the next
//             call that uses the same buffer.
//
// Returns an empty view if the stream is not good afterwards. A length
// prefix larger than RLOG_BUFSIZE can only come from a corrupt log; it is
// reported by putting the stream into the fail state rather than by an
// assert, so release builds (NDEBUG) cannot overrun the buffer.
std::string_view rlog_string(std::ifstream &s, char *rlog_buf) {
    uint32_t len = rlog_uint8(s);
    if (len == 255) {
        // 0xFF escapes to the 32-bit length form.
        len = rlog_uint32(s);
    }
    if (!s.good()) return std::string_view();
    if (len > RLOG_BUFSIZE) {
        s.setstate(std::ios_base::failbit);
        return std::string_view();
    }
    if (len > 0) s.read(rlog_buf, len);
    if (!s.good()) return std::string_view();
    return std::string_view(rlog_buf, len);
}
// Constructs a player in the ST_REPLAYING state with stdout echo disabled,
// and allocates the RLOG_BUFSIZE-byte scratch buffer used by rlog_string.
ReplayPlayer::ReplayPlayer() {
    // Scratch space that string reads from the log are decoded into.
    buf_.reset(new char[RLOG_BUFSIZE]);
    enable_stdout_ = false;
    status_ = ST_REPLAYING;
}
// Flushes the log stream. If the stream is not good afterwards, a message is
// printed to stderr, the file is closed, and logging is switched off.
void ReplayRecorder::flush() {
    f_.flush();
    if (f_.good()) return;

    std::cerr << "Logfile write failed, replay logging abandoned." << std::endl;
    f_.close();
    f_.clear();
    logging_ = false;
}
// Opens (and truncates) `fn` as the binary replay log.
// Returns true and enables logging on success; on failure the stream is
// closed, its error state is cleared, and false is returned.
bool ReplayRecorder::open_logfile(const char *fn) {
    const std::ios_base::openmode mode =
        std::ios_base::out | std::ios_base::binary | std::ios_base::trunc;
    f_.open(fn, mode);
    if (!f_.good()) {
        f_.close();
        f_.clear();
        return false;
    }
    logging_ = true;
    return true;
}
// Replays the next record of the log and returns the resulting status.
//
// A record starts with a one-byte command code and the 32-bit hash that the
// recorder stored from eng::memhash(). The hash is compared with the current
// eng::memhash() before the command is executed; a mismatch ends the replay
// with ST_NONDERMINISTIC. Once the status is anything other than
// ST_REPLAYING this function does nothing and returns that status.
//
// Malformed input is reported through the status instead of asserting, so
// the behavior is the same with and without NDEBUG:
//   - end-of-file where a record should start -> ST_LOGFILE_ENDS_ABRUPTLY
//   - truncated header, unknown command code, or an engine command before
//     any engine has been created              -> ST_LOGFILE_CORRUPT
ReplayPlayer::Status ReplayPlayer::step() {
    if (status_ != ST_REPLAYING) return status_;
    const uint8_t code = rlog_uint8(f_);
    if (f_.eof()) {
        set_status(ST_LOGFILE_ENDS_ABRUPTLY);
        return status_;
    }
    // Read the hash with the same width and signedness it was written with.
    const uint32_t hash = rlog_uint32(f_);
    if (!f_.good()) {
        set_status(ST_LOGFILE_CORRUPT);
        return status_;
    }
    // std::cerr << "Executing: " << action_string(DrvAction(code)) << " " << eng::memhash() << std::endl;
    if (hash != static_cast<uint32_t>(eng::memhash())) {
        set_status(ST_NONDERMINISTIC);
        return status_;
    }
    // Every handler except these two dereferences e_, so a log that issues
    // an engine command before CREATE_ENGINE must not reach the dispatch.
    if ((e_ == nullptr) && (code != CREATE_ENGINE) && (code != CLEAN_EXIT)) {
        set_status(ST_LOGFILE_CORRUPT);
        return status_;
    }
    switch (code) {
    case CREATE_ENGINE: create_engine(); break;
    case CLEAN_EXIT: clean_exit(); break;
    case DRV_CLEAR_NEW_OUTGOING: drv_clear_new_outgoing(); break;
    case DRV_SENT_OUTGOING: drv_sent_outgoing(); break;
    case DRV_RECV_INCOMING: drv_recv_incoming(); break;
    case DRV_NOTIFY_CLOSE: drv_notify_close(); break;
    case DRV_NOTIFY_ACCEPT: drv_notify_accept(); break;
    case DRV_CLEAR_LUA_SOURCE: drv_clear_lua_source(); break;
    case DRV_ADD_LUA_SOURCE: drv_add_lua_source(); break;
    case DRV_INVOKE_EVENT_INIT: drv_invoke_event_init(); break;
    case DRV_INVOKE_EVENT_UPDATE: drv_invoke_event_update(); break;
    default:
        // Unknown command byte: the record cannot be interpreted.
        set_status(ST_LOGFILE_CORRUPT);
        break;
    }
    return status_;
}
// Records a new status and releases the log file: the stream is closed and
// its error flags are cleared.
void ReplayPlayer::set_status(Status e) {
    f_.close();
    f_.clear();
    status_ = e;
}
// Writes a one-line description of the current status to `s`, followed by
// the log file name (and, for ST_COULDNT_CREATE_ENGINE, the engine kind).
// Nothing is printed for a status value not listed here.
void ReplayPlayer::print_status(std::ostream &s) {
    const char *prefix = nullptr;
    switch (status_) {
    case ST_REPLAYING:             prefix = "No errors detected: "; break;
    case ST_CLEAN_EXIT:            prefix = "Engine exited cleanly without errors: "; break;
    case ST_ERR_OPENING_LOGFILE:   prefix = "Could not open logfile: "; break;
    case ST_LOGFILE_ENDS_ABRUPTLY: prefix = "Logfile reached end-of-file: "; break;
    case ST_LOGFILE_CORRUPT:       prefix = "Logfile corrupt: "; break;
    case ST_NONDERMINISTIC:        prefix = "Nondeterminism detected: "; break;
    case ST_COULDNT_CREATE_ENGINE: prefix = "Could not create engine: "; break;
    }
    if (prefix == nullptr) return;

    s << prefix << logfn_;
    if (status_ == ST_COULDNT_CREATE_ENGINE) {
        s << " " << engine_;
    }
    s << std::endl;
}
// Remembers `fn` as the log file name and opens it for binary reading.
// Returns true on success; on failure the status becomes
// ST_ERR_OPENING_LOGFILE and false is returned.
bool ReplayPlayer::open_logfile(const char *fn) {
    logfn_ = fn;
    f_.clear();
    f_.open(fn, std::ios_base::in | std::ios_base::binary);
    const bool opened = f_.good();
    if (!opened) {
        set_status(ST_ERR_OPENING_LOGFILE);
    }
    return opened;
}
// Logs a CREATE_ENGINE record (when logging is on), then builds the engine
// of the requested kind with DrivenEngine::make and passes its raw pointer
// to DrivenEngine::set. Returns whether an engine was actually produced.
bool ReplayRecorder::create_engine(const char *kind) {
    if (logging_) {
        wlog_cmd_hash(f_, CREATE_ENGINE, eng::memhash());
        wlog_string(f_, kind);
        flush();
    }
    e_ = DrivenEngine::make(kind);
    DrivenEngine::set(e_.get());
    const bool created = (e_ != nullptr);
    return created;
}
// Replays a CREATE_ENGINE record: reads the engine kind from the log, keeps
// a copy in engine_, builds the engine with DrivenEngine::make and passes
// its raw pointer to DrivenEngine::set.
// Status becomes ST_LOGFILE_CORRUPT if the kind cannot be read, or
// ST_COULDNT_CREATE_ENGINE if no engine was produced.
void ReplayPlayer::create_engine() {
    const std::string_view kind = rlog_string(f_, buf_.get());
    if (!f_.good()) {
        set_status(ST_LOGFILE_CORRUPT);
        return;
    }
    // `kind` points into the scratch buffer, so keep an owned copy.
    engine_.assign(kind.data(), kind.size());
    e_ = DrivenEngine::make(kind);
    DrivenEngine::set(e_.get());
    if (e_ == nullptr) {
        set_status(ST_COULDNT_CREATE_ENGINE);
    }
}
// Logs a CLEAN_EXIT record. Does nothing when logging is off.
void ReplayRecorder::clean_exit() {
    if (!logging_) return;
    wlog_cmd_hash(f_, CLEAN_EXIT, eng::memhash());
    flush();
}
void ReplayPlayer::clean_exit() {
set_status(ST_CLEAN_EXIT);
}
// Logs a DRV_CLEAR_NEW_OUTGOING record (when logging is on) and forwards the
// call to the engine.
void ReplayRecorder::drv_clear_new_outgoing() {
    if (logging_) {
        // The record is written before the engine runs the command.
        wlog_cmd_hash(f_, DRV_CLEAR_NEW_OUTGOING, eng::memhash());
        flush();
    }

    e_->drv_clear_new_outgoing();
}
// Replays a DRV_CLEAR_NEW_OUTGOING record. The record has no payload, so the
// call is simply forwarded to the engine.
void ReplayPlayer::drv_clear_new_outgoing() {
    e_->drv_clear_new_outgoing();
}
// Logs a DRV_SENT_OUTGOING record (when logging is on) and forwards the call
// to the engine. The record stores the channel id, the byte count and a
// 64-bit hash of the first `nbytes` bytes returned by drv_peek_outgoing, so
// the player can check that a replay produces the same output.
// `nbytes` must fit in 16 bits and must not exceed the peeked data
// (both asserted).
void ReplayRecorder::drv_sent_outgoing(int chid, int nbytes) {
    assert((0 <= nbytes) && (nbytes <= 65535));
    if (logging_) {
        const std::string_view pending = e_->drv_peek_outgoing(chid);
        assert(nbytes <= int(pending.size()));
        wlog_cmd_hash(f_, DRV_SENT_OUTGOING, eng::memhash());
        wlog_uint16(f_, chid);
        wlog_uint16(f_, nbytes);
        const uint64_t digest = SpookyHash::QkHash64(pending.data(), nbytes);
        wlog_uint64(f_, digest);
        flush();
    }
    e_->drv_sent_outgoing(chid, nbytes);
}
// Replays a DRV_SENT_OUTGOING record. Reads the channel id, byte count and
// expected hash, then checks them against the data returned by
// drv_peek_outgoing: fewer bytes than recorded, or a different hash, ends
// the replay with ST_NONDERMINISTIC. When stdout echo is enabled, data sent
// on channel 0 is copied to std::cout. Finally the call is forwarded to the
// engine. A failed read sets ST_LOGFILE_CORRUPT.
void ReplayPlayer::drv_sent_outgoing() {
    const int chid = rlog_uint16(f_);
    const int nbytes = rlog_uint16(f_);
    const uint64_t expected = rlog_uint64(f_);
    if (!f_.good()) {
        set_status(ST_LOGFILE_CORRUPT);
        return;
    }

    const std::string_view pending = e_->drv_peek_outgoing(chid);
    // The size test must come first: hashing reads `nbytes` bytes of `pending`.
    const bool too_short = nbytes > int(pending.size());
    if (too_short || (expected != SpookyHash::QkHash64(pending.data(), nbytes))) {
        set_status(ST_NONDERMINISTIC);
        return;
    }

    if (enable_stdout_ && (chid == 0)) {
        std::cout << pending.substr(0, nbytes);
    }
    e_->drv_sent_outgoing(chid, nbytes);
}
// Logs a DRV_RECV_INCOMING record holding the channel id and the received
// bytes (when logging is on), then forwards both to the engine.
void ReplayRecorder::drv_recv_incoming(int chid, std::string_view data) {
    if (logging_) {
        wlog_cmd_hash(f_, DRV_RECV_INCOMING, eng::memhash());
        wlog_uint16(f_, chid);  // channel ids are stored in 16 bits
        wlog_string(f_, data);
        flush();
    }

    e_->drv_recv_incoming(chid, data);
}
// Replays a DRV_RECV_INCOMING record: reads the channel id and payload and
// hands them to the engine, or sets ST_LOGFILE_CORRUPT if the read failed.
void ReplayPlayer::drv_recv_incoming() {
    const int chid = rlog_uint16(f_);
    const std::string_view payload = rlog_string(f_, buf_.get());
    if (f_.good()) {
        e_->drv_recv_incoming(chid, payload);
    } else {
        set_status(ST_LOGFILE_CORRUPT);
    }
}
// Logs a DRV_NOTIFY_CLOSE record holding the channel id and the error text
// (when logging is on), then forwards both to the engine.
void ReplayRecorder::drv_notify_close(int chid, std::string_view err) {
    if (logging_) {
        wlog_cmd_hash(f_, DRV_NOTIFY_CLOSE, eng::memhash());
        wlog_uint16(f_, chid);  // channel ids are stored in 16 bits
        wlog_string(f_, err);
        flush();
    }

    e_->drv_notify_close(chid, err);
}
// Replays a DRV_NOTIFY_CLOSE record: reads the channel id and error text and
// hands them to the engine, or sets ST_LOGFILE_CORRUPT if the read failed.
void ReplayPlayer::drv_notify_close() {
    const int chid = rlog_uint16(f_);
    const std::string_view reason = rlog_string(f_, buf_.get());
    if (f_.good()) {
        e_->drv_notify_close(chid, reason);
    } else {
        set_status(ST_LOGFILE_CORRUPT);
    }
}
// Logs a DRV_NOTIFY_ACCEPT record holding the port (when logging is on),
// then forwards the call to the engine and returns the engine's result.
int ReplayRecorder::drv_notify_accept(int port) {
    if (logging_) {
        wlog_cmd_hash(f_, DRV_NOTIFY_ACCEPT, eng::memhash());
        wlog_uint16(f_, port);  // the port is stored in 16 bits
        flush();
    }

    const int result = e_->drv_notify_accept(port);
    return result;
}
// Replays a DRV_NOTIFY_ACCEPT record: reads the port and passes it to the
// engine (the engine's return value is not used), or sets
// ST_LOGFILE_CORRUPT if the read failed.
void ReplayPlayer::drv_notify_accept() {
    const int port = rlog_uint16(f_);
    if (f_.good()) {
        e_->drv_notify_accept(port);
    } else {
        set_status(ST_LOGFILE_CORRUPT);
    }
}
// Logs a DRV_CLEAR_LUA_SOURCE record (when logging is on) and forwards the
// call to the engine.
void ReplayRecorder::drv_clear_lua_source() {
    if (logging_) {
        // The record is written before the engine runs the command.
        wlog_cmd_hash(f_, DRV_CLEAR_LUA_SOURCE, eng::memhash());
        flush();
    }

    e_->drv_clear_lua_source();
}
// Replays a DRV_CLEAR_LUA_SOURCE record. The record has no payload, so the
// call is simply forwarded to the engine.
void ReplayPlayer::drv_clear_lua_source() {
    e_->drv_clear_lua_source();
}
// Logs a DRV_ADD_LUA_SOURCE record holding the file name and the source text
// (when logging is on), then forwards both to the engine.
void ReplayRecorder::drv_add_lua_source(std::string_view fn, std::string_view data) {
    if (logging_) {
        wlog_cmd_hash(f_, DRV_ADD_LUA_SOURCE, eng::memhash());
        wlog_string(f_, fn);    // name first ...
        wlog_string(f_, data);  // ... then contents; the player reads in this order
        flush();
    }

    e_->drv_add_lua_source(fn, data);
}
void ReplayPlayer::drv_add_lua_source() {
std::string fn(rlog_string(f_, buf_.get()));
std::string_view data = rlog_string(f_, buf_.get());
if (!f_.good()) {
set_status(ST_LOGFILE_CORRUPT);
return;
}
e_->drv_add_lua_source(fn, data);
}
// Logs a DRV_INVOKE_EVENT_INIT record holding the argument count and each
// argument string (when logging is on), then forwards argc/argv to the
// engine. `argc` must not exceed MAX_ARGC (asserted).
void ReplayRecorder::drv_invoke_event_init(int argc, char *argv[]) {
    assert(argc <= MAX_ARGC);
    if (logging_) {
        wlog_cmd_hash(f_, DRV_INVOKE_EVENT_INIT, eng::memhash());
        wlog_uint16(f_, argc);
        for (int idx = 0; idx < argc; ++idx) {
            wlog_string(f_, argv[idx]);
        }
        flush();
    }

    e_->drv_invoke_event_init(argc, argv);
}
// Replays a DRV_INVOKE_EVENT_INIT record: reads the argument count and the
// argument strings from the log, rebuilds an argc/argv pair and passes it to
// the engine.
//
// Status becomes ST_LOGFILE_CORRUPT if the count cannot be read, exceeds
// MAX_ARGC, or any argument string cannot be read.
//
// The argv array handed to the engine is terminated with a null pointer,
// like the argv of main(), and is obtained with data() so that a record with
// zero arguments does not index into an empty vector.
void ReplayPlayer::drv_invoke_event_init() {
    const int argc = rlog_uint16(f_);
    if (!f_.good() || (argc > MAX_ARGC)) {
        set_status(ST_LOGFILE_CORRUPT);
        return;
    }

    // Owned copies: every rlog_string call reuses the same scratch buffer.
    std::vector<std::string> args;
    args.reserve(argc);
    for (int i = 0; i < argc; i++) {
        const std::string_view arg = rlog_string(f_, buf_.get());
        if (!f_.good()) {
            set_status(ST_LOGFILE_CORRUPT);
            return;
        }
        args.emplace_back(arg);
    }

    std::vector<char *> cargv;
    cargv.reserve(args.size() + 1);
    for (std::string &arg : args) {
        cargv.push_back(arg.data());
    }
    cargv.push_back(nullptr);  // argv[argc] == nullptr, as for a real argv

    e_->drv_invoke_event_init(argc, cargv.data());
}
// Logs a DRV_INVOKE_EVENT_UPDATE record holding the clock value (when
// logging is on), then forwards the clock to the engine.
void ReplayRecorder::drv_invoke_event_update(double clock) {
    if (logging_) {
        // Storing the clock lets the player feed the engine the same value.
        wlog_cmd_hash(f_, DRV_INVOKE_EVENT_UPDATE, eng::memhash());
        wlog_double(f_, clock);
        flush();
    }

    e_->drv_invoke_event_update(clock);
}
// Replays a DRV_INVOKE_EVENT_UPDATE record: reads the recorded clock value
// and passes it to the engine, or sets ST_LOGFILE_CORRUPT if the read failed.
void ReplayPlayer::drv_invoke_event_update() {
    const double recorded_clock = rlog_double(f_);
    if (f_.good()) {
        e_->drv_invoke_event_update(recorded_clock);
    } else {
        set_status(ST_LOGFILE_CORRUPT);
    }
}
} // namespace drv
// Unit tests for this file, defined through the LuaDefine macro under the
// name unittests_driverutil.
// NOTE(review): presumably this is exposed to Lua as a callable function and
// `L` is the lua_State supplied by the macro -- confirm in luastack.hpp.
LuaDefine(unittests_driverutil, "", "some unit tests") {
// Test split_target: a well-formed "cert:host:port" target must be split
// into its three fields.
std::string cert, host, port;
drv::split_target("cert:stanford.edu:80", cert, host, port);
LuaAssertStrEq(L, cert, "cert");
LuaAssertStrEq(L, host, "stanford.edu");
LuaAssertStrEq(L, port, "80");
// Returns 0 to the LuaDefine machinery.
return 0;
}