#include "wrap-string.hpp"
#include "eng-malloc.hpp"
#include "streambuffer.hpp"
#include "spookyv2.hpp"
#include <cassert>
#include <cstring>

// Point the buffer at the region [buf, buf + size) and reset all cursors,
// counters and Lua-reader state.  After this call the buffer holds no
// readable data (read and write cursors both sit at the start).
//
//   fixed - the buffer may never be reallocated to grow.
//   owned - the destructor is responsible for freeing `buf`.
void StreamBuffer::init(bool fixed, bool owned, char *buf, int64_t size) {
    buf_lo_ = buf;
    buf_hi_ = buf_lo_ + size;
    read_cursor_ = buf_lo_;
    write_cursor_ = buf_lo_;
    pre_read_count_ = 0;
    owned_ = owned;
    fixed_size_ = fixed;
    lua_reader_data_ = nullptr;
    lua_reader_size_ = 0;
}

// An empty, growable buffer that owns its (not yet allocated) storage.
StreamBuffer::StreamBuffer() {
    init(false, true, nullptr, 0);
}

// An empty buffer with `size` bytes of owned storage.  When `fixed` is true
// the storage is never reallocated.
StreamBuffer::StreamBuffer(int64_t size, bool fixed) {
    assert(size >= 0);
    init(fixed, true, (char *)eng::malloc(size), size);
}

// A read view over `size` caller-owned bytes starting at `s`.  The bytes are
// borrowed, not copied; the whole range is immediately readable.
StreamBuffer::StreamBuffer(const char *s, int64_t size) {
    assert(size >= 0);
    init(true, false, const_cast<char *>(s), size);
    write_cursor_ = buf_hi_;
}

// A read view over the characters of `src`.  The string's storage is
// borrowed, not copied; the whole string is immediately readable.
StreamBuffer::StreamBuffer(const eng::string &src) {
    init(true, false, const_cast<char *>(src.c_str()), src.size());
    write_cursor_ = buf_hi_;
}

// Frees the storage only when this object owns it.
StreamBuffer::~StreamBuffer() {
    if (owned_ && (buf_lo_ != nullptr)) {
        eng::free(buf_lo_);
    }
}

// Bytes consumed so far: those consumed from the current storage plus those
// discarded by earlier compactions (pre_read_count_).
int64_t StreamBuffer::total_reads() const {
    return (read_cursor_ - buf_lo_) + pre_read_count_;
}

// Bytes written so far, counted on the same scale as total_reads().
int64_t StreamBuffer::total_writes() const {
    return (write_cursor_ - buf_lo_) + pre_read_count_;
}

// Number of bytes written but not yet read.
int64_t StreamBuffer::fill() const {
    return write_cursor_ - read_cursor_;
}

// Pointer to the first unread byte.
const char *StreamBuffer::data() const {
    return read_cursor_;
}

// Non-owning view of the unread bytes.
std::string_view StreamBuffer::view() const {
    return std::string_view(read_cursor_, write_cursor_ - read_cursor_);
}

// Returns true when the storage is split into exactly `a` consumed bytes,
// `b` unread bytes and `c` free bytes, in that order.
bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) {
    const int64_t consumed = read_cursor_ - buf_lo_;
    const int64_t unread = write_cursor_ - read_cursor_;
    const int64_t unused = buf_hi_ - write_cursor_;
    return (consumed == a) && (unread == b) && (unused == c);
}

// Make room for `bytes` more bytes after the write cursor by discarding the
// already-consumed prefix and, for growable buffers that are still too
// small, moving the unread data into a larger allocation.
//
// On return the unread data starts at buf_lo_.  total_reads() and
// total_writes() are unchanged because the discarded prefix is added to
// pre_read_count_.  Any pending Lua reader chunk is dropped.
void StreamBuffer::make_space_slow(int64_t bytes) {
    assert(owned_ && "We don't own this buffer, can't grow it");
    assert(bytes >= 0);

    // Decide whether the current buffer is big enough.
    const int64_t data_size = (write_cursor_ - read_cursor_);
    const int64_t existing_size = (buf_hi_ - buf_lo_);
    const int64_t desired_size = 8192 + ((data_size + bytes) * 2);

    // Update some simple things.
    pre_read_count_ += (read_cursor_ - buf_lo_);
    lua_reader_data_ = nullptr;
    lua_reader_size_ = 0;

    // Move the data to the beginning of the buffer, or to
    // the beginning of a new buffer.
    if (fixed_size_ || (existing_size >= desired_size)) {
        if (fixed_size_) {
            assert((data_size + bytes <= existing_size) &&
                   "Not enough space in fixed-size buffer");
        }
        // Source and destination live in the same allocation and overlap
        // whenever the unread data is longer than the consumed prefix, so
        // this has to be memmove; memcpy on overlapping ranges is undefined.
        if (data_size > 0) {
            memmove(buf_lo_, read_cursor_, data_size);
        }
    } else {
        // Distinct allocations never overlap, so memcpy is fine here.
        char *nbuf = (char *)eng::malloc(desired_size);
        if (data_size > 0) {
            memcpy(nbuf, read_cursor_, data_size);
        }
        if (buf_lo_ != nullptr) {
            eng::free(buf_lo_);
        }
        buf_lo_ = nbuf;
        buf_hi_ = nbuf + desired_size;
    }

    // Update the pointers to the data region.
    read_cursor_ = buf_lo_;
    write_cursor_ = buf_lo_ + data_size;
}

// Commit `bytes` bytes that the caller has already stored directly at the
// write cursor.  The space must have been reserved beforehand.
void StreamBuffer::wrote_space(int64_t bytes) {
    const int64_t available = buf_hi_ - write_cursor_;
    assert(bytes >= 0);
    assert(available >= bytes);
    write_cursor_ += bytes;
}

// Locate `size` previously written bytes so they can be patched in place.
// `write_count_after` is the value total_writes() had immediately after
// those bytes were written.  The bytes must not have been read yet.
char *StreamBuffer::get_overwrite(int64_t size, int64_t write_count_after) {
    const int64_t write_count_before = write_count_after - size;
    assert(write_count_before >= total_reads());
    assert(write_count_after <= total_writes());
    return buf_lo_ + (write_count_before - pre_read_count_);
}

// Discard all contents and reset the read/write counters to zero.  A
// growable buffer also releases its storage; a fixed-size buffer keeps it.
// Only valid on buffers that own their storage.
void StreamBuffer::clear() {
    assert(owned_);
    if (!fixed_size_) {
        if (buf_lo_ != nullptr) {
            eng::free(buf_lo_);
        }
        buf_lo_ = nullptr;
        buf_hi_ = nullptr;
    }
    // owned_ is deliberately left untouched: it is already true for every
    // valid call, and forcing it to true would make the destructor free a
    // borrowed buffer if the precondition is violated in a release build.
    read_cursor_ = buf_lo_;
    write_cursor_ = buf_lo_;
    pre_read_count_ = 0;
    lua_reader_data_ = nullptr;
    lua_reader_size_ = 0;
}

// Consume and return the next line, including its trailing '\n'.  When the
// unread data contains no '\n', nothing is consumed and an empty string is
// returned.
eng::string StreamBuffer::readline() {
    char *p = read_cursor_;
    while ((p < write_cursor_) && (*p != '\n')) {
        p++;
    }
    if (p == write_cursor_) {
        return "";
    }
    p++;  // Include the newline in the result.
    eng::string result(read_cursor_, p - read_cursor_);
    read_cursor_ = p;
    return result;
}

// These routines return true if you can losslessly cast the
// specified value to the specified type.
static inline bool safe_to_cast_to_int8(int64_t vv) { return ((vv + 0x80LL) & 0xFFFFFFFFFFFFFF00LL) == 0; } static inline bool safe_to_cast_to_int16(int64_t vv) { return ((vv + 0x8000LL) & 0xFFFFFFFFFFFF0000LL) == 0; } static inline bool safe_to_cast_to_int32(int64_t vv) { return ((vv + 0x80000000LL) & 0xFFFFFFFF00000000LL) == 0; } static inline bool safe_to_cast_to_int64(int64_t vv) { return true; } static inline bool safe_to_cast_to_uint8(uint64_t vv) { return (vv & 0xFFFFFFFFFFFFFF00LL) == 0; } static inline bool safe_to_cast_to_uint16(uint64_t vv) { return (vv & 0xFFFFFFFFFFFF0000LL) == 0; } static inline bool safe_to_cast_to_uint32(uint64_t vv) { return (vv & 0xFFFFFFFF00000000LL) == 0; } static inline bool safe_to_cast_to_uint64(uint64_t vv) { return true; } void StreamBuffer::write_bytes(const char *s, int64_t len) { make_space(len); memcpy(write_cursor_, s, len); write_cursor_ += len; } void StreamBuffer::write_bytes(std::string_view s) { make_space(s.size()); memcpy(write_cursor_, s.data(), s.size()); write_cursor_ += s.size(); } const char *StreamBuffer::read_bytes(int64_t bytes) { check_available(bytes); char *data = read_cursor_; read_cursor_ += bytes; return data; } void StreamBuffer::write_int8(int64_t vv) { assert(safe_to_cast_to_int8(vv)); int8_t v = vv; make_space(1); memcpy(write_cursor_, &v, 1); write_cursor_ += 1; } void StreamBuffer::write_int16(int64_t vv) { assert(safe_to_cast_to_int16(vv)); int16_t v = vv; make_space(2); memcpy(write_cursor_, &v, 2); write_cursor_ += 2; } void StreamBuffer::write_int32(int64_t vv) { assert(safe_to_cast_to_int32(vv)); int32_t v = vv; make_space(4); memcpy(write_cursor_, &v, 4); write_cursor_ += 4; } void StreamBuffer::write_int64(int64_t vv) { assert(safe_to_cast_to_int64(vv)); int64_t v = vv; make_space(8); memcpy(write_cursor_, &v, 8); write_cursor_ += 8; } void StreamBuffer::write_uint8(uint64_t vv) { assert(safe_to_cast_to_uint8(vv)); uint8_t v = vv; make_space(1); memcpy(write_cursor_, &v, 1); 
write_cursor_ += 1; } void StreamBuffer::write_uint16(uint64_t vv) { assert(safe_to_cast_to_uint16(vv)); uint16_t v = vv; make_space(2); memcpy(write_cursor_, &v, 2); write_cursor_ += 2; } void StreamBuffer::write_uint32(uint64_t vv) { assert(safe_to_cast_to_uint32(vv)); uint32_t v = vv; make_space(4); memcpy(write_cursor_, &v, 4); write_cursor_ += 4; } void StreamBuffer::write_uint64(uint64_t vv) { assert(safe_to_cast_to_uint64(vv)); uint64_t v = vv; make_space(8); memcpy(write_cursor_, &v, 8); write_cursor_ += 8; } void StreamBuffer::write_char(char c) { make_space(1); write_cursor_[0] = c; write_cursor_ += 1; } void StreamBuffer::write_float(float f) { make_space(4); memcpy(write_cursor_, &f, 4); write_cursor_ += 4; } void StreamBuffer::write_double(double d) { make_space(8); memcpy(write_cursor_, &d, 8); write_cursor_ += 8; } int8_t StreamBuffer::read_int8() { check_available(1); int8_t v; memcpy(&v, read_cursor_, 1); read_cursor_ += 1; return v; } int16_t StreamBuffer::read_int16() { check_available(2); int16_t v; memcpy(&v, read_cursor_, 2); read_cursor_ += 2; return v; } int32_t StreamBuffer::read_int32() { check_available(4); int32_t v; memcpy(&v, read_cursor_, 4); read_cursor_ += 4; return v; } int64_t StreamBuffer::read_int64() { check_available(8); int64_t v; memcpy(&v, read_cursor_, 8); read_cursor_ += 8; return v; } char StreamBuffer::read_char() { check_available(1); char c = read_cursor_[0]; read_cursor_ += 1; return c; } float StreamBuffer::read_float() { check_available(4); float f; memcpy(&f, read_cursor_, 4); read_cursor_ += 4; return f; } double StreamBuffer::read_double() { check_available(8); double d; memcpy(&d, read_cursor_, 8); read_cursor_ += 8; return d; } void StreamBuffer::write_hashvalue(const util::HashValue &hv) { write_uint64(hv.first); write_uint64(hv.second); } void StreamBuffer::write_string(std::string_view s) { if (s.size() >= 255) { write_uint8(0xFF); write_uint64(s.size()); write_bytes(s); } else { write_uint8(s.size()); 
write_bytes(s); } } util::HashValue StreamBuffer::read_hashvalue() { uint64_t f = read_uint64(); uint64_t s = read_uint64(); return util::HashValue(f,s); } eng::string StreamBuffer::read_string() { return read_string_limit(0xFFFFFFF); } eng::string StreamBuffer::read_string_limit(int64_t max_allowed) { int64_t len = read_uint8(); if (len == 255) { len = read_int64(); } if (len < 0) throw StreamCorruption(); if (len > max_allowed) throw StreamCorruption(); const char *bytes = read_bytes(len); return eng::string(bytes, len); } eng::string StreamBuffer::read_entire_contents() { eng::string result(read_cursor_, fill()); read_cursor_ = write_cursor_; return result; } void StreamBuffer::overwrite_int8(int64_t write_count_after, int64_t vv) { assert(safe_to_cast_to_int8(vv)); int8_t v = vv; char *target = get_overwrite(1, write_count_after); memcpy(target, &v, 1); } void StreamBuffer::overwrite_int16(int64_t write_count_after, int64_t vv) { assert(safe_to_cast_to_int16(vv)); int16_t v = vv; char *target = get_overwrite(2, write_count_after); memcpy(target, &v, 2); } void StreamBuffer::overwrite_int32(int64_t write_count_after, int64_t vv) { assert(safe_to_cast_to_int32(vv)); int32_t v = vv; char *target = get_overwrite(4, write_count_after); memcpy(target, &v, 4); } void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t vv) { assert(safe_to_cast_to_int64(vv)); int64_t v = vv; char *target = get_overwrite(8, write_count_after); memcpy(target, &v, 8); } void StreamBuffer::overwrite_uint8(int64_t write_count_after, uint64_t vv) { assert(safe_to_cast_to_uint8(vv)); uint8_t v = vv; char *target = get_overwrite(1, write_count_after); memcpy(target, &v, 1); } void StreamBuffer::overwrite_uint16(int64_t write_count_after, uint64_t vv) { assert(safe_to_cast_to_uint16(vv)); uint16_t v = vv; char *target = get_overwrite(2, write_count_after); memcpy(target, &v, 2); } void StreamBuffer::overwrite_uint32(int64_t write_count_after, uint64_t vv) { 
assert(safe_to_cast_to_uint32(vv)); uint32_t v = vv; char *target = get_overwrite(4, write_count_after); memcpy(target, &v, 4); } void StreamBuffer::overwrite_uint64(int64_t write_count_after, uint64_t vv) { assert(safe_to_cast_to_uint64(vv)); uint64_t v = vv; char *target = get_overwrite(8, write_count_after); memcpy(target, &v, 8); } bool StreamBuffer::empty() { return (read_cursor_ == write_cursor_); } void StreamBuffer::verify_empty() { if (read_cursor_ != write_cursor_) { throw StreamCorruption(); } } void StreamBuffer::unread_to(int64_t rd_count) { assert(rd_count >= pre_read_count_); assert(rd_count <= total_reads()); read_cursor_ = buf_lo_ + (rd_count - pre_read_count_); } void StreamBuffer::unwrite_to(int64_t wr_count) { assert(wr_count >= total_reads()); assert(wr_count <= total_writes()); write_cursor_ = buf_lo_ + (wr_count - pre_read_count_); } void StreamBuffer::copy_into(StreamBuffer *sb) { sb->write_bytes(read_cursor_, write_cursor_ - read_cursor_); } void StreamBuffer::transfer_into(StreamBuffer *sb) { sb->write_bytes(read_cursor_, write_cursor_ - read_cursor_); read_cursor_ = write_cursor_; } bool StreamBuffer::contents_equal(const StreamBuffer *other) const { int64_t len = fill(); if (len != other->fill()) { return false; } return memcmp(read_cursor_, other->read_cursor_, len) == 0; } util::HashValue StreamBuffer::hash() const { uint64_t hash1 = 0x82A7912E7893AC87; uint64_t hash2 = 0x81D402740DE458F3; SpookyHash::ChainHash128(read_cursor_, write_cursor_ - read_cursor_, &hash1, &hash2); return std::make_pair(hash1, hash2); } int StreamBuffer::lua_writer(lua_State *L, const void* p, size_t sz, void* ud) { StreamBuffer *sb = (StreamBuffer *)ud; memcpy(sb->make_space(sz), p, sz); sb->write_cursor_ += sz; return 0; } void *StreamBuffer::lua_writer_ud() { return this; } const char *StreamBuffer::lua_reader(lua_State *L, void *ud, size_t *size) { StreamBuffer *sb = (StreamBuffer *)ud; *size = sb->lua_reader_size_; const char *data = sb->lua_reader_data_; 
// Next time the reader gets called, there's no data left. sb->lua_reader_data_ = 0; sb->lua_reader_size_ = 0; return data; } void *StreamBuffer::lua_reader_ud(int64_t size) { const char *data = read_bytes(size); lua_reader_data_ = data; lua_reader_size_ = size; return this; } class StreamBufferWriter : public std::streambuf, public eng::opnew { private: StreamBuffer *target_; public: StreamBufferWriter(StreamBuffer *t) : target_(t) {} virtual int_type overflow(int_type c) { if (c != EOF) { target_->write_uint8(c); } return c; } }; class StreamBufferOStream : public std::ostream, public eng::opnew { private: StreamBufferWriter writer_; public: StreamBufferOStream(StreamBuffer *t) : std::ostream(nullptr), writer_(t) { rdbuf(&writer_); } virtual ~StreamBufferOStream() { } }; std::ostream &StreamBuffer::ostream() { if (ostream_ == nullptr) { ostream_.reset(new StreamBufferOStream(this)); } return *ostream_; } static bool streq(const char *str, const char *data) { int len = strlen(str); return memcmp(str, data, len) == 0; } static void write_ztbytes(StreamBuffer *sb, const char *bytes) { sb->write_bytes(bytes, strlen(bytes)); } LuaDefine(unittests_streambuffer, "", "some unit tests") { // An 11-byte fixed-size stream buffer. StreamBuffer sb11(11, true); // Check the initial state. LuaAssert(L, sb11.layout_is(0, 0, 11)); // Write a few bytes. write_ztbytes(&sb11, "abcdef"); LuaAssert(L, sb11.layout_is(0, 6, 5)); // Try reading some bytes. LuaAssert(L, streq("abcd", sb11.read_bytes(4))); LuaAssert(L, sb11.layout_is(4, 2, 5)); // Put back two bytes. sb11.unread_to(2); LuaAssert(L, sb11.layout_is(2, 4, 5)); // Read some more bytes. 
LuaAssert(L, streq("cdef", sb11.read_bytes(4))); LuaAssert(L, sb11.layout_is(6, 0, 5)); // Reading bytes now should raise an EOF and should not alter layout try { sb11.read_bytes(1); LuaAssert(L, false && "This should have thrown an exception"); } catch (const StreamEof &) {} LuaAssert(L, sb11.layout_is(6, 0, 5)); // Write some more bytes into the stream, forcing a shift-left write_ztbytes(&sb11, "ghijkl"); LuaAssert(L, sb11.layout_is(0, 6, 5)); // Test buffer wrapping a little more. LuaAssert(L, streq("ghi", sb11.read_bytes(3))); LuaAssert(L, sb11.layout_is(3, 3, 5)); write_ztbytes(&sb11, "mnopqr"); LuaAssert(L, sb11.layout_is(0, 9, 2)); LuaAssert(L, streq("jklmnopqr", sb11.read_bytes(9))); LuaAssert(L, sb11.layout_is(9, 0, 2)); // Test 1-byte integer ops. sb11.clear(); for (int i = 0; i < 10; i++) { sb11.write_int8(i); sb11.write_int8(i+100); LuaAssert(L, sb11.read_int8() == i); LuaAssert(L, sb11.read_int8() == i+100); } // Test 2-byte integer ops. for (int i = 0; i < 10; i++) { sb11.write_int16(i); sb11.write_int16(i+10000); LuaAssert(L, sb11.read_int16() == i); LuaAssert(L, sb11.read_int16() == i+10000); } // Test 4-byte integer ops. for (int i = 0; i < 10; i++) { sb11.write_int32(i); sb11.write_int32(i+1000000); LuaAssert(L, sb11.read_int32() == i); LuaAssert(L, sb11.read_int32() == i+1000000); } // Test 8-byte integer ops. for (int i = 0; i < 10; i++) { sb11.write_int64(i + 1000000); LuaAssert(L, sb11.read_int64() == i + 1000000); } // Check the write count and read count accumulator ability. sb11.clear(); LuaAssert(L, sb11.total_writes() == 0); LuaAssert(L, sb11.total_reads() == 0); for (int i = 0; i < 10; i++) { LuaAssert(L, sb11.total_writes() == i * 8); sb11.write_int32(i); sb11.write_int32(i+1000000); LuaAssert(L, sb11.total_reads() == i * 8); LuaAssert(L, sb11.read_int32() == i); LuaAssert(L, sb11.read_int32() == i+1000000); } // Try clearing the buffer. sb11.clear(); LuaAssert(L, sb11.layout_is(0, 0, 11)); // Write a number then overwrite. 
for (int i = 0; i < 2; i++) { sb11.write_int16(12); sb11.write_int16(34); int64_t wc = sb11.total_writes(); sb11.write_int16(56); sb11.write_int16(78); sb11.overwrite_int16(wc, 90); LuaAssert(L, sb11.read_int16() == 12); LuaAssert(L, sb11.read_int16() == 90); LuaAssert(L, sb11.read_int16() == 56); LuaAssert(L, sb11.read_int16() == 78); } // Try compact string encoding. sb11.clear(); sb11.write_string("abc"); sb11.write_string(""); sb11.write_string("de"); LuaAssert(L, sb11.layout_is(0, 8, 3)); LuaAssert(L, sb11.read_string() == "abc"); LuaAssert(L, sb11.read_string() == ""); LuaAssert(L, sb11.read_string() == "de"); // Make sure that contents_equal is not obviously broken. StreamBuffer eqsb1, eqsb2; eqsb1.write_int32(12); eqsb1.write_int32(34); eqsb2.write_int32(34); LuaAssert(L, !eqsb1.contents_equal(&eqsb2)); eqsb1.read_int32(); LuaAssert(L, eqsb1.contents_equal(&eqsb2)); eqsb1.write_int32(34); LuaAssert(L, !eqsb1.contents_equal(&eqsb2)); // Check the OStream functionality. StreamBuffer ossb; ossb.ostream() << "Testing."; ossb.ostream() << "Foo."; LuaAssertStrEq(L, ossb.read_entire_contents(), "Testing.Foo."); return 0; }