DrivenEngine channels are now shared_ptr

This commit is contained in:
2022-01-05 12:50:16 -05:00
parent 6b23651606
commit c733023823
7 changed files with 103 additions and 87 deletions

View File

@@ -1,8 +1,7 @@
#include "drivenengine.hpp"
Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target) {
driven_ = de;
Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target, bool stop) {
chid_ = chid;
sb_in_.reset(new StreamBuffer);
sb_out_.reset(new StreamBuffer);
@@ -12,20 +11,12 @@ Channel::Channel(DrivenEngine *de, int chid, int port, const std::string &target
readline_enabled_ = (chid == 0);
readline_lastc_ = 0;
desired_prompt_ = "";
assert(driven_->channels_[chid_] == nullptr);
driven_->channels_[chid_] = this;
}
Channel::~Channel() {
assert(driven_->channels_[chid_] == this);
driven_->new_closed_.insert(chid_);
driven_->new_outgoing_.erase(chid_);
driven_->channels_[chid_] = nullptr;
stop_driver_ = stop;
}
void Channel::show_or_hide_command(bool ignore_sb_out) {
bool sb_out_empty = (sb_out_->fill() == 0) || ignore_sb_out;
if (!sb_out_empty || (!readline_enabled_) || (driven_->stop_driver_)) {
if (!sb_out_empty || (!readline_enabled_) || (stop_driver_)) {
int ccsize = current_prompt_.size() + current_command_.size();
readline_echo_ += util::repeat_string("\b \b", ccsize);
current_prompt_ = "";
@@ -140,8 +131,8 @@ int DrivenEngine::find_unused_chid() {
Channel *DrivenEngine::get_chid(int chid) {
assert(unsigned(chid) < MAX_CHAN);
assert(channels_[chid] != nullptr);
return channels_[chid];
assert(channels_[chid].get() != nullptr);
return channels_[chid].get();
}
void DrivenEngine::listen_port(int port) {
@@ -152,24 +143,26 @@ double DrivenEngine::get_clock() {
return clock_;
}
UniqueChannel DrivenEngine::new_outgoing_channel(const std::string &target) {
SharedChannel DrivenEngine::new_outgoing_channel(const std::string &target) {
int chid = find_unused_chid();
new_outgoing_.insert(chid);
return UniqueChannel(new Channel(this, chid, 0, target));
SharedChannel result = std::make_shared<Channel>(this, chid, 0, target, stop_driver_);
channels_[chid] = result;
return result;
}
UniqueChannel DrivenEngine::new_incoming_channel() {
SharedChannel DrivenEngine::new_incoming_channel() {
if (accepted_channels_.empty()) {
return nullptr;
} else {
UniqueChannel result = std::move(accepted_channels_.back());
SharedChannel result = std::move(accepted_channels_.back());
accepted_channels_.pop_back();
return result;
}
}
Channel *DrivenEngine::get_stdio_channel() {
return stdio_channel_.get();
SharedChannel DrivenEngine::get_stdio_channel() {
return stdio_channel_;
}
util::LuaSourcePtr DrivenEngine::get_lua_source() {
@@ -182,17 +175,17 @@ void DrivenEngine::rescan_lua_source() {
void DrivenEngine::stop_driver() {
stop_driver_ = true;
for (int i = 0; i < MAX_CHAN; i++) {
if (channels_[i] != nullptr) {
channels_[i]->stop_driver_ = true;
}
}
}
void DrivenEngine::drv_get_listen_ports(std::set<int> &ports) {
ports = listen_ports_;
}
void DrivenEngine::drv_get_new_closed(std::set<int> &channels) {
channels = std::move(new_closed_);
new_closed_.clear();
}
void DrivenEngine::drv_get_new_outgoing(std::set<int> &channels) {
channels = std::move(new_outgoing_);
new_outgoing_.clear();
@@ -206,6 +199,10 @@ bool DrivenEngine::drv_outgoing_empty(int chid) {
return get_chid(chid)->sb_out_->empty();
}
bool DrivenEngine::drv_get_channel_released(int chid) {
return channels_[chid].use_count() == 1;
}
void DrivenEngine::drv_peek_outgoing(int chid, int *nbytes, const char **bytes) {
return get_chid(chid)->peek_outgoing(nbytes, bytes);
}
@@ -229,11 +226,13 @@ void DrivenEngine::drv_notify_close(int chid, const std::string &err) {
Channel *ch = get_chid(chid);
ch->closed_ = true;
ch->error_ = err;
channels_[chid].reset();
}
int DrivenEngine::drv_notify_accept(int port) {
int chid = find_unused_chid();
accepted_channels_.emplace_back(new Channel(this, chid, port, ""));
channels_[chid] = std::make_shared<Channel>(this, chid, port, "", stop_driver_);
accepted_channels_.push_back(channels_[chid]);
return chid;
}
@@ -263,25 +262,15 @@ bool DrivenEngine::drv_get_stop_driver() {
}
DrivenEngine::DrivenEngine() {
for (int i = 0; i < MAX_CHAN; i++) {
channels_[i] = nullptr;
}
next_unused_chid_ = 1;
stdio_channel_.reset(new Channel(this, 0, 0, ""));
stdio_channel_ = std::make_shared<Channel>(this, 0, 0, "", false);
channels_[0] = stdio_channel_;
rescan_lua_source_ = true;
clock_ = 0.0;
stop_driver_ = false;
}
DrivenEngine::~DrivenEngine() {
// Delete the channels that we own.
stdio_channel_.reset();
accepted_channels_.clear();
// At this point, all channels should be gone.
for (int i = 0; i < MAX_CHAN; i++) {
assert(channels_[i] == nullptr);
}
}
DrivenEngine::~DrivenEngine() {}
static DrivenEngine *engine_;

View File

@@ -148,16 +148,18 @@ public:
//
void set_prompt(const std::string &prompt);
// You may delete any channel except for stdio. This closes
// the channel.
// Do not construct your own Channels. Instead,
// use methods of class DrivenEngine like new_outgoing_channel.
// Channels are referenced by shared_ptr. You can
// release your shared_ptr at any time.
//
~Channel();
Channel(DrivenEngine *de, int chid, int port, const std::string &target, bool stop);
~Channel() {};
private:
// Constructor is deliberately private. Use
// DrivenEngine::new_outgoing_channel to create outgoing socket channels.
//
Channel(DrivenEngine *de, int chid, int port, const std::string &target);
void feed_readline(int nbytes, const char *bytes);
void peek_outgoing(int *nbytes, const char **bytes);
@@ -166,7 +168,6 @@ private:
private:
static const int READLINE_MAX=512;
DrivenEngine *driven_;
int chid_;
std::unique_ptr<StreamBuffer> sb_in_;
std::unique_ptr<StreamBuffer> sb_out_;
@@ -174,6 +175,7 @@ private:
bool closed_;
std::string error_;
std::string target_;
bool stop_driver_;
// Readline stuff.
std::string desired_command_;
@@ -187,7 +189,7 @@ private:
friend class DrivenEngine;
};
using UniqueChannel = std::unique_ptr<Channel>;
using SharedChannel = std::shared_ptr<Channel>;
class DrivenEngine {
public:
@@ -229,7 +231,7 @@ public:
// actually opening the connection and relaying data into the channel using
// drv_get_target, drv_peek_outgoing, drv_sent_outgoing, drv_recv_incoming.
//
UniqueChannel new_outgoing_channel(const std::string &target);
SharedChannel new_outgoing_channel(const std::string &target);
// Create a new channel from any pending incoming connection. If there is no
// incoming connection, returns nullptr.
@@ -242,17 +244,16 @@ public:
// using drv_get_target, drv_peek_outgoing, drv_sent_outgoing,
// drv_recv_incoming.
//
UniqueChannel new_incoming_channel();
SharedChannel new_incoming_channel();
// Obtain the stdio channel. There is only one stdio channel. It is owned
// by the DrivenEngine. It is an error to delete the stdio channel.
// Obtain the stdio channel. There is only one stdio channel.
//
// DRIVER: the stdio channel is created automatically when the DrivenEngine
// is created. The driver is responsible for relaying data into the channel
// using drv_get_target, drv_peek_outgoing, drv_sent_outgoing,
// drv_recv_incoming.
//
Channel *get_stdio_channel();
SharedChannel get_stdio_channel();
// Obtain the output buffer of the stdio channel as an ostream.
//
@@ -292,13 +293,6 @@ public:
//
void drv_get_listen_ports(std::set<int> &ports);
// Get a list of all recently-closed channels. The driver should
// discard all socket information associated with these channels.
// Caution: this may contain channels that the driver has never
// heard of. In that case, just ignore the close-request.
//
void drv_get_new_closed(std::set<int> &opened);
// Get a list of all recently-opened channels that were created using
// drv_new_outgoing_channel. The driver should initiate outgoing
// connections for these channels.
@@ -316,6 +310,12 @@ public:
//
bool drv_outgoing_empty(int chid);
// Return true if the user has released all references to this channel.
// In this case, the driver should initiate shutdown of the channel,
// and the driver should eventually call drv_notify_close.
//
bool drv_get_channel_released(int chid);
// Get a pointer to the bytes in the outgoing buffer. The pointer returned
// here is naturally only valid until the buffer is changed. This function
// is used for all channels, including sockets and stdio.
@@ -415,11 +415,10 @@ private:
Channel *get_chid(int chid);
private:
Channel *channels_[MAX_CHAN];
SharedChannel channels_[MAX_CHAN];
int next_unused_chid_;
UniqueChannel stdio_channel_;
std::vector<UniqueChannel> accepted_channels_;
std::set<int> new_closed_;
SharedChannel stdio_channel_;
std::vector<SharedChannel> accepted_channels_;
std::set<int> new_outgoing_;
util::LuaSourcePtr lua_source_;
std::set<int> listen_ports_;

View File

@@ -169,14 +169,15 @@ public:
}
}
void handle_new_closed_sockets() {
std::set<int> chans;
driven_->drv_get_new_closed(chans);
for (int chid : chans) {
if (socket_[chid] != INVALID_SOCKET) {
assert(close(socket_[chid]) == 0);
socket_[chid] = INVALID_SOCKET;
connected_[chid] = false;
void handle_new_closed_sockets() {
for (int chid = 1; chid < MAX_CHAN; chid++) {
if (driven_->drv_get_channel_released(chid)) {
if (socket_[chid] != INVALID_SOCKET) {
assert(close(socket_[chid]) == 0);
socket_[chid] = INVALID_SOCKET;
connected_[chid] = false;
}
driven_->drv_notify_close(chid, "");
}
}
}

View File

@@ -12,6 +12,33 @@
class monotonic_clock {
public:
double freq_;
monotonic_clock() {
LARGE_INTEGER x;
BOOL status = QueryPerformanceFrequency(&x);
assert(status != 0);
freq_ = 1.0 / double(x.QuadPart);
}
double get() {
LARGE_INTEGER x;
BOOL status = QueryPerformanceCounter(&x);
assert(status != 0);
return double(x.QuadPart) * freq_;
}
);
#endif
static monotonic_clock monoclock;
namespace util {
double profiling_clock() {
return monoclock.get();
}
}
class Driver {
public:

View File

@@ -23,23 +23,23 @@ static void dump_lines(StreamBuffer *in, StreamBuffer *out, int chid) {
// You can type lines and see them echoed.
class DriverListenTest : public DrivenEngine {
public:
std::vector<UniqueChannel> channels_;
std::vector<SharedChannel> channels_;
virtual void event_init(int argc, char *argv[]) {
listen_port(8085);
}
virtual void event_update() {
while (true) {
UniqueChannel ch = new_incoming_channel();
SharedChannel ch = new_incoming_channel();
if (ch == nullptr) break;
ch->set_readline(true);
channels_.emplace_back(std::move(ch));
}
Channel *stdioch = get_stdio_channel();
SharedChannel stdioch = get_stdio_channel();
dump_lines(stdioch->in(), stdioch->out(), 0);
std::vector<UniqueChannel> keep;
for (UniqueChannel &ch : channels_) {
std::vector<SharedChannel> keep;
for (SharedChannel &ch : channels_) {
dump_lines(ch->in(), stdioch->out(), ch->chid());
if (ch->closed()) {
write_closed_message(ch.get(), stdioch->out());
@@ -55,18 +55,18 @@ public:
// the output from the server.
class DriverWebServerTest : public DrivenEngine {
public:
std::vector<UniqueChannel> channels_;
std::vector<SharedChannel> channels_;
virtual void event_init(int argc, char *argv[]) {
UniqueChannel ch = new_outgoing_channel("stanford.edu:80");
SharedChannel ch = new_outgoing_channel("stanford.edu:80");
ch->out()->write_bytes("GET http://stanford.edu/index.html HTTP/1.1\n\n");
channels_.emplace_back(std::move(ch));
}
virtual void event_update() {
Channel *stdioch = get_stdio_channel();
SharedChannel stdioch = get_stdio_channel();
dump_lines(stdioch->in(), stdioch->out(), 0);
std::vector<UniqueChannel> keep;
for (UniqueChannel &ch : channels_) {
std::vector<SharedChannel> keep;
for (SharedChannel &ch : channels_) {
dump_lines(ch->in(), stdioch->out(), ch->chid());
if (ch->closed()) {
write_closed_message(ch.get(), stdioch->out());
@@ -81,18 +81,18 @@ public:
// This test produces a DNS resolution failure.
class DriverDNSFailTest : public DrivenEngine {
public:
std::vector<UniqueChannel> channels_;
std::vector<SharedChannel> channels_;
virtual void event_init(int argc, char *argv[]) {
UniqueChannel ch = new_outgoing_channel("akjsdkajshdakjshd.alk:80");
SharedChannel ch = new_outgoing_channel("akjsdkajshdakjshd.alk:80");
ch->out()->write_bytes("GET http://stanford.edu/index.html HTTP/1.1\n\n");
channels_.emplace_back(std::move(ch));
}
virtual void event_update() {
Channel *stdioch = get_stdio_channel();
SharedChannel stdioch = get_stdio_channel();
dump_lines(stdioch->in(), stdioch->out(), 0);
std::vector<UniqueChannel> keep;
for (UniqueChannel &ch : channels_) {
std::vector<SharedChannel> keep;
for (SharedChannel &ch : channels_) {
dump_lines(ch->in(), stdioch->out(), ch->chid());
if (ch->closed()) {
write_closed_message(ch.get(), stdioch->out());

View File

@@ -14,7 +14,7 @@ public:
UniqueWorld world_;
int64_t actor_id_;
InvocationQueue unack_;
UniqueChannel channel_;
SharedChannel channel_;
LuaConsole console_;
PrintChanneler print_channeler_;
Gui gui_;

View File

@@ -10,7 +10,7 @@
class Client {
public:
int64_t actor_id_;
UniqueChannel channel_;
SharedChannel channel_;
UniqueWorld sync_;
};
using UniqueClient = std::unique_ptr<Client>;
@@ -151,7 +151,7 @@ public:
// Check for new incoming channels, set up client structures.
while (true) {
UniqueChannel chan = new_incoming_channel();
SharedChannel chan = new_incoming_channel();
if (chan == nullptr) break;
Client *client = new Client;
client->actor_id_ = master_->create_login_actor();