diff --git a/luprex/core/cpp/drivenengine.cpp b/luprex/core/cpp/drivenengine.cpp new file mode 100644 index 00000000..fad987ec --- /dev/null +++ b/luprex/core/cpp/drivenengine.cpp @@ -0,0 +1 @@ +#include "channel.hpp" \ No newline at end of file diff --git a/luprex/core/cpp/drivenengine.hpp b/luprex/core/cpp/drivenengine.hpp new file mode 100644 index 00000000..eb08d184 --- /dev/null +++ b/luprex/core/cpp/drivenengine.hpp @@ -0,0 +1,339 @@ +////////////////////////////////////////////////////////////// +// +// DrivenEngine +// +// This module achieves two goals: +// +// * Makes it possible to do replay logging. +// * Makes the engine act like a deterministic state machine. +// +// We want to implement replay logging. That means we want to be able to run +// the engine, keeping a log, then "replay" that execution later. +// +// Unfortunately, I/O makes this difficult. Suppose that during the execution +// of the server, the server reads a lua source file. Later, we want to replay +// that execution, but the lua source file has changed on disk. When the engine +// reads the lua source file, we need it to "read" the data that *used* to be +// there, not the data that's currently there. +// +// To make it work, all I/O has to operate in one of two modes: either it can be +// real I/O, or it can be simulated I/O that retrieves data from the logfile. +// +// This module, DrivenEngine, provides an API for the engine to do all its I/O. +// It provides a variety of methods to open sockets, read and write sockets, +// read lua source, get the clock, and so forth. +// +// But in reality, these I/O functions don't ever call operating system +// functions like "read" or "write" or "connect." They don't call the operating +// system at all - not even indirectly, through a wrapper. Therefore, they +// can't really do any I/O. When you use one of these I/O functions to (say) +// write some data to a communication channel, the only thing that happens +// is that the data is put into a buffer. The actual transmission of the +// data happens elsewhere, in what is called the "Driver." Likewise, when you +// use one of these I/O functions to read data, it only returns data that +// was previously stored by the "Driver." +// +// The "Driver" is a module that implements the actual I/O. It is highly +// OS-dependent code, because it contains code to manipulate sockets, time +// clocks, and the like. +// +// From the perspective of the driver, the DrivenEngine is a C++ object that +// acts like a state machine. This state machine is driven forward by I/O +// events. The DrivenEngine provides an API where the driver can feed in these +// I/O events. +// +// Notice that the usual call graph is inverted: in most application programs, +// the application calls the operating system to do I/O. But when using class +// DrivenEngine, it's the other way around: the driver calls into class +// DrivenEngine to drive it forward. I/O routines drive computation. +// +// So the upshot of all this is that the DrivenEngine is a deterministic +// state machine, free of all OS-specific code. +// +////////////////////////////////////////////////////////////// +// +// +// * List all existing channels using drv_list_channels. +// +// * If there are any new channels in the channel list, use +// drv_get_channel_target to fetch the target host, then create a socket and +// open the connection. Associate the socket with the channel. +// +// * Do an OS 'poll'. The poll should include the sockets for all channels in +// the channel list, all listening ports, and stdio. +// +// * If the poll indicates that a listening port has acceptable connections, +// accept and call drv_notify_accept. Associate the accepted socket with the +// channel. +// +// * If the poll indicates that a connection can accept outgoing data, use +// drv_peek_outgoing to fetch some data to write, and write it. Use +// drv_sent_outgoing_bytes to indicate that the data was sent. +// +// * If the poll indicates that a connection has incoming data, read the data +// then push it into the channel using drv_recv_incoming. +// +// * If the poll indicates that STDIO can be read/written, use +// drv_peek_outgoing, drv_sent_outgoing, and drv_recv_incoming in the same +// manner as you would for a socket. +// +////////////////////////////////////////////////////////////// + + +#ifndef DRIVENENGINE_HPP +#define DRIVENENGINE_HPP + +#include +#include +#include "streambuffer.hpp" +#include "util.hpp" + +class Channel { +private: + int chid_; + std::unique_ptr sb_in_; + std::unique_ptr sb_out_; + int port_; + bool remote_closed_; + std::string target_; + +public: + // Get the buffers associated with this channel. + // + StreamBuffer *out(); + StreamBuffer *in(); + const StreamBuffer *out() const; + const StreamBuffer *in() const; + + // If this is a socket connection, the receiver's port number. + // + int port() { return port_; } + + // If this is an outgoing socket connection, get the target host. + const std::string &target() { return target_; } + + // True if the remote closed the connection, or a failure occurred. + // + bool remote_closed() const { return remote_closed_; } + + // If communications were closed by the remote, there may + // be an error message. + // + std::string errmsg() const { return errmsg_; } +}; + +class DrivenEngine { +public: + // Constructor. + // + // Most initialization is achieved by 'drv_xxx' functions, so + // this constructor takes no arguments. + // + DrivenEngine(); + + // Destructor. + // + // It is necessary to delete all channels before deleting the + // DrivenEngine. The destructor will verify that this has been done. + // + ~DrivenEngine(); + + ////////////////////////////////////////////////////////////// + // + // The following methods are the 'engine' side of the pipe. + // + ////////////////////////////////////////////////////////////// + + // The initialization function. You should override this in a subclass. + // This will be called to initialize the logic engine, shortly after the lua + // source is loaded. + // + virtual void init() { } + + // The update function. You should override this in a subclass. This will + // be called to give the engine a chance to respond to new data. + // + virtual void update() {} + + // Get the current time. + // + // DRIVER: This returns the time most recently stored by the driver + // using drv_set_clock. + // + double get_clock(); + + // Create a channel and open an outgoing connection. + // + // DRIVER: The channel object is created instantly, but it does nothing + // until the driver notices the new channel. The driver is responsible for + // actually opening the connection and relaying data into the channel using + // drv_get_target, drv_peek_outgoing, drv_sent_outgoing, drv_recv_incoming. + // + std::unique_ptr new_outgoing_channel(const std::string &target) + + // Create a new channel from any pending incoming connection. If there is no + // incoming connection, returns nullptr. + // + // DRIVER: The driver must be hardwired to know what ports to listen on. + // When the driver notices a new incoming connection, it calls + // drv_notify_accept, which triggers the creation of the channel. The + // channel is put into the incoming channel queue, which is fetched by this + // method. The driver is responsible for relaying data into the channel + // using drv_get_target, drv_peek_outgoing, drv_sent_outgoing, + // drv_recv_incoming. + // + std::unique_ptr new_incoming_channel(); + + // Obtain the stdio channel. There is only one stdio channel. It is owned + // by the DrivenEngine. It is an error to delete the stdio channel. + // + // DRIVER: the stdio channel is created automatically when the DrivenEngine + // is created. The driver is responsible for relaying data into the channel + // using drv_get_target, drv_peek_outgoing, drv_sent_outgoing, + // drv_recv_incoming. + // + Channel *get_stdio_channel(); + + // Fetches the entire contents of the lua source directory. The keys in the + // map are filenames, and the values in the map are file contents. By + // default, the lua source is actually read from disk just once. If you call + // get_lua_source a second time, it will return a nullptr. The nullptr + // indicates that the source has not been refreshed recently. If you want + // to reread the source code, you must trigger the process by calling + // 'refresh_lua_source'. After some delay, it will again be possible to + // get_lua_source. + // + // DRIVER: the driver is responsible for storing the lua source into the + // DrivenEngine, once, at startup, using drv_set_source. The driver will + // periodically poll to see if the engine has called rescan_lua_source, + // using drv_get_rescan + // + std::unique_ptr get_lua_source(); + + // Rescan the lua source directory. The lua source directory is read once, + // automatically, at engine creation time. If you want to read it again, + // you must trigger a rescan. The rescan is not instantaneous. + // + // DRIVER: this merely sets a flag, which the driver will notice later, + // causing the driver to update the lua source. + // + void rescan_lua_source(); + + ////////////////////////////////////////////////////////////// + // + // The following methods are the 'driver' side of the pipe. + // + ////////////////////////////////////////////////////////////// + + // Run in logging mode. In this mode, the 'drv' methods will write records + // into the logfile, making it possible to replay the exact sequence of drv + // methods that were invoked. The replay log will be flushed and closed + // automatically in the event of a crash, or if it exceeds the maximum size. + // + void drv_logmode_write(const std::string &filename, int64_t maxsize); + + // Run in replay mode. In this mode, the only function the driver may call + // is 'drv_step_logfile.' Each time this method is called, a single drv_xxx + // method will be called, as requested by the logfile. + // + void drv_logmode_replay(const std::string &filename); + + // Run in non-logging mode. In this mode, no logfile will be read or written + // in this mode. + // + void drv_logmode_none(); + + // Invoke the engine's init function. The driver must call drv_set_lua_source + // before calling this. + // + void drv_invoke_engine_init(); + + // Invoke the engine's update function. This typically causes the engine + // to check the I/O buffers, and respond to I/O, if any. It also typically + // causes the engine to check the clock, and do any scheduled calculation. + // + void drv_invoke_engine_update(); + + // Get a list of all non-closed existing channels. This may include new + // channels that were created using 'new_outgoing_channel'. It may also be + // missing channels that were deleted. It is up to the driver to update its + // internal data structures to 'match' this list. Channel number zero is + // always present, it is the stdio channel. + // + void drv_list_channels(std::vector &channels); + + // Get the target of a channel. A target is a string like + // "www.whatever.com:80". It indicates the host and port that the channel + // is supposed to be talking to. Non-socket channels and incoming channels + // have empty targets. + // + std::string drv_get_target(int chid); + + // Get a pointer to the bytes in the outgoing buffer. The pointer returned + // here is naturally only valid until the buffer is changed. This function + // is used for all channels, including sockets and stdio. + // + void drv_peek_outgoing(int chid, int *nbytes, const char **bytes); + + // Notifies the channel that some bytes were transmitted. This causes those + // bytes to be removed from the outgoing buffer. This function is used for + // all channels, including sockets and stdio. + // + void drv_sent_outgoing(int chid, int nbytes); + + // Notifies the channel that some bytes were received. This causes those + // bytes to be appended to the incoming buffer. This function is used for + // all channels, including sockets and stdio. + // + void drv_recv_incoming(int chid, int nbytes, char *bytes); + + // Notify the channel that the connection was closed. This includes all + // sorts of closes, including friendly termination, all the way to network + // failure. Closing the channel doesn't delete it. The engine is + // responsible for noticing that the channel closed and the engine must + // delete it. Closing a channel prevents it from showing up in + // 'drv_list_channels'. + // + void drv_notify_close(int chid); + + // Notify the DrivenEngine that somebody connected to an incoming port. + // This will cause the DrivenEngine to allocate a new channel and put the + // new channel into the incoming channels queue. Returns the new channel + // ID. The new incoming channel appears in the 'drv_list_channels' list, + // even before the engine pops the channel from the incoming channels queue. + // + int drv_notify_accept(int port); + + // Set the clock. The driver is expected to periodically check the system + // clock and feed the value into the engine. + // + void drv_set_clock(double t); + + // Set the lua source code. The driver is expected to read the lua source + // code and store it (using this function) once before invoking + // + void drv_set_lua_source(const util::StringMap &source); + + // Check the 'rescan_lua_source' flag. If this flag is set, it means + // that the engine wants the driver to rescan the lua source code. + // When the driver sees this flag, it should rescan the source and call + // drv_set_source. + // + bool drv_get_rescan_lua_source(); + + // In replay mode, perform a single step of the logfile. Returns true + // if the logfile was not empty. + // + bool drv_step_logfile(); + +private: + Channel *stdio_channel_; + int next_channel_id_; + std::map channels_; + Channel *recent_channel_; + std::vector accepted_channels_; + bool rescan_lua_source_; + std::unique_ptr lua_source_; +}; + +#endif // DRIVENENGINE_HPP diff --git a/luprex/core/cpp/streambuffer.cpp b/luprex/core/cpp/streambuffer.cpp index a555820c..2dd7f067 100644 --- a/luprex/core/cpp/streambuffer.cpp +++ b/luprex/core/cpp/streambuffer.cpp @@ -45,6 +45,10 @@ int64_t StreamBuffer::fill() const { return write_cursor_ - read_cursor_; } +const char *StreamBuffer::data() const { + return read_cursor_; +} + bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) { if (read_cursor_ - buf_lo_ != a) return false; if (write_cursor_ - read_cursor_ != b) return false; diff --git a/luprex/core/cpp/streambuffer.hpp b/luprex/core/cpp/streambuffer.hpp index b564bdec..a34512e7 100644 --- a/luprex/core/cpp/streambuffer.hpp +++ b/luprex/core/cpp/streambuffer.hpp @@ -259,6 +259,9 @@ public: // Amount of data inside the buffer. int64_t fill() const; + // Get a pointer to the data. + const char *data() const; + // Discard all data. Reset total read and write counts. // Frees up as much space as possible. void clear(); diff --git a/luprex/core/cpp/util.hpp b/luprex/core/cpp/util.hpp index 17f2d687..b90fb498 100644 --- a/luprex/core/cpp/util.hpp +++ b/luprex/core/cpp/util.hpp @@ -21,6 +21,7 @@ enum WorldType { }; using StringVec = std::vector; +using StringMap = std::map; using HashValue = std::pair; using IdVector = std::vector;