// File: integration/luprex/core/cpp/drivenengine.hpp


//////////////////////////////////////////////////////////////
//
// DrivenEngine
//
// This module achieves two goals:
//
// * Makes it possible to do replay logging.
// * Makes the engine act like a deterministic state machine.
//
// We want to implement replay logging. That means we want to be able to run
// the engine, keeping a log, then "replay" that execution later.
//
// Unfortunately, I/O makes this difficult. Suppose that during the execution
// of the server, the server reads a lua source file. Later, we want to replay
// that execution, but the lua source file has changed on disk. When the engine
// reads the lua source file, we need it to "read" the data that *used* to be
// there, not the data that's currently there.
//
// To make it work, all I/O has to operate in one of two modes: either it can be
// real I/O, or it can be simulated I/O that retrieves data from the logfile.
//
// This module, DrivenEngine, provides an API for the engine to do all its I/O.
// It provides a variety of methods to open sockets, read and write sockets,
// read lua source, get the clock, and so forth.
//
// But in reality, these I/O functions don't ever call operating system
// functions like "read" or "write" or "connect." They don't call the operating
// system at all - not even indirectly, through a wrapper. Therefore, they
// can't really do any I/O. When you use one of these I/O functions to (say)
// write some data to a communication channel, the only thing that happens is
// that the data is put into a buffer. The actual transmission of the data
// happens elsewhere, in what is called the "Driver." Likewise, when you use
// one of these I/O functions to read data, it only returns data that was
// previously stored by the "Driver."
//
// The "Driver" is a module that implements the actual I/O. It is highly
// OS-dependent code, because it contains code to manipulate sockets, time
// clocks, and the like.
//
// From the perspective of the driver, the DrivenEngine is a C++ object that
// acts like a state machine. This state machine is driven forward by I/O
// events. The DrivenEngine provides an API where the driver can feed in these
// I/O events.
//
// Notice that the usual call graph is inverted: in most application programs,
// the application calls the operating system to do I/O. But when using class
// DrivenEngine, it's the other way around: the driver calls into class
// DrivenEngine to drive it forward. I/O routines drive computation.
//
// So the upshot of all this is that the DrivenEngine is a deterministic state
// machine, free of all OS-specific code.
//
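// As an illustration of the engine side, a subclass might look roughly like
// the sketch below. 'EchoEngine' is a hypothetical name, and the StreamBuffer
// calls are left as comments because that API is declared in
// streambuffer.hpp, not in this file.
//
//   class EchoEngine : public DrivenEngine {
//   public:
//       void update() override {
//           Channel *io = get_stdio_channel();
//           // Read whatever the driver stored in io->in(), and append any
//           // reply to io->out(). Nothing is transmitted here; the driver
//           // relays the buffers later.
//           if (io->closed()) stop_driver();
//       }
//   };
//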
//////////////////////////////////////////////////////////////
//
// Here are the rules for what the driver must do:
//
// * Before doing anything else, the driver must select one of the three
// logmodes.
//
// * If 'logmode_replay' is selected, then the driver must proceed to invoke
// 'drv_step_logfile' over and over until it returns false. In replay mode,
// the driver should not do anything else.
//
// * If 'logmode_write' or 'logmode_none' is selected, the driver must proceed
// to drive the application. Follow the remainder of these steps.
//
// * Read the lua source from disk, and call 'drv_set_lua_source'.
//
// * Invoke the DrivenEngine's init callback by calling
// 'drv_invoke_engine_init'.
//
// * Open a hardwired list of ports for listening.
//
// * Repeat the following steps over and over:
//
// - If the engine asked that the lua source be rescanned (check with
// drv_get_rescan_lua_source), read the source from disk and call
// 'drv_set_lua_source'.
//
// - List all existing channels using drv_list_channels.
//
// - If there are any new channels in the channel list, use
// drv_get_target to fetch the target host, then create a socket
// and open the connection. Associate the socket with the channel.
//
// - If the channel list is missing a previously-known channel, then that
// channel was deleted by the engine. Clean up the channel's data and
// close the socket, if any.
//
// - Do an OS 'poll'. The poll should include the sockets for all channels
// in the channel list, all listening ports, and stdio.
//
// - If the poll indicates that a listening port has a pending
// connection, accept it and call drv_notify_accept. Associate the
// accepted socket with the channel ID that drv_notify_accept returns.
//
// - If the poll indicates that a connection can accept outgoing data, use
// drv_peek_outgoing to fetch some data to write, and write it. Use
// drv_sent_outgoing to indicate that the data was sent.
//
// - If the poll indicates that a connection has incoming data, read the
// data then push it into the channel using drv_recv_incoming.
//
// - If the poll indicates that STDIO can be read/written, use
// drv_peek_outgoing, drv_sent_outgoing, and drv_recv_incoming in the
// same manner as you would for a socket.
//
// - Use 'drv_invoke_engine_update' to invoke the engine's update callback.
//
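// The rules above can be sketched as the loop below. This is an
// illustration, not the actual driver: 'MyEngine', 'read_lua_dir',
// 'wall_clock', 'poll_and_relay', 'replaying', 'logfile', and 'maxsize' are
// hypothetical names standing in for the engine subclass and the OS-specific
// code. Where drv_set_clock and drv_get_stop_driver belong in the loop is
// also an assumption, since the rules do not say.
//
//   MyEngine de;
//   DrivenEngine::set(&de);
//   if (replaying) {
//       de.drv_logmode_replay(logfile);
//       while (de.drv_step_logfile()) {}
//       return;
//   }
//   de.drv_logmode_write(logfile, maxsize);   // or de.drv_logmode_none()
//   de.drv_set_lua_source(read_lua_dir());
//   de.drv_invoke_engine_init();
//   while (!de.drv_get_stop_driver()) {
//       if (de.drv_get_rescan_lua_source())
//           de.drv_set_lua_source(read_lua_dir());
//       std::vector<int> chans;
//       de.drv_list_channels(chans);
//       poll_and_relay(de, chans);   // connect, accept, peek/sent/recv
//       de.drv_set_clock(wall_clock());
//       de.drv_invoke_engine_update();
//   }
//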
//////////////////////////////////////////////////////////////
#ifndef DRIVENENGINE_HPP
#define DRIVENENGINE_HPP
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include <map>
#include "streambuffer.hpp"
#include "util.hpp"
class DrivenEngine;
class Channel {
public:
// Get the buffers associated with this channel.
//
StreamBuffer *out() { return sb_out_.get(); }
StreamBuffer *in() { return sb_in_.get(); }
// If this is a socket connection, the receiver's port number.
//
int port() const { return port_; }
// If this is an outgoing socket connection, get the target host.
//
const std::string &target() const { return target_; }
// True if the remote closed the connection, or a failure occurred.
//
bool closed() const { return closed_; }
// You may delete any channel except for stdio. This closes
// the channel.
//
~Channel();
private:
// Constructor is deliberately private. Use
// DrivenEngine::new_outgoing_channel to create outgoing socket channels.
//
Channel(DrivenEngine *de, int chid, int port, const std::string &target);
private:
DrivenEngine *driven_;
int chid_;
std::unique_ptr<StreamBuffer> sb_in_;
std::unique_ptr<StreamBuffer> sb_out_;
int port_;
bool closed_;
std::string target_;
friend class DrivenEngine;
};
class DrivenEngine {
public:
//////////////////////////////////////////////////////////////
//
// The following methods are the 'engine' side of the pipe.
//
//////////////////////////////////////////////////////////////
// The initialization function. You should override this in a subclass.
// This will be called to initialize the logic engine, shortly after the lua
// source is loaded.
//
virtual void init() {}
// The update function. You should override this in a subclass. This will
// be called to give the engine a chance to respond to new data.
//
virtual void update() {}
// Get the current time.
//
// DRIVER: This returns the time most recently stored by the driver
// using drv_set_clock.
//
double get_clock();
// Create a channel and open an outgoing connection.
//
// DRIVER: The channel object is created instantly, but it does nothing
// until the driver notices the new channel. The driver is responsible for
// actually opening the connection and relaying data into the channel using
// drv_get_target, drv_peek_outgoing, drv_sent_outgoing, drv_recv_incoming.
//
std::unique_ptr<Channel> new_outgoing_channel(const std::string &target);
// Create a new channel from any pending incoming connection. If there is no
// incoming connection, returns nullptr.
//
// DRIVER: The driver must be hardwired to know what ports to listen on.
// When the driver notices a new incoming connection, it calls
// drv_notify_accept, which triggers the creation of the channel. The
// channel is put into the incoming channel queue, which is fetched by this
// method. The driver is responsible for relaying data into the channel
// using drv_get_target, drv_peek_outgoing, drv_sent_outgoing,
// drv_recv_incoming.
//
std::unique_ptr<Channel> new_incoming_channel();
// Obtain the stdio channel. There is only one stdio channel. It is owned
// by the DrivenEngine. It is an error to delete the stdio channel.
//
// DRIVER: the stdio channel is created automatically when the DrivenEngine
// is created. The driver is responsible for relaying data into the channel
// using drv_get_target, drv_peek_outgoing, drv_sent_outgoing,
// drv_recv_incoming.
//
Channel *get_stdio_channel();
// Fetches the entire contents of the lua source directory. The keys in the
// map are filenames, and the values in the map are file contents. By
// default, the lua source is read from disk just once. If you call
// get_lua_source a second time, it will return nullptr, which indicates
// that the source has not been refreshed since the last call. If you want
// to reread the source code, you must trigger the process by calling
// 'rescan_lua_source'. After some delay, get_lua_source will again return
// the source.
//
// DRIVER: the driver is responsible for storing the lua source into the
// DrivenEngine, once, at startup, using drv_set_lua_source. The driver will
// periodically poll to see if the engine has called rescan_lua_source,
// using drv_get_rescan_lua_source.
//
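// Example, as a rough sketch from inside an engine subclass. 'load_scripts'
// is a hypothetical helper, not part of this API:
//
//   if (auto src = get_lua_source()) {
//       load_scripts(*src);    // fresh source was stored by the driver
//   }
//   // ...later, to ask the driver to reread the directory:
//   rescan_lua_source();
//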
std::unique_ptr<util::LuaSource> get_lua_source();
// Rescan the lua source directory. The lua source directory is read once,
// automatically, at engine creation time. If you want to read it again,
// you must trigger a rescan. The rescan is not instantaneous.
//
// DRIVER: this merely sets a flag, which the driver will notice later,
// causing the driver to update the lua source.
//
void rescan_lua_source();
// Stop the driver. The engine should call this when it's done
// and there's nothing left to do.
//
void stop_driver();
//////////////////////////////////////////////////////////////
//
// The following methods are the 'driver' side of the pipe.
//
//////////////////////////////////////////////////////////////
// Run in logging mode. In this mode, the 'drv' methods will write records
// into the logfile, making it possible to replay the exact sequence of drv
// methods that were invoked. The replay log will be flushed and closed
// automatically in the event of a crash, or if it exceeds the maximum size.
//
void drv_logmode_write(const std::string &filename, int64_t maxsize);
// Run in replay mode. In this mode, the only function the driver may call
// is 'drv_step_logfile.' Each time this method is called, a single drv_xxx
// method will be called, as requested by the logfile.
//
void drv_logmode_replay(const std::string &filename);
// Run in non-logging mode. In this mode, no logfile will be read or
// written.
//
void drv_logmode_none();
// Invoke the engine's init function. The driver must call drv_set_lua_source
// before calling this.
//
void drv_invoke_engine_init();
// Invoke the engine's update function. This typically causes the engine
// to check the I/O buffers, and respond to I/O, if any. It also typically
// causes the engine to check the clock, and do any scheduled calculation.
//
void drv_invoke_engine_update();
// Get a list of all non-closed existing channels. This may include new
// channels that were created using 'new_outgoing_channel'. It may also be
// missing channels that were deleted. It is up to the driver to update its
// internal data structures to 'match' this list. Channel number zero is
// always present; it is the stdio channel.
//
void drv_list_channels(std::vector<int> &channels);
// Get the target of a channel. A target is a string like
// "www.whatever.com:80". It indicates the host and port that the channel
// is supposed to be talking to. Non-socket channels and incoming channels
// have empty targets.
//
const std::string &drv_get_target(int chid);
// Get a pointer to the bytes in the outgoing buffer. The pointer returned
// here is naturally only valid until the buffer is changed. This function
// is used for all channels, including sockets and stdio.
//
void drv_peek_outgoing(int chid, int *nbytes, const char **bytes);
// Notifies the channel that some bytes were transmitted. This causes those
// bytes to be removed from the outgoing buffer. This function is used for
// all channels, including sockets and stdio.
//
void drv_sent_outgoing(int chid, int nbytes);
// Notifies the channel that some bytes were received. This causes those
// bytes to be appended to the incoming buffer. This function is used for
// all channels, including sockets and stdio.
//
void drv_recv_incoming(int chid, int nbytes, char *bytes);
// Notify the channel that the connection was closed. This covers every
// kind of close, from friendly termination all the way to network
// failure. Closing the channel doesn't delete it. The engine is
// responsible for noticing that the channel closed and for deleting it.
// Closing a channel prevents it from showing up in 'drv_list_channels'.
//
void drv_notify_close(int chid);
// Notify the DrivenEngine that somebody connected to an incoming port.
// This will cause the DrivenEngine to allocate a new channel and put the
// new channel into the incoming channels queue. Returns the new channel
// ID. The new incoming channel appears in the 'drv_list_channels' list,
// even before the engine pops the channel from the incoming channels queue.
//
int drv_notify_accept(int port);
// Set the clock. The driver is expected to periodically check the system
// clock and feed the value into the engine.
//
void drv_set_clock(double t);
// Set the lua source code. The driver is expected to read the lua source
// code and store it (using this function) once before invoking
// drv_invoke_engine_init.
//
void drv_set_lua_source(const util::LuaSource &source);
// Check the 'rescan_lua_source' flag. If this flag is set, it means
// that the engine wants the driver to rescan the lua source code.
// When the driver sees this flag, it should rescan the source and call
// drv_set_lua_source.
//
bool drv_get_rescan_lua_source();
// If true, the engine is done. Stop the driver.
//
bool drv_get_stop_driver();
// In replay mode, perform a single step of the logfile. Returns true
// if a step was performed, and false once the logfile is exhausted.
//
bool drv_step_logfile();
//////////////////////////////////////////////////////////////
//
// Creation and Destruction.
//
//////////////////////////////////////////////////////////////
// Constructor.
//
// Most initialization is achieved by 'drv_xxx' functions, so
// this constructor takes no arguments.
//
DrivenEngine();
// Destructor.
//
// It is necessary to delete all channels before deleting the
// DrivenEngine. The destructor will verify that this has been done.
//
virtual ~DrivenEngine();
// Set/Get Global Pointer.
//
// Normally, there is a single global "DrivenEngine" instance.
// We provide a global pointer to store this instance. This is
// a raw pointer, so you must manually delete the DrivenEngine.
//
static void set(DrivenEngine *de);
static DrivenEngine *get();
private:
// Get a channel by channel ID.
Channel *get_chid(int chid);
private:
std::unique_ptr<Channel> stdio_channel_;
int next_channel_id_;
std::map<int, Channel *> channels_;
Channel *recent_channel_;
std::vector<std::unique_ptr<Channel>> accepted_channels_;
bool rescan_lua_source_;
std::unique_ptr<util::LuaSource> lua_source_;
double clock_;
bool stop_driver_;
friend class Channel;
};
#endif // DRIVENENGINE_HPP