From 08ca274444a9f401f367ed9caedd5cd8c1f050bc Mon Sep 17 00:00:00 2001 From: Josh Yelon Date: Mon, 19 Jul 2021 17:32:24 -0400 Subject: [PATCH] Polish the streambuffer API a bit --- luprex/core/cpp/idalloc.cpp | 4 +- luprex/core/cpp/sched.cpp | 4 +- luprex/core/cpp/streambuffer.cpp | 255 +++++++++++++++++++++---------- luprex/core/cpp/streambuffer.hpp | 231 +++++++++++++++------------- luprex/core/cpp/world.cpp | 4 +- 5 files changed, 303 insertions(+), 195 deletions(-) diff --git a/luprex/core/cpp/idalloc.cpp b/luprex/core/cpp/idalloc.cpp index 0dbae3c2..57e26884 100644 --- a/luprex/core/cpp/idalloc.cpp +++ b/luprex/core/cpp/idalloc.cpp @@ -84,7 +84,7 @@ int64_t IdGlobalPool::alloc_id_for_thread(lua_State *L) { void IdGlobalPool::serialize(StreamBuffer *sb) { sb->write_int64(next_batch_); sb->write_int64(next_id_); - sb->write_size(salvaged_.size()); + sb->write_uint32(salvaged_.size()); for (int64_t batch : salvaged_) { sb->write_int64(batch); } @@ -93,7 +93,7 @@ void IdGlobalPool::serialize(StreamBuffer *sb) { void IdGlobalPool::deserialize(StreamBuffer *sb) { next_batch_ = sb->read_int64(); next_id_ = sb->read_int64(); - size_t salvaged_size = sb->read_size(); + uint32_t salvaged_size = sb->read_uint32(); salvaged_.resize(salvaged_size); for (int i=0; i < int(salvaged_size); i++) { salvaged_[i] = sb->read_int64(); diff --git a/luprex/core/cpp/sched.cpp b/luprex/core/cpp/sched.cpp index e8e25a14..a10fc405 100644 --- a/luprex/core/cpp/sched.cpp +++ b/luprex/core/cpp/sched.cpp @@ -26,7 +26,7 @@ SchedEntry Schedule::pop() { } void Schedule::serialize(StreamBuffer *sb) { - sb->write_size(schedule_.size()); + sb->write_uint32(schedule_.size()); for (const SchedEntry &entry : schedule_) { sb->write_int64(entry.clock_); sb->write_int64(entry.thread_id_); @@ -36,7 +36,7 @@ void Schedule::serialize(StreamBuffer *sb) { void Schedule::deserialize(StreamBuffer *sb) { schedule_.clear(); - size_t nentry = sb->read_size(); + size_t nentry = sb->read_uint32(); for (size_t i = 0; i < nentry; i++) { int64_t clock = sb->read_int64(); int64_t thread_id = sb->read_int64(); diff --git a/luprex/core/cpp/streambuffer.cpp b/luprex/core/cpp/streambuffer.cpp index 768ada6e..5834605b 100644 --- a/luprex/core/cpp/streambuffer.cpp +++ b/luprex/core/cpp/streambuffer.cpp @@ -103,38 +103,115 @@ void StreamBuffer::clear() { lua_reader_size_ = 0; } -char *StreamBuffer::alloc_space(int64_t bytes) { - assert(bytes >= 0); - make_space(bytes); - return write_cursor_; +// These routines return true if you can losslessly cast the +// specified value to the specified type. + +static inline bool safe_to_cast_to_int8(int64_t vv) { + return ((vv + 0x80LL) & 0xFFFFFFFFFFFFFF00LL) == 0; } -void StreamBuffer::wrote_space(int64_t bytes) { - int64_t maxbuf = buf_hi_ - write_cursor_; - assert(bytes >= 0); - assert(bytes <= maxbuf); - write_cursor_ += bytes; +static inline bool safe_to_cast_to_int16(int64_t vv) { + return ((vv + 0x8000LL) & 0xFFFFFFFFFFFF0000LL) == 0; } -void StreamBuffer::write_int8(int8_t v) { +static inline bool safe_to_cast_to_int32(int64_t vv) { + return ((vv + 0x80000000LL) & 0xFFFFFFFF00000000LL) == 0; +} + +static inline bool safe_to_cast_to_int64(int64_t vv) { + return true; +} + +static inline bool safe_to_cast_to_uint8(uint64_t vv) { + return (vv & 0xFFFFFFFFFFFFFF00LL) == 0; +} + +static inline bool safe_to_cast_to_uint16(uint64_t vv) { + return (vv & 0xFFFFFFFFFFFF0000LL) == 0; +} + +static inline bool safe_to_cast_to_uint32(uint64_t vv) { + return (vv & 0xFFFFFFFF00000000LL) == 0; +} + +static inline bool safe_to_cast_to_uint64(uint64_t vv) { + return true; +} + + +void StreamBuffer::write_bytes(const char *s, int64_t len) { + make_space(len); + memcpy(write_cursor_, s, len); + write_cursor_ += len; +} + + +const char *StreamBuffer::read_bytes(int64_t bytes) { + check_available(bytes); + char *data = read_cursor_; + read_cursor_ += bytes; + return data; +} + +void StreamBuffer::write_int8(int64_t vv) { + assert(safe_to_cast_to_int8(vv)); + int8_t v = vv; make_space(1); memcpy(write_cursor_, &v, 1); write_cursor_ += 1; } -void StreamBuffer::write_int16(int16_t v) { +void StreamBuffer::write_int16(int64_t vv) { + assert(safe_to_cast_to_int16(vv)); + int16_t v = vv; make_space(2); memcpy(write_cursor_, &v, 2); write_cursor_ += 2; } -void StreamBuffer::write_int32(int32_t v) { +void StreamBuffer::write_int32(int64_t vv) { + assert(safe_to_cast_to_int32(vv)); + int32_t v = vv; make_space(4); memcpy(write_cursor_, &v, 4); write_cursor_ += 4; } -void StreamBuffer::write_int64(int64_t v) { +void StreamBuffer::write_int64(int64_t vv) { + assert(safe_to_cast_to_int64(vv)); + int64_t v = vv; + make_space(8); + memcpy(write_cursor_, &v, 8); + write_cursor_ += 8; +} + +void StreamBuffer::write_uint8(uint64_t vv) { + assert(safe_to_cast_to_uint8(vv)); + uint8_t v = vv; + make_space(1); + memcpy(write_cursor_, &v, 1); + write_cursor_ += 1; +} + +void StreamBuffer::write_uint16(uint64_t vv) { + assert(safe_to_cast_to_uint16(vv)); + uint16_t v = vv; + make_space(2); + memcpy(write_cursor_, &v, 2); + write_cursor_ += 2; +} + +void StreamBuffer::write_uint32(uint64_t vv) { + assert(safe_to_cast_to_uint32(vv)); + uint32_t v = vv; + make_space(4); + memcpy(write_cursor_, &v, 4); + write_cursor_ += 4; +} + +void StreamBuffer::write_uint64(uint64_t vv) { + assert(safe_to_cast_to_uint64(vv)); + uint64_t v = vv; make_space(8); memcpy(write_cursor_, &v, 8); write_cursor_ += 8; @@ -152,53 +229,6 @@ void StreamBuffer::write_double(double d) { write_cursor_ += 8; } -void StreamBuffer::write_hashvalue(const util::HashValue &hv) { - make_space(16); - memcpy(write_cursor_, &hv, 16); - write_cursor_ += 16; -} - -void StreamBuffer::write_bytes(const char *s, int64_t len) { - make_space(len); - memcpy(write_cursor_, s, len); - write_cursor_ += len; -} - -void StreamBuffer::write_ztbytes(const char *s) { - write_bytes(s, strlen(s)); -} - -void StreamBuffer::write_string(const std::string &s) { - if (s.size() >= 255) { - write_uint8(0xFF); - write_uint64(s.size()); - write_bytes(s.c_str(), s.size()); - } else { - write_uint8(s.size()); - write_bytes(s.c_str(), s.size()); - } -} - -void StreamBuffer::overwrite_int8(int64_t write_count_after, int8_t v) { - char *target = get_overwrite(1, write_count_after); - memcpy(target, &v, 1); -} - -void StreamBuffer::overwrite_int16(int64_t write_count_after, int16_t v) { - char *target = get_overwrite(2, write_count_after); - memcpy(target, &v, 2); -} - -void StreamBuffer::overwrite_int32(int64_t write_count_after, int32_t v) { - char *target = get_overwrite(4, write_count_after); - memcpy(target, &v, 4); -} - -void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t v) { - char *target = get_overwrite(8, write_count_after); - memcpy(target, &v, 8); -} - int8_t StreamBuffer::read_int8() { check_available(1); int8_t v; @@ -231,14 +261,6 @@ int64_t StreamBuffer::read_int64() { return v; } -util::HashValue StreamBuffer::read_hashvalue() { - check_available(16); - util::HashValue hv; - memcpy(&hv, read_cursor_, 16); - read_cursor_ += 16; - return hv; -} - float StreamBuffer::read_float() { check_available(4); float f; @@ -255,23 +277,29 @@ double StreamBuffer::read_double() { return d; } -size_t StreamBuffer::read_size() { - return read_size_limit(0xFFFFFFF); +void StreamBuffer::write_hashvalue(const util::HashValue &hv) { + make_space(16); + memcpy(write_cursor_, &hv, 16); + write_cursor_ += 16; } -size_t StreamBuffer::read_size_limit(size_t limit) { - int64_t value = read_int64(); - if ((value < 0)||(value > int64_t(limit))) { - throw StreamCorruption(); +void StreamBuffer::write_string(const std::string &s) { + if (s.size() >= 255) { + write_uint8(0xFF); + write_uint64(s.size()); + write_bytes(s.c_str(), s.size()); + } else { + write_uint8(s.size()); + write_bytes(s.c_str(), s.size()); } - return size_t(value); } -const char *StreamBuffer::read_bytes(int64_t bytes) { - check_available(bytes); - char *data = read_cursor_; - read_cursor_ += bytes; - return data; +util::HashValue StreamBuffer::read_hashvalue() { + check_available(16); + util::HashValue hv; + memcpy(&hv, read_cursor_, 16); + read_cursor_ += 16; + return hv; } std::string StreamBuffer::read_string() { @@ -289,6 +317,63 @@ std::string StreamBuffer::read_string_limit(int64_t max_allowed) { return std::string(bytes, len); } + +void StreamBuffer::overwrite_int8(int64_t write_count_after, int64_t vv) { + assert(safe_to_cast_to_int8(vv)); + int8_t v = vv; + char *target = get_overwrite(1, write_count_after); + memcpy(target, &v, 1); +} + +void StreamBuffer::overwrite_int16(int64_t write_count_after, int64_t vv) { + assert(safe_to_cast_to_int16(vv)); + int16_t v = vv; + char *target = get_overwrite(2, write_count_after); + memcpy(target, &v, 2); +} + +void StreamBuffer::overwrite_int32(int64_t write_count_after, int64_t vv) { + assert(safe_to_cast_to_int32(vv)); + int32_t v = vv; + char *target = get_overwrite(4, write_count_after); + memcpy(target, &v, 4); +} + +void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t vv) { + assert(safe_to_cast_to_int64(vv)); + int64_t v = vv; + char *target = get_overwrite(8, write_count_after); + memcpy(target, &v, 8); +} + +void StreamBuffer::overwrite_uint8(int64_t write_count_after, uint64_t vv) { + assert(safe_to_cast_to_uint8(vv)); + uint8_t v = vv; + char *target = get_overwrite(1, write_count_after); + memcpy(target, &v, 1); +} + +void StreamBuffer::overwrite_uint16(int64_t write_count_after, uint64_t vv) { + assert(safe_to_cast_to_uint16(vv)); + uint16_t v = vv; + char *target = get_overwrite(2, write_count_after); + memcpy(target, &v, 2); +} + +void StreamBuffer::overwrite_uint32(int64_t write_count_after, uint64_t vv) { + assert(safe_to_cast_to_uint32(vv)); + uint32_t v = vv; + char *target = get_overwrite(4, write_count_after); + memcpy(target, &v, 4); +} + +void StreamBuffer::overwrite_uint64(int64_t write_count_after, uint64_t vv) { + assert(safe_to_cast_to_uint64(vv)); + uint64_t v = vv; + char *target = get_overwrite(8, write_count_after); + memcpy(target, &v, 8); +} + bool StreamBuffer::at_eof() { return (read_cursor_ == write_cursor_); } @@ -350,6 +435,10 @@ static bool streq(const char *str, const char *data) { return memcmp(str, data, len) == 0; } +static void write_ztbytes(StreamBuffer *sb, const char *bytes) { + sb->write_bytes(bytes, strlen(bytes)); +} + LuaDefine(unittests_streambuffer, "c") { // An 11-byte fixed-size stream buffer. StreamBuffer sb11(11, true); @@ -358,7 +447,7 @@ LuaDefine(unittests_streambuffer, "c") { assert(sb11.layout_is(0, 0, 11)); // Write a few bytes. - sb11.write_ztbytes("abcdef"); + write_ztbytes(&sb11, "abcdef"); assert(sb11.layout_is(0, 6, 5)); // Try reading some bytes. @@ -381,13 +470,13 @@ LuaDefine(unittests_streambuffer, "c") { assert(sb11.layout_is(6, 0, 5)); // Write some more bytes into the stream, forcing a shift-left - sb11.write_ztbytes("ghijkl"); + write_ztbytes(&sb11, "ghijkl"); assert(sb11.layout_is(0, 6, 5)); // Test buffer wrapping a little more. assert(streq("ghi", sb11.read_bytes(3))); assert(sb11.layout_is(3, 3, 5)); - sb11.write_ztbytes("mnopqr"); + write_ztbytes(&sb11, "mnopqr"); assert(sb11.layout_is(0, 9, 2)); assert(streq("jklmnopqr", sb11.read_bytes(9))); assert(sb11.layout_is(9, 0, 2)); diff --git a/luprex/core/cpp/streambuffer.hpp b/luprex/core/cpp/streambuffer.hpp index b97c65e8..9d9b0ade 100644 --- a/luprex/core/cpp/streambuffer.hpp +++ b/luprex/core/cpp/streambuffer.hpp @@ -237,6 +237,130 @@ public: }; class StreamBuffer { +public: + // Construct an empty buffer. + StreamBuffer(); + + // Construct an empty buffer, preallocate the specified amount of space. + StreamBuffer(int64_t size, bool fixed_size); + + // Construct a streambuffer that reads from an external block of bytes. + StreamBuffer(const char *s, int64_t len); + + // Delete a StreamBuffer. + ~StreamBuffer(); + + // Get the total number of bytes ever read from this buffer. + int64_t read_count() const; + + // Get the total number of bytes ever written to this buffer. + int64_t write_count() const; + + // Delete all data and (if not fixed-size) free the buffer. + // Also resets the read and write counts. + void clear(); + + // Write block of bytes into the buffer. + // + // Caution: this function doesn't write the length! + // It just writes the bytes. + // + void write_bytes(const char *bytes, int64_t len); + + // Read a block of bytes from the buffer. + // + // May throw StreamEof if the specified number of bytes aren't present. + // + const char *read_bytes(int64_t bytes); + + // Write integers and floats into the buffer. + // + // Note that integral parameters are all 64 bits. That's so that I can do + // runtime error checking to verify that the numbers are all in-range. + // + void write_int8(int64_t v); + void write_int16(int64_t v); + void write_int32(int64_t v); + void write_int64(int64_t v); + void write_uint8(uint64_t v); + void write_uint16(uint64_t v); + void write_uint32(uint64_t v); + void write_uint64(uint64_t v); + void write_float(float f); + void write_double(double d); + + // Read fixed-size integers from the buffer. + // + // May throw StreamEof if the specified number of bytes aren't present. + // + int8_t read_int8(); + int16_t read_int16(); + int32_t read_int32(); + int64_t read_int64(); + uint8_t read_uint8() { return read_int8(); } + uint16_t read_uint16() { return read_int16(); } + uint32_t read_uint32() { return read_int32(); } + uint64_t read_uint64() { return read_int64(); } + float read_float(); + double read_double(); + + // Write other types into the buffer. + // + // Note that strings are preceded by a length field. Reading + // a string works by reading the length field, and then reading + // the correct number of bytes. + // + void write_bool(bool b) { write_int8(b ? 1 : 0); } + void write_hashvalue(const util::HashValue &hv); + void write_string(const std::string &s); + + // Read other types from the buffer. + // + // May throw StreamEof if the specified number of bytes aren't present. + // Read string with a length limit will throw 'StreamCorruption' if the + // length is too long. + // + bool read_bool() { return read_int8(); } + util::HashValue read_hashvalue(); + std::string read_string(); + std::string read_string_limit(int64_t max_allowed); + + // Overwrite values previously written to the buffer. + // + // See the comment at the top of this file for an explanation. + // + void overwrite_int8(int64_t write_count_after, int64_t v); + void overwrite_int16(int64_t write_count_after, int64_t v); + void overwrite_int32(int64_t write_count_after, int64_t v); + void overwrite_int64(int64_t write_count_after, int64_t v); + void overwrite_uint8(int64_t write_count_after, uint64_t v); + void overwrite_uint16(int64_t write_count_after, uint64_t v); + void overwrite_uint32(int64_t write_count_after, uint64_t v); + void overwrite_uint64(int64_t write_count_after, uint64_t v); + + // Return true if the buffer is empty. + bool at_eof(); + + // Verify that the buffer is empty, if not, throw StreamCorruption. + void verify_eof(); + + // Rewind the read cursor to a previous position. + void unread_to(int64_t read_count); + + // Rewind the write cursor to a previous position. + void unwrite_to(int64_t write_count); + + // Calculate a noncryptographic but good hash of what's in the buffer. + util::HashValue hash() const; + + // Use the stream buffer as a lua_Writer. + static int lua_writer(lua_State *L, const void* p, size_t sz, void* ud); + void *lua_writer_ud(); + + // Use the stream buffer as a lua_Reader. + static const char *lua_reader(lua_State *L, void *data, size_t *size); + void *lua_reader_ud(int64_t bytes); + private: // Start and end of the allocated block. char *buf_lo_; @@ -280,115 +404,10 @@ private: // Implementation for the overwrite_int functions. char *get_overwrite(int64_t size, int64_t write_count_after); - // This is mainly for unit testing. + // This is for unit testing. bool layout_is(int64_t a, int64_t b, int64_t c); friend int unittests_streambuffer(lua_State *L); -public: - // Construct an empty buffer. - StreamBuffer(); - - // Construct an empty buffer, preallocate the specified amount of space. - StreamBuffer(int64_t size, bool fixed_size); - - // Construct a streambuffer that reads from an external block of bytes. - StreamBuffer(const char *s, int64_t len); - - // Delete a StreamBuffer. - ~StreamBuffer(); - - // Get the total number of bytes ever read from this buffer. - int64_t read_count() const; - - // Get the total number of bytes ever written to this buffer. - int64_t write_count() const; - - // Delete all data and (if not fixed-size) free the buffer. - // Also resets the read and write counts. - void clear(); - - // Alloc_space and wrote_space need to be invoked together. - char *alloc_space(int64_t bytes); - void wrote_space(int64_t bytes); - - // Write numbers into the buffer. - void write_int8(int8_t v); - void write_int16(int16_t v); - void write_int32(int32_t v); - void write_int64(int64_t v); - void write_float(float f); - void write_double(double d); - void write_hashvalue(const util::HashValue &hv); - void write_uint8(uint8_t v) { write_int8(v); } - void write_uint16(uint16_t v) { write_int16(v); } - void write_uint32(uint32_t v) { write_int32(v); } - void write_uint64(uint64_t v) { write_int64(v); } - void write_size(size_t sz) { write_int64(sz); } - void write_bool(bool b) { write_int8(b ? 1 : 0); } - - // Write strings or blocks of bytes into the buffer. - void write_string(const std::string &s); - void write_bytes(const char *bytes, int64_t len); - void write_ztbytes(const char *bytes); - - // Overwrite values previously written to the buffer. - void overwrite_int8(int64_t write_count_after, int8_t v); - void overwrite_int16(int64_t write_count_after, int16_t v); - void overwrite_int32(int64_t write_count_after, int32_t v); - void overwrite_int64(int64_t write_count_after, int64_t v); - void overwrite_uint8(int64_t write_count_after, uint8_t v) { overwrite_int8(write_count_after, v); } - void overwrite_uint16(int64_t write_count_after, uint16_t v) { overwrite_int16(write_count_after, v); } - void overwrite_uint32(int64_t write_count_after, uint32_t v) { overwrite_int32(write_count_after, v); } - void overwrite_uint64(int64_t write_count_after, uint64_t v) { overwrite_int64(write_count_after, v); } - - // Read numbers from the buffer. May throw StreamEof. - int8_t read_int8(); - int16_t read_int16(); - int32_t read_int32(); - int64_t read_int64(); - float read_float(); - double read_double(); - util::HashValue read_hashvalue(); - uint8_t read_uint8() { return read_int8(); } - uint16_t read_uint16() { return read_int16(); } - uint32_t read_uint32() { return read_int32(); } - uint64_t read_uint64() { return read_int64(); } - bool read_bool() { return read_int8(); } - - // May throw StreamEof or StreamCorruption. - size_t read_size(); - size_t read_size_limit(size_t limit); - - // Read a string of no more than the specified length. - // May throw StreamEof or StreamCorruption. - std::string read_string(); - std::string read_string_limit(int64_t max_allowed); - - // Read a block of bytes. May throw StreamEof. - const char *read_bytes(int64_t bytes); - - // Return true if the buffer is empty. - bool at_eof(); - - // Verify that the buffer is empty, if not, throw StreamCorruption. - void verify_eof(); - - // Rewind the read cursor to a previous position. - void unread_to(int64_t read_count); - - // Rewind the write cursor to a previous position. - void unwrite_to(int64_t write_count); - - // Calculate a noncryptographic but good hash of what's in the buffer. - util::HashValue hash() const; - - // Use the stream buffer as a lua_Writer. - static int lua_writer(lua_State *L, const void* p, size_t sz, void* ud); - void *lua_writer_ud(); - - // Use the stream buffer as a lua_Reader. - static const char *lua_reader(lua_State *L, void *data, size_t *size); - void *lua_reader_ud(int64_t bytes); }; diff --git a/luprex/core/cpp/world.cpp b/luprex/core/cpp/world.cpp index c7e340b4..d0bfb255 100644 --- a/luprex/core/cpp/world.cpp +++ b/luprex/core/cpp/world.cpp @@ -433,7 +433,7 @@ void World::serialize(StreamBuffer *sb) { lua_snap_.serialize(sb); id_global_pool_.serialize(sb); thread_sched_.serialize(sb); - sb->write_size(tangibles_.size()); + sb->write_uint32(tangibles_.size()); for (const auto &p : tangibles_) { sb->write_int64(p.first); p.second->serialize(sb); @@ -454,7 +454,7 @@ void World::deserialize(StreamBuffer *sb) { p.second->plane_item_.set_id(0); } // Deserialize tangibles. - size_t ntan = sb->read_size(); + size_t ntan = sb->read_uint32(); for (size_t i = 0; i < ntan; i++) { int64_t id = sb->read_int64(); std::unique_ptr &t = tangibles_[id];