Implemented class StreamBuffer, which I'm quite satisfied with.

This commit is contained in:
2021-03-05 14:20:21 -05:00
parent 9d86347d54
commit c742bc5645
9 changed files with 776 additions and 215 deletions

View File

@@ -76,7 +76,7 @@ private:
int64_t next_batch_;
int64_t next_id_;
int queue_fill_;
friend int cunittests_idalloc(lua_State *L);
friend int unittests_idalloc(lua_State *L);
public:
// Construct and destroy global pools. Note that after constructing

View File

@@ -26,7 +26,7 @@ LuaSnap::~LuaSnap() {
std::cerr << "LuaSnap destructor not implemented yet" << std::endl;
}
void LuaSnap::serialize(Packer *pk) {
void LuaSnap::serialize(StreamBuffer *sb) {
// Lua stack should be empty.
assert(lua_gettop(state_) == 0);
@@ -63,25 +63,26 @@ void LuaSnap::serialize(Packer *pk) {
assert(lua_gettop(state_) == 2);
// Write dummy length, use eris to write data, then overwrite length.
pk->write_int64(0);
size_t tell = pk->tellp();
eris_dump(state_, pk->lua_writer, pk->lua_writer_ud());
pk->overwrite_int64(tell, pk->tellp() - tell);
sb->write_int64(0);
int64_t pos1 = sb->write_count();
eris_dump(state_, sb->lua_writer, sb->lua_writer_ud());
int64_t pos2 = sb->write_count();
sb->overwrite_int64(pos1, pos2 - pos1);
lua_settop(state_, 0);
std::cerr << "Eris dump is " << (pk->tellp() - tell) << " bytes." << std::endl;
std::cerr << "Eris dump is " << pos2-pos1 << " bytes." << std::endl;
}
void LuaSnap::deserialize(Unpacker *unpk) {
void LuaSnap::deserialize(StreamBuffer *sb) {
// Lua stack should be empty.
assert(lua_gettop(state_) == 0);
// Get a reader subsection containing the eris data.
size_t len = unpk->read_int64();
Unpacker subsec = unpk->read_section(len);
// Get the length of the eris dump.
int64_t len = sb->read_int64();
void *ud = sb->lua_reader_ud(len);
// Call eris with the permanents table and passing the snapshot as a lua_Reader.
lua_getfield(state_, LUA_REGISTRYINDEX, "unpersist");
eris_undump(state_, subsec.lua_reader, subsec.lua_reader_ud());
eris_undump(state_, sb->lua_reader, ud);
assert(lua_gettop(state_) == 2);
// Set up a stack frame.
@@ -122,21 +123,17 @@ void LuaSnap::deserialize(Unpacker *unpk) {
// require us to maintain any additional code.
bool LuaSnap::have_snapshot() const {
return !snapshot_.empty();
return snapshot_.write_count() != 0;
}
void LuaSnap::snapshot() {
assert(snapshot_.empty());
std::ostringstream oss;
Packer pk(&oss);
serialize(&pk);
snapshot_ = oss.str();
assert(snapshot_.write_count() == 0);
serialize(&snapshot_);
}
void LuaSnap::rollback() {
assert(!snapshot_.empty());
Unpacker unpk(snapshot_.c_str(), snapshot_.size());
deserialize(&unpk);
assert(snapshot_.write_count() != 0);
deserialize(&snapshot_);
snapshot_.clear();
}

View File

@@ -14,13 +14,13 @@
#ifndef LUASNAP_HPP
#define LUASNAP_HPP
#include "packer.hpp"
#include "StreamBuffer.hpp"
#include "luastack.hpp"
class LuaSnap {
private:
lua_State *state_;
std::string snapshot_;
StreamBuffer snapshot_;
public:
LuaSnap();
@@ -32,11 +32,11 @@ public:
// Serialize the state of the lua interpreter.
//
void serialize(Packer *pk);
void serialize(StreamBuffer *sb);
// Restore the the lua interpreter given a serialized state.
//
void deserialize(Unpacker *unpk);
void deserialize(StreamBuffer *sb);
// Return true if there's a saved snapshot.
//
@@ -53,7 +53,6 @@ public:
// If there is no snapshot, this panics.
//
void rollback();
};

View File

@@ -1,115 +0,0 @@
#include <packer.hpp>
#include <cassert>
void Packer::write_int8(int8_t v) {
oss_->write((const char *)&v, sizeof(v));
}
void Packer::write_int16(int16_t v) {
oss_->write((const char *)&v, sizeof(v));
}
void Packer::write_int32(int32_t v) {
oss_->write((const char *)&v, sizeof(v));
}
void Packer::write_int64(int64_t v) {
oss_->write((const char *)&v, sizeof(v));
}
void Packer::overwrite_int8(size_t tell_after, int8_t v) {
oss_->seekp(tell_after - 1);
write_int8(v);
oss_->seekp(0, std::ios_base::end);
}
void Packer::overwrite_int16(size_t tell_after, int16_t v) {
oss_->seekp(tell_after - 2);
write_int16(v);
oss_->seekp(0, std::ios_base::end);
}
void Packer::overwrite_int32(size_t tell_after, int32_t v) {
oss_->seekp(tell_after - 4);
write_int32(v);
oss_->seekp(0, std::ios_base::end);
}
void Packer::overwrite_int64(size_t tell_after, int64_t v) {
oss_->seekp(tell_after - 8);
write_int64(v);
oss_->seekp(0, std::ios_base::end);
}
int Packer::lua_writer(lua_State *L, const void *p, size_t sz, void *ud) {
Packer *pkr = (Packer *)ud;
pkr->oss_->write((const char *)p, sz);
return 0;
}
Unpacker::Unpacker(const char *d, size_t s) {
assert(d != nullptr);
assert(s >= 0);
data_ = d;
size_ = s;
}
int8_t Unpacker::read_int8() {
assert(size_ >= 1);
int8_t result;
memcpy(&result, data_, 1);
data_ += 1;
size_ -= 1;
return result;
}
int16_t Unpacker::read_int16() {
assert(size_ >= 2);
int16_t result;
memcpy(&result, data_, 2);
data_ += 2;
size_ -= 2;
return result;
}
int32_t Unpacker::read_int32() {
assert(size_ >= 4);
int32_t result;
memcpy(&result, data_, 4);
data_ += 4;
size_ -= 4;
return result;
}
int64_t Unpacker::read_int64() {
assert(size_ >= 8);
int64_t result;
memcpy(&result, data_, 8);
data_ += 8;
size_ -= 8;
return result;
}
void Unpacker::skip(size_t n) {
assert(size_ >= n);
data_ += n;
size_ -= n;
}
Unpacker Unpacker::read_section(size_t n) {
assert(size_ >= n);
const char *d = data_;
data_ += n;
size_ -= n;
return Unpacker(d, n);
}
const char *Unpacker::lua_reader(lua_State *L, void *ud, size_t *size) {
Unpacker *unpk = (Unpacker *)ud;
const char *retval = unpk->data_;
*size = unpk->size_;
if (unpk->size_ == 0) retval = nullptr;
unpk->data_ += unpk->size_;
unpk->size_ = 0;
return retval;
}

View File

@@ -1,72 +0,0 @@
#ifndef PACKER_HPP
#define PACKER_HPP
#include "luastack.hpp"
#include <cstdint>
#include <string>
#include <sstream>
#include <cassert>
class Packer {
private:
std::ostringstream *oss_;
// Packer is not copyable.
Packer(const Packer &other) { assert(false); }
public:
using fixhandle = size_t;
Packer(std::ostringstream *s) : oss_(s) {}
// Get the current file position.
size_t tellp() { return oss_->tellp(); }
void write_int8(int8_t v);
void write_int16(int16_t v);
void write_int32(int32_t v);
void write_int64(int64_t v);
// Sometimes, you want to write a length followed by a block of data.
// Sometimes, you don't know the length until after you've written the block
// of data. To help with this situation, we provide the overwrite_int
// functions. Here's what you do:
//
// 1. write a dummy length into the stream.
// 2. use 'tellp' to fetch the stream position *after* the dummy length.
// 3. write the block of data.
// 4. use an overwrite function below to overwrite the dummy length.
//
void overwrite_int8(size_t tell_after, int8_t value);
void overwrite_int16(size_t tell_after, int16_t value);
void overwrite_int32(size_t tell_after, int32_t value);
void overwrite_int64(size_t tell_after, int64_t value);
// The packer can be used as a lua_Writer.
static int lua_writer(lua_State *L, const void *p, size_t sz, void *ud);
void *lua_writer_ud() { return this; }
};
class Unpacker {
private:
const char *data_;
size_t size_;
public:
Unpacker(const char *d, size_t s);
const char *data() { return data_; }
size_t size() { return size_; }
int8_t read_int8();
int16_t read_int16();
int32_t read_int32();
int64_t read_int64();
void skip(size_t n);
Unpacker read_section(size_t n);
// The unpacker can be used as a lua_Reader
static const char *lua_reader(lua_State *L, void *ud, size_t *size);
void *lua_reader_ud() { return this; }
};
#endif // PACKER_HPP

View File

@@ -150,7 +150,7 @@ public:
// run_unittests
//
// Run all the unit tests. Print any errors to console. If there
// Run all the lua unit tests. Print any errors to console. If there
// are any errors, exits the program.
//
void run_unittests();

View File

@@ -0,0 +1,394 @@
#include "streambuffer.hpp"
#include <cassert>
#include <cstring>
void StreamBuffer::init(bool fixed, bool owned, char *buf, int64_t size) {
buf_lo_ = buf;
buf_hi_ = buf_lo_ + size;
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_;
pre_read_count_ = 0;
owned_ = owned;
fixed_size_ = fixed;
lua_reader_data_ = 0;
lua_reader_size_ = 0;
}
StreamBuffer::StreamBuffer() {
init(false, true, 0, 0);
}
StreamBuffer::StreamBuffer(int64_t size, bool fixed) {
assert(size > 0);
init(fixed, true, (char*)malloc(size), size);
}
StreamBuffer::StreamBuffer(const char *s, int64_t size) {
init(true, false, const_cast<char *>(s), size);
}
StreamBuffer::~StreamBuffer() {
if (owned_ && (buf_lo_ != 0)) delete buf_lo_;
}
int64_t StreamBuffer::read_count() const {
return (read_cursor_ - buf_lo_) + pre_read_count_;
}
int64_t StreamBuffer::write_count() const {
return (write_cursor_ - buf_lo_) + pre_read_count_;
}
bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) {
if (read_cursor_ - buf_lo_ != a) return false;
if (write_cursor_ - read_cursor_ != b) return false;
if (buf_hi_ - write_cursor_ != c) return false;
return true;
}
void StreamBuffer::make_space_slow(int64_t bytes) {
assert(owned_ && "We don't own this buffer, can't grow it");
// Decide whether the current buffer is big enough.
int64_t data_size = (write_cursor_ - read_cursor_);
int64_t existing_size = (buf_hi_ - buf_lo_);
int64_t desired_size = 8192 + ((data_size + bytes) * 2);
// Move the data to the beginning of the buffer, or to
// the beginning of a new buffer.
if (fixed_size_) {
assert((data_size + bytes <= existing_size) && "Not enough space in fixed-size buffer");
if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size);
} else if (existing_size >= desired_size) {
if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size);
} else {
char *nbuf = (char *)malloc(desired_size);
if (data_size > 0) memcpy(nbuf, read_cursor_, data_size);
if (buf_lo_ != nullptr) free(buf_lo_);
buf_lo_ = nbuf;
buf_hi_ = nbuf + desired_size;
}
// Update the pointers to the data region.
pre_read_count_ += (read_cursor_ - buf_lo_);
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_ + data_size;
// Flush the lua reader data, if any.
lua_reader_data_ = 0;
lua_reader_size_ = 0;
}
char *StreamBuffer::get_overwrite(int64_t size, int64_t write_count_after) {
int64_t write_count_before = write_count_after - size;
assert(write_count_before >= read_count());
assert(write_count_after <= write_count());
return buf_lo_ + (write_count_before - pre_read_count_);
}
void StreamBuffer::clear() {
assert(owned_);
if (!fixed_size_) {
if (buf_lo_ != nullptr) delete buf_lo_;
buf_lo_ = 0;
buf_hi_ = 0;
}
owned_ = true;
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_;
pre_read_count_ = 0;
lua_reader_data_ = 0;
lua_reader_size_ = 0;
}
char *StreamBuffer::alloc_space(int64_t bytes) {
assert(bytes >= 0);
make_space(bytes);
return write_cursor_;
}
void StreamBuffer::wrote_space(int64_t bytes) {
int64_t maxbuf = buf_hi_ - write_cursor_;
assert(bytes >= 0);
assert(bytes <= maxbuf);
write_cursor_ += bytes;
}
void StreamBuffer::write_int8(int8_t v) {
make_space(1);
memcpy(write_cursor_, &v, 1);
write_cursor_ += 1;
}
void StreamBuffer::write_int16(int16_t v) {
make_space(2);
memcpy(write_cursor_, &v, 2);
write_cursor_ += 2;
}
void StreamBuffer::write_int32(int32_t v) {
make_space(4);
memcpy(write_cursor_, &v, 4);
write_cursor_ += 4;
}
void StreamBuffer::write_int64(int64_t v) {
make_space(8);
memcpy(write_cursor_, &v, 8);
write_cursor_ += 8;
}
void StreamBuffer::write_bytes(const char *s, int64_t len) {
make_space(len);
memcpy(write_cursor_, s, len);
write_cursor_ += len;
}
void StreamBuffer::write_ztbytes(const char *s) {
write_bytes(s, strlen(s));
}
void StreamBuffer::write_string(const std::string &s) {
if (s.size() >= 255) {
write_int8(0xFF);
write_int64(s.size());
write_bytes(s.c_str(), s.size());
} else {
write_int8(s.size());
write_bytes(s.c_str(), s.size());
}
}
void StreamBuffer::overwrite_int8(int64_t write_count_after, int8_t v) {
char *target = get_overwrite(1, write_count_after);
memcpy(target, &v, 1);
}
void StreamBuffer::overwrite_int16(int64_t write_count_after, int16_t v) {
char *target = get_overwrite(2, write_count_after);
memcpy(target, &v, 2);
}
void StreamBuffer::overwrite_int32(int64_t write_count_after, int32_t v) {
char *target = get_overwrite(4, write_count_after);
memcpy(target, &v, 4);
}
void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t v) {
char *target = get_overwrite(8, write_count_after);
memcpy(target, &v, 8);
}
int8_t StreamBuffer::read_int8() {
check_available(1);
int8_t v;
memcpy(&v, read_cursor_, 1);
read_cursor_ += 1;
return v;
}
int16_t StreamBuffer::read_int16() {
check_available(2);
int16_t v;
memcpy(&v, read_cursor_, 2);
read_cursor_ += 2;
return v;
}
int32_t StreamBuffer::read_int32() {
check_available(4);
int32_t v;
memcpy(&v, read_cursor_, 4);
read_cursor_ += 4;
return v;
}
int64_t StreamBuffer::read_int64() {
check_available(8);
int64_t v;
memcpy(&v, read_cursor_, 8);
read_cursor_ += 8;
return v;
}
const char *StreamBuffer::read_bytes(int64_t bytes) {
check_available(bytes);
char *data = read_cursor_;
read_cursor_ += bytes;
return data;
}
std::string StreamBuffer::read_string(int64_t max_allowed) {
int64_t len = read_int8();
if (len == 255) {
len = read_int64();
}
if (len > max_allowed) throw StreamCorruption();
const char *bytes = read_bytes(len);
return std::string(bytes, len);
}
bool StreamBuffer::at_eof() {
return (read_cursor_ == write_cursor_);
}
void StreamBuffer::verify_eof() {
if (read_cursor_ != write_cursor_) {
throw StreamCorruption();
}
}
void StreamBuffer::unread_to(int64_t rd_count) {
assert(rd_count >= pre_read_count_);
assert(rd_count <= read_count());
read_cursor_ = buf_lo_ + (rd_count - pre_read_count_);
}
int StreamBuffer::lua_writer(lua_State *L, const void* p, size_t sz, void* ud) {
StreamBuffer *sb = (StreamBuffer *)ud;
sb->write_bytes((const char *)p, sz);
return 0;
}
void *StreamBuffer::lua_writer_ud() {
return this;
}
const char *StreamBuffer::lua_reader(lua_State *L, void *ud, size_t *size) {
StreamBuffer *sb = (StreamBuffer *)ud;
*size = sb->lua_reader_size_;
const char *data = sb->lua_reader_data_;
// Next time the reader gets called, there's no data left.
sb->lua_reader_data_ = 0;
sb->lua_reader_size_ = 0;
return data;
}
void *StreamBuffer::lua_reader_ud(int64_t size) {
const char *data = read_bytes(size);
lua_reader_data_ = data;
lua_reader_size_ = size;
return this;
}
static bool streq(const char *str, const char *data) {
int len = strlen(str);
return memcmp(str, data, len) == 0;
}
LuaDefine(unittests_streambuffer, "c") {
// An 11-byte fixed-size stream buffer.
StreamBuffer sb11(11, true);
// Check the initial state.
assert(sb11.layout_is(0, 0, 11));
// Write a few bytes.
sb11.write_ztbytes("abcdef");
assert(sb11.layout_is(0, 6, 5));
// Try reading some bytes.
assert(streq("abcd", sb11.read_bytes(4)));
assert(sb11.layout_is(4, 2, 5));
// Put back two bytes.
sb11.unread_to(2);
assert(sb11.layout_is(2, 4, 5));
// Read some more bytes.
assert(streq("cdef", sb11.read_bytes(4)));
assert(sb11.layout_is(6, 0, 5));
// Reading bytes now should raise an EOF and should not alter layout
try {
sb11.read_bytes(1);
assert(false && "This should have thrown an exception");
} catch (StreamEof) {}
assert(sb11.layout_is(6, 0, 5));
// Write some more bytes into the stream, forcing a shift-left
sb11.write_ztbytes("ghijkl");
assert(sb11.layout_is(0, 6, 5));
// Test buffer wrapping a little more.
assert(streq("ghi", sb11.read_bytes(3)));
assert(sb11.layout_is(3, 3, 5));
sb11.write_ztbytes("mnopqr");
assert(sb11.layout_is(0, 9, 2));
assert(streq("jklmnopqr", sb11.read_bytes(9)));
assert(sb11.layout_is(9, 0, 2));
// Test 1-byte integer ops.
sb11.clear();
for (int i = 0; i < 10; i++) {
sb11.write_int8(i);
sb11.write_int8(i+100);
assert(sb11.read_int8() == i);
assert(sb11.read_int8() == i+100);
}
// Test 2-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int16(i);
sb11.write_int16(i+10000);
assert(sb11.read_int16() == i);
assert(sb11.read_int16() == i+10000);
}
// Test 4-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int32(i);
sb11.write_int32(i+1000000);
assert(sb11.read_int32() == i);
assert(sb11.read_int32() == i+1000000);
}
// Test 8-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int64(i + 1000000);
assert(sb11.read_int64() == i + 1000000);
}
// Check the write count and read count accumulator ability.
sb11.clear();
assert(sb11.write_count() == 0);
assert(sb11.read_count() == 0);
for (int i = 0; i < 10; i++) {
assert(sb11.write_count() == i * 8);
sb11.write_int32(i);
sb11.write_int32(i+1000000);
assert(sb11.read_count() == i * 8);
assert(sb11.read_int32() == i);
assert(sb11.read_int32() == i+1000000);
}
// Try clearing the buffer.
sb11.clear();
assert(sb11.layout_is(0, 0, 11));
// Write a number then overwrite.
for (int i = 0; i < 2; i++) {
sb11.write_int16(12);
sb11.write_int16(34);
int64_t wc = sb11.write_count();
sb11.write_int16(56);
sb11.write_int16(78);
sb11.overwrite_int16(wc, 90);
assert(sb11.read_int16() == 12);
assert(sb11.read_int16() == 90);
assert(sb11.read_int16() == 56);
assert(sb11.read_int16() == 78);
}
// Try compact string encoding.
sb11.clear();
sb11.write_string("abc");
sb11.write_string("");
sb11.write_string("de");
assert(sb11.layout_is(0, 8, 3));
assert(sb11.read_string(1000) == "abc");
assert(sb11.read_string(1000) == "");
assert(sb11.read_string(1000) == "de");
return 0;
}

View File

@@ -0,0 +1,358 @@
//////////////////////////////////////////////////////////////
//
// STREAMBUFFER
//
// Serves as a buffer for buffered I/O operations. Has rather sophisticated
// methods to help serialize and deserialize data.
//
// The semantics of this class contain a lot of subtlety! Please read the
// documentation carefully.
//
// TELLING LINUX TO READ A FILE DESCRIPTOR INTO A STREAMBUFFER
//
// It is possible to read from a linux file descriptor, directly into a stream
// buffer. You should do this, it's very efficient. Here is how you do it:
//
// // With linux read, you have to pick an arbitrary buffer size.
// const int bufsize = 16384;
//
// // Allocate transient space in the streambuffer.
// char *space = streambuffer.alloc_space(bufsize);
//
// // Call the linux 'read' function.
// ssize_t bytes_read = read(fd, space, bufsize);
//
// // Append the bytes read to the streambuffer.
// streambuffer.wrote_space(bytes_read);
//
// Now, let's dig into the semantics of this. The method 'alloc_space' MUST be
// followed by 'wrote_space'. It is an error to invoke these methods unless you
// do them in that sequence. Together, these two methods count as a single
// 'write' operation into the StreamBuffer.
//
// 'alloc_space' allocates a block of bytes within the StreamBuffer. The
// pointer returned here is only valid until the 'wrote_space' operation. The
// method 'wrote_space' tells the StreamBuffer that the space has been populated
// with the specified amount of data. The data is then officially appended to
// the StreamBuffer. Again, the two methods 'alloc_space' followed by
// 'wrote_space' together count as a single write operation.
//
// THE OVERWRITE_INT METHODS:
//
// These overwrite methods are meant to help deal with this situation: you want
// to write a length followed by some data, but you don't know the length until
// after you've written the data. The workaround: write a dummy length, then
// write the data, and then overwrite the previously-written length with the
// correct length. This is the construction that accomplishes this:
//
// // Write the dummy length, this will get overwritten.
// streambuffer.write_int32(0);
//
// // Write the data, and calculate its length in bytes.
// int64_t write_count_1 = streambuffer.write_count();
// write_data(stream);
// int64_t write_count_2 = streambuffer.write_count();
// int64_t data_len = write_count_2 - write_count_1;
//
// // Overwrite the previously-written dummy length.
// streambuffer.overwrite_int32(write_count_1, data_len);
//
// Almost all of this is self-explanatory, but the last line is interesting. In
// order to know what part of the buffer to overwrite, overwrite_int uses
// write_count_1 as a pointer into the buffer - it points immedately to the
// right of the integer to overwrite.
//
// OVERWRITE_INT LIMITS
//
// If you use write_int to write an integer into the buffer, you are allowed to
// overwrite that integer UNTIL you do a read from the buffer. Once you do a
// read, it is no longer legal to overwrite ints that you wrote BEFORE the read.
//
// WRITE_STRING STORES THE STRING LENGTH, WRITE_BYTES DOES NOT
//
// write_string writes a string into the buffer and prepends a length. The
// encoding of the length field is designed to be efficient for short strings
// but still capable of encoding long lengths.
//
// write_bytes doesn't store the data length in the buffer. It's just a raw
// write of bytes.
//
// STREAM EXCEPTIONS
//
// If you do a read_int64, but the buffer doesn't contain the necessary 8 bytes,
// it throws a StreamEof exception. In general, during reading, the following
// common situations generate StreamEof or StreamCorruption exceptions:
//
// * not enough bytes to satisfy a 'read' call: StreamEof
// * call read_eof, but the buffer is not empty: StreamCorruption
// * call read_string, but the string is unreasonably long: StreamCorruption
//
// Exceptions are only generated when reading from a stream that contains bad
// data. Any other error generates a full-blown abort. For example, if you try
// to write to a stream that's not open for writing, that's an abort, not an
// exception. Write operations never generate exceptions.
//
// Sometimes, it is convenient to throw StreamCorruption yourself, if you detect
// that the data you've read from a stream is invalid. This can make error
// handling a little cleaner.
//
// READ BYTES POINTER VALIDITY
//
// When you call read_bytes, it returns a pointer to a block of bytes. This
// pointer only remains valid until you do a 'write' into the stream.
//
// NESTED DECODING
//
// Here is an interesting construct:
//
// // Read a message from the stream.
// size_t len = streambuffer.read_int32()
// const char *bytes = streambuffer.read_bytes(len);
//
// // Construct another stream object to decode the message.
// StreamBuffer substream(bytes, len);
// decode(substream);
//
// This is perfectly valid and a potentially convenient way to parse the
// contents of a message.
//
// UNREADING BYTES
//
// It's possible to 'unread' bytes that you've already read from a stream. This
// makes it possible to read those same bytes again.
//
// A common situation where this might be useful is: you're decoding a message,
// but you discover halfway through the process of decoding the message that you
// haven't received the whole message yet. In that case, it may be desirable to
// unread the partial message, so that you can wait for the rest of the message
// to be received.
//
// Here is the construction that accomplishes this:
//
// // Get the stream's read count before parsing the message.
// size_t read_count_before = streambuffer.read_count();
//
// // Parse the message, but if there's an EOF, deal with it:
// try {
// // Parse the message.
// int32_t value1 = streambuffer.read_int32();
// std::string value2 = streambuffer.read_string(maxlen);
// int64_t value3 = streambuffer.read_int64();
//
// // Great! I got the whole message.
// execute_message(value1, value2, value3);
// } catch (StreamEof) {
// // I ran out of bytes. Unread the message.
// streambuffer.unread(read_count_before);
// }
//
// UNREAD LIMITS
//
// If you read bytes from a stream, that data can be 'unread' until you do a
// write. After a write, it is no longer possible to 'unread' data that you
// read before the write.
//
// STREAMBUFFERS THAT DON'T OWN THEIR OWN MEMORY
//
// If you create a streambuffer using this constructor:
//
// StreamBuffer(const char *data, uint64_t len);
//
// This StreamBuffer reads from an external (unowned) block of bytes, which is
// not copied! The StreamBuffer saves the pointer that you passed in. This
// pointer must remain valid until you're done with the StreamBuffer.
//
// A StreamBuffer that reads from an external block of bytes is read-only.
// Attempts to write to this buffer will be caught and will cause an abort. The
// write_count for such a buffer returns the 'len' value that you initialized
// the buffer with.
//
// USING A STREAMBUFFER TO READ AN ENTIRE FILE
//
// If you wish to read an entire file and store the file contents in a
// StreamBuffer, you should probe the size of the file, then allocate a
// StreamBuffer of the correct size using this constructor:
//
// StreamBuffer(int64_t size);
//
// Then, you can use 'alloc_space' and 'wrote_space' to read the file into the
// buffer in a single read call.
//
// USING A STREAMBUFFER AS A LUA_WRITER OR LUA_READER
//
// You can use a streambuffer as a lua_Writer, as follows:
//
// lua_dump(L, stream.lua_writer(), stream.lua_writer_ud());
//
// Anything written to the lua_writer gets appended to the streambuffer, the
// same as if it had been written using write_bytes.
//
// You can use a streambuffer as a lua_Reader, as follows:
//
// lua_load (L, stream.lua_reader(), stream.lua_reader_ud(nbytes), ...)
//
// The exact semantics of the lua_reader are tricky, so be careful:
// lua_reader_ud calls 'read_bytes' immediately, and it stores the bytes in a
// "cache of bytes for lua." Then, when the lua_reader gets invoked, the reader
// returns the entire contents of the cache, and it clears the cache. Here are
// some consequences of this design:
//
// 1. The number of bytes read from the stream is always exactly equal to
// nbytes, even if lua never calls the lua_reader.
//
// 2. If the stream doesn't contain nbytes, a StreamEof exception gets thrown
// from lua_reader_ud, not from the lua_Reader. This is good, because it
// means exceptions don't get thrown from inside the lua runtime.
//
//////////////////////////////////////////////////////////////
#ifndef STREAMBUFFER_HPP
#define STREAMBUFFER_HPP
#include "luastack.hpp"
#include <cstdint>
#include <string>
#include <sstream>
#include <cassert>
class StreamException
{
public:
virtual char const *what() const { return "General stream exception"; }
};
class StreamEof : public StreamException
{
public:
virtual char const *what() const { return "Stream ran out of data"; }
};
class StreamCorruption : public StreamException
{
public:
virtual char const *what() const { return "Stream contained invalid data"; }
};
class StreamBuffer {
private:
// Start and end of the allocated block.
char *buf_lo_;
char *buf_hi_;
// The write and read cursors.
char *write_cursor_;
char *read_cursor_;
// Number of bytes read before buffer was last aligned.
int64_t pre_read_count_;
// True if we own this buffer.
bool owned_;
// True if we're not allowed to expand this buffer.
bool fixed_size_;
// Lua reader return value.
const char *lua_reader_data_;
int64_t lua_reader_size_;
// Initialize with a new buffer.
void init(bool fixed, bool owned, char *buf, int64_t size);
// Make the specified amount of space in the buffer for writing.
void make_space(int64_t bytes) {
int64_t available = buf_hi_ - write_cursor_;
if (available < bytes) make_space_slow(bytes);
}
void make_space_slow(int64_t bytes);
// Make sure the specified number of bytes are available to read.
void check_available(int64_t bytes) {
int64_t avail = write_cursor_ - read_cursor_;
if (avail < bytes) {
throw StreamEof();
}
}
// Implementation for the overwrite_int functions.
char *get_overwrite(int64_t size, int64_t write_count_after);
// This is mainly for unit testing.
bool layout_is(int64_t a, int64_t b, int64_t c);
friend int unittests_streambuffer(lua_State *L);
public:
// Construct an empty buffer.
StreamBuffer();
// Construct an empty buffer, preallocate the specified amount of space.
StreamBuffer(int64_t size, bool fixed_size);
// Construct a streambuffer that reads from an external block of bytes.
StreamBuffer(const char *s, int64_t len);
// Delete a StreamBuffer.
~StreamBuffer();
// Get the total number of bytes ever read from this buffer.
int64_t read_count() const;
// Get the total number of bytes ever written to this buffer.
int64_t write_count() const;
// Delete all data and (if not fixed-size) free the buffer.
void clear();
// Alloc_space and wrote_space need to be invoked together.
char *alloc_space(int64_t bytes);
void wrote_space(int64_t bytes);
// Write values into the buffer.
void write_int8(int8_t v);
void write_int16(int16_t v);
void write_int32(int32_t v);
void write_int64(int64_t v);
void write_bytes(const char *bytes, int64_t len);
void write_string(const std::string &s);
void write_ztbytes(const char *bytes);
// Overwrite values previously written to the buffer.
void overwrite_int8(int64_t write_count_after, int8_t value);
void overwrite_int16(int64_t write_count_after, int16_t value);
void overwrite_int32(int64_t write_count_after, int32_t value);
void overwrite_int64(int64_t write_count_after, int64_t value);
// Read integers from the buffer. May throw StreamEof.
int8_t read_int8();
int16_t read_int16();
int32_t read_int32();
int64_t read_int64();
// Read a string of no more than the specified length. May throw StreamEof
// or StreamCorruption.
std::string read_string(int64_t max_allowed);
// Read a block of bytes. May throw StreamEof.
const char *read_bytes(int64_t bytes);
// Return true if the buffer is empty.
bool at_eof();
// Verify that the buffer is empty, if not, throw StreamCorruption.
void verify_eof();
// Rewind the read cursor to a previous position.
void unread_to(int64_t read_count);
// Use the stream buffer as a lua_Writer.
static int lua_writer(lua_State *L, const void* p, size_t sz, void* ud);
void *lua_writer_ud();
// Use the stream buffer as a lua_Reader.
static const char *lua_reader(lua_State *L, void *data, size_t *size);
void *lua_reader_ud(int64_t bytes);
};
#endif // STREAMBUFFER_HPP