diff --git a/luprex/core/Makefile b/luprex/core/Makefile index 4a9009eb..826e968c 100644 --- a/luprex/core/Makefile +++ b/luprex/core/Makefile @@ -14,7 +14,7 @@ CPP_FILES=\ cpp/gui.cpp\ cpp/luasnap.cpp\ cpp/animqueue.cpp\ - cpp/packer.cpp\ + cpp/streambuffer.cpp\ cpp/source.cpp\ cpp/world.cpp\ cpp/textgame.cpp\ diff --git a/luprex/core/cpp/idalloc.hpp b/luprex/core/cpp/idalloc.hpp index 17213519..e12aee04 100644 --- a/luprex/core/cpp/idalloc.hpp +++ b/luprex/core/cpp/idalloc.hpp @@ -76,7 +76,7 @@ private: int64_t next_batch_; int64_t next_id_; int queue_fill_; - friend int cunittests_idalloc(lua_State *L); + friend int unittests_idalloc(lua_State *L); public: // Construct and destroy global pools. Note that after constructing diff --git a/luprex/core/cpp/luasnap.cpp b/luprex/core/cpp/luasnap.cpp index 1ee7836d..6c30cde9 100644 --- a/luprex/core/cpp/luasnap.cpp +++ b/luprex/core/cpp/luasnap.cpp @@ -26,7 +26,7 @@ LuaSnap::~LuaSnap() { std::cerr << "LuaSnap destructor not implemented yet" << std::endl; } -void LuaSnap::serialize(Packer *pk) { +void LuaSnap::serialize(StreamBuffer *sb) { // Lua stack should be empty. assert(lua_gettop(state_) == 0); @@ -63,25 +63,26 @@ void LuaSnap::serialize(Packer *pk) { assert(lua_gettop(state_) == 2); // Write dummy length, use eris to write data, then overwrite length. - pk->write_int64(0); - size_t tell = pk->tellp(); - eris_dump(state_, pk->lua_writer, pk->lua_writer_ud()); - pk->overwrite_int64(tell, pk->tellp() - tell); + sb->write_int64(0); + int64_t pos1 = sb->write_count(); + eris_dump(state_, sb->lua_writer, sb->lua_writer_ud()); + int64_t pos2 = sb->write_count(); + sb->overwrite_int64(pos1, pos2 - pos1); lua_settop(state_, 0); - std::cerr << "Eris dump is " << (pk->tellp() - tell) << " bytes." << std::endl; + std::cerr << "Eris dump is " << pos2-pos1 << " bytes." << std::endl; } -void LuaSnap::deserialize(Unpacker *unpk) { +void LuaSnap::deserialize(StreamBuffer *sb) { // Lua stack should be empty. assert(lua_gettop(state_) == 0); - // Get a reader subsection containing the eris data. - size_t len = unpk->read_int64(); - Unpacker subsec = unpk->read_section(len); + // Get the length of the eris dump. + int64_t len = sb->read_int64(); + void *ud = sb->lua_reader_ud(len); // Call eris with the permanents table and passing the snapshot as a lua_Reader. lua_getfield(state_, LUA_REGISTRYINDEX, "unpersist"); - eris_undump(state_, subsec.lua_reader, subsec.lua_reader_ud()); + eris_undump(state_, sb->lua_reader, ud); assert(lua_gettop(state_) == 2); // Set up a stack frame. @@ -122,21 +123,17 @@ void LuaSnap::deserialize(Unpacker *unpk) { // require us to maintain any additional code. bool LuaSnap::have_snapshot() const { - return !snapshot_.empty(); + return snapshot_.write_count() != 0; } void LuaSnap::snapshot() { - assert(snapshot_.empty()); - std::ostringstream oss; - Packer pk(&oss); - serialize(&pk); - snapshot_ = oss.str(); + assert(snapshot_.write_count() == 0); + serialize(&snapshot_); } void LuaSnap::rollback() { - assert(!snapshot_.empty()); - Unpacker unpk(snapshot_.c_str(), snapshot_.size()); - deserialize(&unpk); + assert(snapshot_.write_count() != 0); + deserialize(&snapshot_); snapshot_.clear(); } diff --git a/luprex/core/cpp/luasnap.hpp b/luprex/core/cpp/luasnap.hpp index 3e85051d..30cf53c4 100644 --- a/luprex/core/cpp/luasnap.hpp +++ b/luprex/core/cpp/luasnap.hpp @@ -14,13 +14,13 @@ #ifndef LUASNAP_HPP #define LUASNAP_HPP -#include "packer.hpp" +#include "StreamBuffer.hpp" #include "luastack.hpp" class LuaSnap { private: lua_State *state_; - std::string snapshot_; + StreamBuffer snapshot_; public: LuaSnap(); @@ -32,11 +32,11 @@ public: // Serialize the state of the lua interpreter. // - void serialize(Packer *pk); + void serialize(StreamBuffer *sb); // Restore the the lua interpreter given a serialized state. // - void deserialize(Unpacker *unpk); + void deserialize(StreamBuffer *sb); // Return true if there's a saved snapshot. // @@ -53,7 +53,6 @@ public: // If there is no snapshot, this panics. // void rollback(); - }; diff --git a/luprex/core/cpp/packer.cpp b/luprex/core/cpp/packer.cpp deleted file mode 100644 index a1405c18..00000000 --- a/luprex/core/cpp/packer.cpp +++ /dev/null @@ -1,115 +0,0 @@ -#include -#include - -void Packer::write_int8(int8_t v) { - oss_->write((const char *)&v, sizeof(v)); -} - -void Packer::write_int16(int16_t v) { - oss_->write((const char *)&v, sizeof(v)); -} - -void Packer::write_int32(int32_t v) { - oss_->write((const char *)&v, sizeof(v)); -} - -void Packer::write_int64(int64_t v) { - oss_->write((const char *)&v, sizeof(v)); -} - -void Packer::overwrite_int8(size_t tell_after, int8_t v) { - oss_->seekp(tell_after - 1); - write_int8(v); - oss_->seekp(0, std::ios_base::end); -} - -void Packer::overwrite_int16(size_t tell_after, int16_t v) { - oss_->seekp(tell_after - 2); - write_int16(v); - oss_->seekp(0, std::ios_base::end); -} - -void Packer::overwrite_int32(size_t tell_after, int32_t v) { - oss_->seekp(tell_after - 4); - write_int32(v); - oss_->seekp(0, std::ios_base::end); -} - -void Packer::overwrite_int64(size_t tell_after, int64_t v) { - oss_->seekp(tell_after - 8); - write_int64(v); - oss_->seekp(0, std::ios_base::end); -} - -int Packer::lua_writer(lua_State *L, const void *p, size_t sz, void *ud) { - Packer *pkr = (Packer *)ud; - pkr->oss_->write((const char *)p, sz); - return 0; -} - -Unpacker::Unpacker(const char *d, size_t s) { - assert(d != nullptr); - assert(s >= 0); - data_ = d; - size_ = s; -} - -int8_t Unpacker::read_int8() { - assert(size_ >= 1); - int8_t result; - memcpy(&result, data_, 1); - data_ += 1; - size_ -= 1; - return result; -} - -int16_t Unpacker::read_int16() { - assert(size_ >= 2); - int16_t result; - memcpy(&result, data_, 2); - data_ += 2; - size_ -= 2; - return result; -} - -int32_t Unpacker::read_int32() { - assert(size_ >= 4); - int32_t result; - memcpy(&result, data_, 4); - data_ += 4; - size_ -= 4; - return result; -} - -int64_t Unpacker::read_int64() { - assert(size_ >= 8); - int64_t result; - memcpy(&result, data_, 8); - data_ += 8; - size_ -= 8; - return result; -} - -void Unpacker::skip(size_t n) { - assert(size_ >= n); - data_ += n; - size_ -= n; -} - -Unpacker Unpacker::read_section(size_t n) { - assert(size_ >= n); - const char *d = data_; - data_ += n; - size_ -= n; - return Unpacker(d, n); -} - -const char *Unpacker::lua_reader(lua_State *L, void *ud, size_t *size) { - Unpacker *unpk = (Unpacker *)ud; - const char *retval = unpk->data_; - *size = unpk->size_; - if (unpk->size_ == 0) retval = nullptr; - unpk->data_ += unpk->size_; - unpk->size_ = 0; - return retval; -} diff --git a/luprex/core/cpp/packer.hpp b/luprex/core/cpp/packer.hpp deleted file mode 100644 index 6f462c8e..00000000 --- a/luprex/core/cpp/packer.hpp +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef PACKER_HPP -#define PACKER_HPP - -#include "luastack.hpp" -#include -#include -#include -#include - -class Packer { -private: - std::ostringstream *oss_; - - // Packer is not copyable. - Packer(const Packer &other) { assert(false); } -public: - using fixhandle = size_t; - Packer(std::ostringstream *s) : oss_(s) {} - - // Get the current file position. - size_t tellp() { return oss_->tellp(); } - - void write_int8(int8_t v); - void write_int16(int16_t v); - void write_int32(int32_t v); - void write_int64(int64_t v); - - // Sometimes, you want to write a length followed by a block of data. - // Sometimes, you don't know the length until after you've written the block - // of data. To help with this situation, we provide the overwrite_int - // functions. Here's what you do: - // - // 1. write a dummy length into the stream. - // 2. use 'tellp' to fetch the stream position *after* the dummy length. - // 3. write the block of data. - // 4. use an overwrite function below to overwrite the dummy length. - // - void overwrite_int8(size_t tell_after, int8_t value); - void overwrite_int16(size_t tell_after, int16_t value); - void overwrite_int32(size_t tell_after, int32_t value); - void overwrite_int64(size_t tell_after, int64_t value); - - // The packer can be used as a lua_Writer. - static int lua_writer(lua_State *L, const void *p, size_t sz, void *ud); - void *lua_writer_ud() { return this; } -}; - -class Unpacker { -private: - const char *data_; - size_t size_; - -public: - Unpacker(const char *d, size_t s); - const char *data() { return data_; } - size_t size() { return size_; } - - int8_t read_int8(); - int16_t read_int16(); - int32_t read_int32(); - int64_t read_int64(); - void skip(size_t n); - Unpacker read_section(size_t n); - - // The unpacker can be used as a lua_Reader - static const char *lua_reader(lua_State *L, void *ud, size_t *size); - void *lua_reader_ud() { return this; } -}; - - - -#endif // PACKER_HPP diff --git a/luprex/core/cpp/source.hpp b/luprex/core/cpp/source.hpp index 27484fc9..e185e21d 100644 --- a/luprex/core/cpp/source.hpp +++ b/luprex/core/cpp/source.hpp @@ -150,7 +150,7 @@ public: // run_unittests // - // Run all the unit tests. Print any errors to console. If there + // Run all the lua unit tests. Print any errors to console. If there // are any errors, exits the program. // void run_unittests(); diff --git a/luprex/core/cpp/streambuffer.cpp b/luprex/core/cpp/streambuffer.cpp new file mode 100644 index 00000000..a64a5791 --- /dev/null +++ b/luprex/core/cpp/streambuffer.cpp @@ -0,0 +1,394 @@ +#include "streambuffer.hpp" +#include +#include + +void StreamBuffer::init(bool fixed, bool owned, char *buf, int64_t size) { + buf_lo_ = buf; + buf_hi_ = buf_lo_ + size; + read_cursor_ = buf_lo_; + write_cursor_ = buf_lo_; + pre_read_count_ = 0; + owned_ = owned; + fixed_size_ = fixed; + lua_reader_data_ = 0; + lua_reader_size_ = 0; +} + +StreamBuffer::StreamBuffer() { + init(false, true, 0, 0); +} + +StreamBuffer::StreamBuffer(int64_t size, bool fixed) { + assert(size > 0); + init(fixed, true, (char*)malloc(size), size); +} + +StreamBuffer::StreamBuffer(const char *s, int64_t size) { + init(true, false, const_cast(s), size); +} + +StreamBuffer::~StreamBuffer() { + if (owned_ && (buf_lo_ != 0)) delete buf_lo_; +} + +int64_t StreamBuffer::read_count() const { + return (read_cursor_ - buf_lo_) + pre_read_count_; +} + +int64_t StreamBuffer::write_count() const { + return (write_cursor_ - buf_lo_) + pre_read_count_; +} + +bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) { + if (read_cursor_ - buf_lo_ != a) return false; + if (write_cursor_ - read_cursor_ != b) return false; + if (buf_hi_ - write_cursor_ != c) return false; + return true; +} + +void StreamBuffer::make_space_slow(int64_t bytes) { + assert(owned_ && "We don't own this buffer, can't grow it"); + + // Decide whether the current buffer is big enough. + int64_t data_size = (write_cursor_ - read_cursor_); + int64_t existing_size = (buf_hi_ - buf_lo_); + int64_t desired_size = 8192 + ((data_size + bytes) * 2); + + // Move the data to the beginning of the buffer, or to + // the beginning of a new buffer. + if (fixed_size_) { + assert((data_size + bytes <= existing_size) && "Not enough space in fixed-size buffer"); + if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size); + } else if (existing_size >= desired_size) { + if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size); + } else { + char *nbuf = (char *)malloc(desired_size); + if (data_size > 0) memcpy(nbuf, read_cursor_, data_size); + if (buf_lo_ != nullptr) free(buf_lo_); + buf_lo_ = nbuf; + buf_hi_ = nbuf + desired_size; + } + + // Update the pointers to the data region. + pre_read_count_ += (read_cursor_ - buf_lo_); + read_cursor_ = buf_lo_; + write_cursor_ = buf_lo_ + data_size; + + // Flush the lua reader data, if any. + lua_reader_data_ = 0; + lua_reader_size_ = 0; +} + +char *StreamBuffer::get_overwrite(int64_t size, int64_t write_count_after) { + int64_t write_count_before = write_count_after - size; + assert(write_count_before >= read_count()); + assert(write_count_after <= write_count()); + return buf_lo_ + (write_count_before - pre_read_count_); +} + +void StreamBuffer::clear() { + assert(owned_); + if (!fixed_size_) { + if (buf_lo_ != nullptr) delete buf_lo_; + buf_lo_ = 0; + buf_hi_ = 0; + } + owned_ = true; + read_cursor_ = buf_lo_; + write_cursor_ = buf_lo_; + pre_read_count_ = 0; + lua_reader_data_ = 0; + lua_reader_size_ = 0; +} + +char *StreamBuffer::alloc_space(int64_t bytes) { + assert(bytes >= 0); + make_space(bytes); + return write_cursor_; +} + +void StreamBuffer::wrote_space(int64_t bytes) { + int64_t maxbuf = buf_hi_ - write_cursor_; + assert(bytes >= 0); + assert(bytes <= maxbuf); + write_cursor_ += bytes; +} + +void StreamBuffer::write_int8(int8_t v) { + make_space(1); + memcpy(write_cursor_, &v, 1); + write_cursor_ += 1; +} + +void StreamBuffer::write_int16(int16_t v) { + make_space(2); + memcpy(write_cursor_, &v, 2); + write_cursor_ += 2; +} + +void StreamBuffer::write_int32(int32_t v) { + make_space(4); + memcpy(write_cursor_, &v, 4); + write_cursor_ += 4; +} + +void StreamBuffer::write_int64(int64_t v) { + make_space(8); + memcpy(write_cursor_, &v, 8); + write_cursor_ += 8; +} + +void StreamBuffer::write_bytes(const char *s, int64_t len) { + make_space(len); + memcpy(write_cursor_, s, len); + write_cursor_ += len; +} + +void StreamBuffer::write_ztbytes(const char *s) { + write_bytes(s, strlen(s)); +} + +void StreamBuffer::write_string(const std::string &s) { + if (s.size() >= 255) { + write_int8(0xFF); + write_int64(s.size()); + write_bytes(s.c_str(), s.size()); + } else { + write_int8(s.size()); + write_bytes(s.c_str(), s.size()); + } +} + +void StreamBuffer::overwrite_int8(int64_t write_count_after, int8_t v) { + char *target = get_overwrite(1, write_count_after); + memcpy(target, &v, 1); +} + +void StreamBuffer::overwrite_int16(int64_t write_count_after, int16_t v) { + char *target = get_overwrite(2, write_count_after); + memcpy(target, &v, 2); +} + +void StreamBuffer::overwrite_int32(int64_t write_count_after, int32_t v) { + char *target = get_overwrite(4, write_count_after); + memcpy(target, &v, 4); +} + +void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t v) { + char *target = get_overwrite(8, write_count_after); + memcpy(target, &v, 8); +} + +int8_t StreamBuffer::read_int8() { + check_available(1); + int8_t v; + memcpy(&v, read_cursor_, 1); + read_cursor_ += 1; + return v; +} + +int16_t StreamBuffer::read_int16() { + check_available(2); + int16_t v; + memcpy(&v, read_cursor_, 2); + read_cursor_ += 2; + return v; +} + +int32_t StreamBuffer::read_int32() { + check_available(4); + int32_t v; + memcpy(&v, read_cursor_, 4); + read_cursor_ += 4; + return v; +} + +int64_t StreamBuffer::read_int64() { + check_available(8); + int64_t v; + memcpy(&v, read_cursor_, 8); + read_cursor_ += 8; + return v; +} + +const char *StreamBuffer::read_bytes(int64_t bytes) { + check_available(bytes); + char *data = read_cursor_; + read_cursor_ += bytes; + return data; +} + +std::string StreamBuffer::read_string(int64_t max_allowed) { + int64_t len = read_int8(); + if (len == 255) { + len = read_int64(); + } + if (len > max_allowed) throw StreamCorruption(); + const char *bytes = read_bytes(len); + return std::string(bytes, len); +} + +bool StreamBuffer::at_eof() { + return (read_cursor_ == write_cursor_); +} + +void StreamBuffer::verify_eof() { + if (read_cursor_ != write_cursor_) { + throw StreamCorruption(); + } +} + +void StreamBuffer::unread_to(int64_t rd_count) { + assert(rd_count >= pre_read_count_); + assert(rd_count <= read_count()); + read_cursor_ = buf_lo_ + (rd_count - pre_read_count_); +} + +int StreamBuffer::lua_writer(lua_State *L, const void* p, size_t sz, void* ud) { + StreamBuffer *sb = (StreamBuffer *)ud; + sb->write_bytes((const char *)p, sz); + return 0; +} + +void *StreamBuffer::lua_writer_ud() { + return this; +} + +const char *StreamBuffer::lua_reader(lua_State *L, void *ud, size_t *size) { + StreamBuffer *sb = (StreamBuffer *)ud; + *size = sb->lua_reader_size_; + const char *data = sb->lua_reader_data_; + // Next time the reader gets called, there's no data left. + sb->lua_reader_data_ = 0; + sb->lua_reader_size_ = 0; + return data; +} + +void *StreamBuffer::lua_reader_ud(int64_t size) { + const char *data = read_bytes(size); + lua_reader_data_ = data; + lua_reader_size_ = size; + return this; +} + +static bool streq(const char *str, const char *data) { + int len = strlen(str); + return memcmp(str, data, len) == 0; +} + +LuaDefine(unittests_streambuffer, "c") { + // An 11-byte fixed-size stream buffer. + StreamBuffer sb11(11, true); + + // Check the initial state. + assert(sb11.layout_is(0, 0, 11)); + + // Write a few bytes. + sb11.write_ztbytes("abcdef"); + assert(sb11.layout_is(0, 6, 5)); + + // Try reading some bytes. + assert(streq("abcd", sb11.read_bytes(4))); + assert(sb11.layout_is(4, 2, 5)); + + // Put back two bytes. + sb11.unread_to(2); + assert(sb11.layout_is(2, 4, 5)); + + // Read some more bytes. + assert(streq("cdef", sb11.read_bytes(4))); + assert(sb11.layout_is(6, 0, 5)); + + // Reading bytes now should raise an EOF and should not alter layout + try { + sb11.read_bytes(1); + assert(false && "This should have thrown an exception"); + } catch (StreamEof) {} + assert(sb11.layout_is(6, 0, 5)); + + // Write some more bytes into the stream, forcing a shift-left + sb11.write_ztbytes("ghijkl"); + assert(sb11.layout_is(0, 6, 5)); + + // Test buffer wrapping a little more. + assert(streq("ghi", sb11.read_bytes(3))); + assert(sb11.layout_is(3, 3, 5)); + sb11.write_ztbytes("mnopqr"); + assert(sb11.layout_is(0, 9, 2)); + assert(streq("jklmnopqr", sb11.read_bytes(9))); + assert(sb11.layout_is(9, 0, 2)); + + // Test 1-byte integer ops. + sb11.clear(); + for (int i = 0; i < 10; i++) { + sb11.write_int8(i); + sb11.write_int8(i+100); + assert(sb11.read_int8() == i); + assert(sb11.read_int8() == i+100); + } + + // Test 2-byte integer ops. + for (int i = 0; i < 10; i++) { + sb11.write_int16(i); + sb11.write_int16(i+10000); + assert(sb11.read_int16() == i); + assert(sb11.read_int16() == i+10000); + } + + // Test 4-byte integer ops. + for (int i = 0; i < 10; i++) { + sb11.write_int32(i); + sb11.write_int32(i+1000000); + assert(sb11.read_int32() == i); + assert(sb11.read_int32() == i+1000000); + } + + // Test 8-byte integer ops. + for (int i = 0; i < 10; i++) { + sb11.write_int64(i + 1000000); + assert(sb11.read_int64() == i + 1000000); + } + + // Check the write count and read count accumulator ability. + sb11.clear(); + assert(sb11.write_count() == 0); + assert(sb11.read_count() == 0); + for (int i = 0; i < 10; i++) { + assert(sb11.write_count() == i * 8); + sb11.write_int32(i); + sb11.write_int32(i+1000000); + assert(sb11.read_count() == i * 8); + assert(sb11.read_int32() == i); + assert(sb11.read_int32() == i+1000000); + } + + // Try clearing the buffer. + sb11.clear(); + assert(sb11.layout_is(0, 0, 11)); + + // Write a number then overwrite. + for (int i = 0; i < 2; i++) { + sb11.write_int16(12); + sb11.write_int16(34); + int64_t wc = sb11.write_count(); + sb11.write_int16(56); + sb11.write_int16(78); + sb11.overwrite_int16(wc, 90); + assert(sb11.read_int16() == 12); + assert(sb11.read_int16() == 90); + assert(sb11.read_int16() == 56); + assert(sb11.read_int16() == 78); + } + + // Try compact string encoding. + sb11.clear(); + sb11.write_string("abc"); + sb11.write_string(""); + sb11.write_string("de"); + assert(sb11.layout_is(0, 8, 3)); + assert(sb11.read_string(1000) == "abc"); + assert(sb11.read_string(1000) == ""); + assert(sb11.read_string(1000) == "de"); + + return 0; +} diff --git a/luprex/core/cpp/streambuffer.hpp b/luprex/core/cpp/streambuffer.hpp new file mode 100644 index 00000000..8467f4bf --- /dev/null +++ b/luprex/core/cpp/streambuffer.hpp @@ -0,0 +1,358 @@ +////////////////////////////////////////////////////////////// +// +// STREAMBUFFER +// +// Serves as a buffer for buffered I/O operations. Has rather sophisticated +// methods to help serialize and deserialize data. +// +// The semantics of this class contain a lot of subtlety! Please read the +// documentation carefully. +// +// TELLING LINUX TO READ A FILE DESCRIPTOR INTO A STREAMBUFFER +// +// It is possible to read from a linux file descriptor, directly into a stream +// buffer. You should do this, it's very efficient. Here is how you do it: +// +// // With linux read, you have to pick an arbitrary buffer size. +// const int bufsize = 16384; +// +// // Allocate transient space in the streambuffer. +// char *space = streambuffer.alloc_space(bufsize); +// +// // Call the linux 'read' function. +// ssize_t bytes_read = read(fd, space, bufsize); +// +// // Append the bytes read to the streambuffer. +// streambuffer.wrote_space(bytes_read); +// +// Now, let's dig into the semantics of this. The method 'alloc_space' MUST be +// followed by 'wrote_space'. It is an error to invoke these methods unless you +// do them in that sequence. Together, these two methods count as a single +// 'write' operation into the StreamBuffer. +// +// 'alloc_space' allocates a block of bytes within the StreamBuffer. The +// pointer returned here is only valid until the 'wrote_space' operation. The +// method 'wrote_space' tells the StreamBuffer that the space has been populated +// with the specified amount of data. The data is then officially appended to +// the StreamBuffer. Again, the two methods 'alloc_space' followed by +// 'wrote_space' together count as a single write operation. +// +// THE OVERWRITE_INT METHODS: +// +// These overwrite methods are meant to help deal with this situation: you want +// to write a length followed by some data, but you don't know the length until +// after you've written the data. The workaround: write a dummy length, then +// write the data, and then overwrite the previously-written length with the +// correct length. This is the construction that accomplishes this: +// +// // Write the dummy length, this will get overwritten. +// streambuffer.write_int32(0); +// +// // Write the data, and calculate its length in bytes. +// int64_t write_count_1 = streambuffer.write_count(); +// write_data(stream); +// int64_t write_count_2 = streambuffer.write_count(); +// int64_t data_len = write_count_2 - write_count_1; +// +// // Overwrite the previously-written dummy length. +// streambuffer.overwrite_int32(write_count_1, data_len); +// +// Almost all of this is self-explanatory, but the last line is interesting. In +// order to know what part of the buffer to overwrite, overwrite_int uses +// write_count_1 as a pointer into the buffer - it points immedately to the +// right of the integer to overwrite. +// +// OVERWRITE_INT LIMITS +// +// If you use write_int to write an integer into the buffer, you are allowed to +// overwrite that integer UNTIL you do a read from the buffer. Once you do a +// read, it is no longer legal to overwrite ints that you wrote BEFORE the read. +// +// WRITE_STRING STORES THE STRING LENGTH, WRITE_BYTES DOES NOT +// +// write_string writes a string into the buffer and prepends a length. The +// encoding of the length field is designed to be efficient for short strings +// but still capable of encoding long lengths. +// +// write_bytes doesn't store the data length in the buffer. It's just a raw +// write of bytes. +// +// STREAM EXCEPTIONS +// +// If you do a read_int64, but the buffer doesn't contain the necessary 8 bytes, +// it throws a StreamEof exception. In general, during reading, the following +// common situations generate StreamEof or StreamCorruption exceptions: +// +// * not enough bytes to satisfy a 'read' call: StreamEof +// * call read_eof, but the buffer is not empty: StreamCorruption +// * call read_string, but the string is unreasonably long: StreamCorruption +// +// Exceptions are only generated when reading from a stream that contains bad +// data. Any other error generates a full-blown abort. For example, if you try +// to write to a stream that's not open for writing, that's an abort, not an +// exception. Write operations never generate exceptions. +// +// Sometimes, it is convenient to throw StreamCorruption yourself, if you detect +// that the data you've read from a stream is invalid. This can make error +// handling a little cleaner. +// +// READ BYTES POINTER VALIDITY +// +// When you call read_bytes, it returns a pointer to a block of bytes. This +// pointer only remains valid until you do a 'write' into the stream. +// +// NESTED DECODING +// +// Here is an interesting construct: +// +// // Read a message from the stream. +// size_t len = streambuffer.read_int32() +// const char *bytes = streambuffer.read_bytes(len); +// +// // Construct another stream object to decode the message. +// StreamBuffer substream(bytes, len); +// decode(substream); +// +// This is perfectly valid and a potentially convenient way to parse the +// contents of a message. +// +// UNREADING BYTES +// +// It's possible to 'unread' bytes that you've already read from a stream. This +// makes it possible to read those same bytes again. +// +// A common situation where this might be useful is: you're decoding a message, +// but you discover halfway through the process of decoding the message that you +// haven't received the whole message yet. In that case, it may be desirable to +// unread the partial message, so that you can wait for the rest of the message +// to be received. +// +// Here is the construction that accomplishes this: +// +// // Get the stream's read count before parsing the message. +// size_t read_count_before = streambuffer.read_count(); +// +// // Parse the message, but if there's an EOF, deal with it: +// try { +// // Parse the message. +// int32_t value1 = streambuffer.read_int32(); +// std::string value2 = streambuffer.read_string(maxlen); +// int64_t value3 = streambuffer.read_int64(); +// +// // Great! I got the whole message. +// execute_message(value1, value2, value3); +// } catch (StreamEof) { +// // I ran out of bytes. Unread the message. +// streambuffer.unread(read_count_before); +// } +// +// UNREAD LIMITS +// +// If you read bytes from a stream, that data can be 'unread' until you do a +// write. After a write, it is no longer possible to 'unread' data that you +// read before the write. +// +// STREAMBUFFERS THAT DON'T OWN THEIR OWN MEMORY +// +// If you create a streambuffer using this constructor: +// +// StreamBuffer(const char *data, uint64_t len); +// +// This StreamBuffer reads from an external (unowned) block of bytes, which is +// not copied! The StreamBuffer saves the pointer that you passed in. This +// pointer must remain valid until you're done with the StreamBuffer. +// +// A StreamBuffer that reads from an external block of bytes is read-only. +// Attempts to write to this buffer will be caught and will cause an abort. The +// write_count for such a buffer returns the 'len' value that you initialized +// the buffer with. +// +// USING A STREAMBUFFER TO READ AN ENTIRE FILE +// +// If you wish to read an entire file and store the file contents in a +// StreamBuffer, you should probe the size of the file, then allocate a +// StreamBuffer of the correct size using this constructor: +// +// StreamBuffer(int64_t size); +// +// Then, you can use 'alloc_space' and 'wrote_space' to read the file into the +// buffer in a single read call. +// +// USING A STREAMBUFFER AS A LUA_WRITER OR LUA_READER +// +// You can use a streambuffer as a lua_Writer, as follows: +// +// lua_dump(L, stream.lua_writer(), stream.lua_writer_ud()); +// +// Anything written to the lua_writer gets appended to the streambuffer, the +// same as if it had been written using write_bytes. +// +// You can use a streambuffer as a lua_Reader, as follows: +// +// lua_load (L, stream.lua_reader(), stream.lua_reader_ud(nbytes), ...) +// +// The exact semantics of the lua_reader are tricky, so be careful: +// lua_reader_ud calls 'read_bytes' immediately, and it stores the bytes in a +// "cache of bytes for lua." Then, when the lua_reader gets invoked, the reader +// returns the entire contents of the cache, and it clears the cache. Here are +// some consequences of this design: +// +// 1. The number of bytes read from the stream is always exactly equal to +// nbytes, even if lua never calls the lua_reader. +// +// 2. If the stream doesn't contain nbytes, a StreamEof exception gets thrown +// from lua_reader_ud, not from the lua_Reader. This is good, because it +// means exceptions don't get thrown from inside the lua runtime. +// +////////////////////////////////////////////////////////////// + + +#ifndef STREAMBUFFER_HPP +#define STREAMBUFFER_HPP + +#include "luastack.hpp" +#include +#include +#include +#include + +class StreamException +{ +public: + virtual char const *what() const { return "General stream exception"; } +}; + +class StreamEof : public StreamException +{ +public: + virtual char const *what() const { return "Stream ran out of data"; } +}; + +class StreamCorruption : public StreamException +{ +public: + virtual char const *what() const { return "Stream contained invalid data"; } +}; + +class StreamBuffer { +private: + // Start and end of the allocated block. + char *buf_lo_; + char *buf_hi_; + + // The write and read cursors. + char *write_cursor_; + char *read_cursor_; + + // Number of bytes read before buffer was last aligned. + int64_t pre_read_count_; + + // True if we own this buffer. + bool owned_; + + // True if we're not allowed to expand this buffer. + bool fixed_size_; + + // Lua reader return value. + const char *lua_reader_data_; + int64_t lua_reader_size_; + + // Initialize with a new buffer. + void init(bool fixed, bool owned, char *buf, int64_t size); + + // Make the specified amount of space in the buffer for writing. + void make_space(int64_t bytes) { + int64_t available = buf_hi_ - write_cursor_; + if (available < bytes) make_space_slow(bytes); + } + void make_space_slow(int64_t bytes); + + // Make sure the specified number of bytes are available to read. + void check_available(int64_t bytes) { + int64_t avail = write_cursor_ - read_cursor_; + if (avail < bytes) { + throw StreamEof(); + } + } + + // Implementation for the overwrite_int functions. + char *get_overwrite(int64_t size, int64_t write_count_after); + + // This is mainly for unit testing. + bool layout_is(int64_t a, int64_t b, int64_t c); + + friend int unittests_streambuffer(lua_State *L); +public: + // Construct an empty buffer. + StreamBuffer(); + + // Construct an empty buffer, preallocate the specified amount of space. + StreamBuffer(int64_t size, bool fixed_size); + + // Construct a streambuffer that reads from an external block of bytes. + StreamBuffer(const char *s, int64_t len); + + // Delete a StreamBuffer. + ~StreamBuffer(); + + // Get the total number of bytes ever read from this buffer. + int64_t read_count() const; + + // Get the total number of bytes ever written to this buffer. + int64_t write_count() const; + + // Delete all data and (if not fixed-size) free the buffer. + void clear(); + + // Alloc_space and wrote_space need to be invoked together. + char *alloc_space(int64_t bytes); + void wrote_space(int64_t bytes); + + // Write values into the buffer. + void write_int8(int8_t v); + void write_int16(int16_t v); + void write_int32(int32_t v); + void write_int64(int64_t v); + void write_bytes(const char *bytes, int64_t len); + void write_string(const std::string &s); + void write_ztbytes(const char *bytes); + + // Overwrite values previously written to the buffer. + void overwrite_int8(int64_t write_count_after, int8_t value); + void overwrite_int16(int64_t write_count_after, int16_t value); + void overwrite_int32(int64_t write_count_after, int32_t value); + void overwrite_int64(int64_t write_count_after, int64_t value); + + // Read integers from the buffer. May throw StreamEof. + int8_t read_int8(); + int16_t read_int16(); + int32_t read_int32(); + int64_t read_int64(); + + // Read a string of no more than the specified length. May throw StreamEof + // or StreamCorruption. + std::string read_string(int64_t max_allowed); + + // Read a block of bytes. May throw StreamEof. + const char *read_bytes(int64_t bytes); + + // Return true if the buffer is empty. + bool at_eof(); + + // Verify that the buffer is empty, if not, throw StreamCorruption. + void verify_eof(); + + // Rewind the read cursor to a previous position. + void unread_to(int64_t read_count); + + // Use the stream buffer as a lua_Writer. + static int lua_writer(lua_State *L, const void* p, size_t sz, void* ud); + void *lua_writer_ud(); + + // Use the stream buffer as a lua_Reader. + static const char *lua_reader(lua_State *L, void *data, size_t *size); + void *lua_reader_ud(int64_t bytes); +}; + + +#endif // STREAMBUFFER_HPP