Move most StreamBuffer code into base-buffer.hpp

This commit is contained in:
2023-10-18 17:23:05 -04:00
parent 6c2a27b274
commit 049b0b893a
10 changed files with 822 additions and 808 deletions

View File

@@ -93,7 +93,7 @@
#include "wrap-deque.hpp"
#include "wrap-unordered-map.hpp"
#include "base-writer.hpp"
#include "base-buffer.hpp"
#include "luastack.hpp"
#include "streambuffer.hpp"
#include "debugcollector.hpp"

View File

@@ -3,6 +3,7 @@
#include "util.hpp"
#include "drivenengine.hpp"
#include "world.hpp"
#include "base-buffer.hpp"
#include <string_view>
#include <utility>
@@ -481,7 +482,7 @@ void DrivenEngine::drv_invoke_event_update(double clock) {
}
void DrivenEngine::drv_set_lua_source(uint32_t srcpklen, const char *srcpk) {
StreamBuffer sb(srcpk, srcpklen);
StreamBuffer sb(std::string_view(srcpk, srcpklen));
uint32_t nfiles = sb.read_uint32();
lua_source_.reset(new util::LuaSourceVec);
lua_source_->resize(nfiles);

View File

@@ -204,10 +204,7 @@ public:
int64_t nactor = world_->patch_everything(sb, &dbc);
if (nactor != actor_id_) change_actor_id(nactor);
dbc.dump(stdostream());
} catch (const StreamEof &seof) {
abandon_server();
return;
} catch (const StreamCorruption &scorr) {
} catch (const StreamException &sexcept) {
abandon_server();
return;
}
@@ -222,7 +219,8 @@ public:
sb->unread_to(tr_before);
return false;
}
StreamBuffer body(sb->read_bytes(message_body_len), message_body_len);
const char *message_body = sb->read_bytes(message_body_len);
StreamBuffer body(std::string_view(message_body, message_body_len));
if (message_type == util::MSG_ACK) {
receive_ack_from_server(&body);
} else if (message_type == util::MSG_DIFF) {

View File

@@ -146,7 +146,7 @@ public:
return false;
}
inv.deserialize(sb);
} catch (const StreamEof &seof) {
} catch (const StreamEofOnRead &seof) {
sb->unread_to(tr_before);
return false;
} catch (const StreamCorruption &scorr) {

View File

@@ -116,7 +116,7 @@ public:
}
uint8_t b = sb_->read_uint8();
deserialize_r(b, val);
} catch (const StreamEof &e) {
} catch (const StreamException &e) {
error_ = "EOF reached while deserializing data";
lua_settop(LS_.state(), top);
LS_.set(val, LuaNil);

View File

@@ -7,320 +7,6 @@
#include <cassert>
#include <cstring>
void StreamBuffer::init(bool fixed, bool owned, char *buf, int64_t size) {
buf_lo_ = buf;
buf_hi_ = buf_lo_ + size;
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_;
pre_read_count_ = 0;
owned_ = owned;
fixed_size_ = fixed;
}
StreamBuffer::StreamBuffer() {
init(false, true, 0, 0);
}
StreamBuffer::StreamBuffer(int64_t size, bool fixed) {
assert(size >= 0);
init(fixed, true, (char*)eng::malloc(size), size);
}
StreamBuffer::StreamBuffer(const char *s, int64_t size) {
assert(size >= 0);
init(true, false, const_cast<char *>(s), size);
write_cursor_ = buf_hi_;
}
StreamBuffer::StreamBuffer(std::string_view data) {
init(true, false, const_cast<char *>(data.data()), data.size());
write_cursor_ = buf_hi_;
}
StreamBuffer::~StreamBuffer() {
if (owned_ && (buf_lo_ != 0)) eng::free(buf_lo_);
}
int64_t StreamBuffer::total_reads() const {
return (read_cursor_ - buf_lo_) + pre_read_count_;
}
int64_t StreamBuffer::total_writes() const {
return (write_cursor_ - buf_lo_) + pre_read_count_;
}
int64_t StreamBuffer::fill() const {
return write_cursor_ - read_cursor_;
}
const char *StreamBuffer::data() const {
return read_cursor_;
}
std::string_view StreamBuffer::view() const {
return std::string_view(read_cursor_, write_cursor_ - read_cursor_);
}
bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) {
if (read_cursor_ - buf_lo_ != a) return false;
if (write_cursor_ - read_cursor_ != b) return false;
if (buf_hi_ - write_cursor_ != c) return false;
return true;
}
void StreamBuffer::make_space_slow(int64_t bytes) {
assert(owned_ && "We don't own this buffer, can't grow it");
// Decide whether the current buffer is big enough.
int64_t data_size = (write_cursor_ - read_cursor_);
int64_t existing_size = (buf_hi_ - buf_lo_);
int64_t desired_size = 8192 + ((data_size + bytes) * 2);
// Update some simple things.
pre_read_count_ += (read_cursor_ - buf_lo_);
// Move the data to the beginning of the buffer, or to
// the beginning of a new buffer.
if (fixed_size_) {
assert((data_size + bytes <= existing_size) && "Not enough space in fixed-size buffer");
if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size);
} else if (existing_size >= desired_size) {
if (data_size > 0) memcpy(buf_lo_, read_cursor_, data_size);
} else {
char *nbuf = (char *)eng::malloc(desired_size);
if (data_size > 0) memcpy(nbuf, read_cursor_, data_size);
if (buf_lo_ != nullptr) eng::free(buf_lo_);
buf_lo_ = nbuf;
buf_hi_ = nbuf + desired_size;
}
// Update the pointers to the data region.
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_ + data_size;
}
void StreamBuffer::wrote_space(int64_t bytes) {
int64_t available = buf_hi_ - write_cursor_;
assert(bytes >= 0);
assert(available >= bytes);
write_cursor_ += bytes;
}
char *StreamBuffer::get_overwrite(int64_t size, int64_t write_count_after) {
int64_t write_count_before = write_count_after - size;
assert(write_count_before >= total_reads());
assert(write_count_after <= total_writes());
return buf_lo_ + (write_count_before - pre_read_count_);
}
void StreamBuffer::clear() {
assert(owned_);
if (!fixed_size_) {
if (buf_lo_ != nullptr) eng::free(buf_lo_);
buf_lo_ = 0;
buf_hi_ = 0;
}
owned_ = true;
read_cursor_ = buf_lo_;
write_cursor_ = buf_lo_;
pre_read_count_ = 0;
}
eng::string StreamBuffer::readline() {
char *p = read_cursor_;
while ((p < write_cursor_) && (*p != '\n')) p++;
if (p == write_cursor_) {
return "";
} else {
p++;
eng::string result(read_cursor_, p - read_cursor_);
read_cursor_ = p;
return result;
}
}
void StreamBuffer::write_bytes(const char *s, int64_t len) {
make_space(len);
memcpy(write_cursor_, s, len);
write_cursor_ += len;
}
void StreamBuffer::write_bytes(std::string_view s) {
write_bytes(s.data(), s.size());
}
const char *StreamBuffer::read_bytes(int64_t bytes) {
check_available(bytes);
char *data = read_cursor_;
read_cursor_ += bytes;
return data;
}
void StreamBuffer::read_bytes_into(char *data, int64_t size) {
check_available(size);
memcpy(data, read_cursor_, size);
read_cursor_ += size;
}
std::string_view StreamBuffer::read_string_view_limit(uint64_t limit) {
uint64_t length = read_length();
if (length > limit) throw StreamCorruption();
check_available(length);
std::string_view result(read_cursor_, length);
read_cursor_ += length;
return result;
}
void StreamBuffer::write_xyz(const util::XYZ &xyz) {
make_space(12);
memcpy(write_cursor_, &xyz.x, 4);
write_cursor_ += 4;
memcpy(write_cursor_, &xyz.y, 4);
write_cursor_ += 4;
memcpy(write_cursor_, &xyz.z, 4);
write_cursor_ += 4;
}
void StreamBuffer::write_dxyz(const util::DXYZ &xyz) {
make_space(24);
memcpy(write_cursor_, &xyz.x, 8);
write_cursor_ += 8;
memcpy(write_cursor_, &xyz.y, 8);
write_cursor_ += 8;
memcpy(write_cursor_, &xyz.z, 8);
write_cursor_ += 8;
}
util::XYZ StreamBuffer::read_xyz() {
check_available(12);
util::XYZ result;
memcpy(&result.x, read_cursor_, 4);
read_cursor_ += 4;
memcpy(&result.y, read_cursor_, 4);
read_cursor_ += 4;
memcpy(&result.z, read_cursor_, 4);
read_cursor_ += 4;
return result;
}
util::DXYZ StreamBuffer::read_dxyz() {
check_available(24);
util::DXYZ result;
memcpy(&result.x, read_cursor_, 8);
read_cursor_ += 8;
memcpy(&result.y, read_cursor_, 8);
read_cursor_ += 8;
memcpy(&result.z, read_cursor_, 8);
read_cursor_ += 8;
return result;
}
void StreamBuffer::write_hashvalue(const util::HashValue &hv) {
write_uint64(hv.first);
write_uint64(hv.second);
}
util::HashValue StreamBuffer::read_hashvalue() {
uint64_t f = read_uint64();
uint64_t s = read_uint64();
return util::HashValue(f,s);
}
void StreamBuffer::overwrite_int8(int64_t write_count_after, int64_t vv) {
int8_t v = vv;
assert(int64_t(v) == vv);
char *target = get_overwrite(1, write_count_after);
memcpy(target, &v, 1);
}
void StreamBuffer::overwrite_int16(int64_t write_count_after, int64_t vv) {
int16_t v = vv;
assert(int64_t(v) == vv);
char *target = get_overwrite(2, write_count_after);
memcpy(target, &v, 2);
}
void StreamBuffer::overwrite_int32(int64_t write_count_after, int64_t vv) {
int32_t v = vv;
assert(int64_t(v) == vv);
char *target = get_overwrite(4, write_count_after);
memcpy(target, &v, 4);
}
void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t vv) {
int64_t v = vv;
assert(int64_t(v) == vv);
char *target = get_overwrite(8, write_count_after);
memcpy(target, &v, 8);
}
void StreamBuffer::overwrite_uint8(int64_t write_count_after, uint64_t vv) {
uint8_t v = vv;
assert(uint64_t(v) == vv);
char *target = get_overwrite(1, write_count_after);
memcpy(target, &v, 1);
}
void StreamBuffer::overwrite_uint16(int64_t write_count_after, uint64_t vv) {
uint16_t v = vv;
assert(uint64_t(v) == vv);
char *target = get_overwrite(2, write_count_after);
memcpy(target, &v, 2);
}
void StreamBuffer::overwrite_uint32(int64_t write_count_after, uint64_t vv) {
uint32_t v = vv;
assert(uint64_t(v) == vv);
char *target = get_overwrite(4, write_count_after);
memcpy(target, &v, 4);
}
void StreamBuffer::overwrite_uint64(int64_t write_count_after, uint64_t vv) {
uint64_t v = vv;
assert(uint64_t(v) == vv);
char *target = get_overwrite(8, write_count_after);
memcpy(target, &v, 8);
}
bool StreamBuffer::empty() {
return (read_cursor_ == write_cursor_);
}
void StreamBuffer::verify_empty() {
if (read_cursor_ != write_cursor_) {
throw StreamCorruption();
}
}
void StreamBuffer::unread_to(int64_t rd_count) {
assert(rd_count >= pre_read_count_);
assert(rd_count <= total_reads());
read_cursor_ = buf_lo_ + (rd_count - pre_read_count_);
}
void StreamBuffer::unwrite_to(int64_t wr_count) {
assert(wr_count >= total_reads());
assert(wr_count <= total_writes());
write_cursor_ = buf_lo_ + (wr_count - pre_read_count_);
}
void StreamBuffer::copy_into(StreamBuffer *sb) {
sb->write_bytes(read_cursor_, write_cursor_ - read_cursor_);
}
void StreamBuffer::transfer_into(StreamBuffer *sb) {
sb->write_bytes(read_cursor_, write_cursor_ - read_cursor_);
read_cursor_ = write_cursor_;
}
bool StreamBuffer::contents_equal(const StreamBuffer *other) const {
int64_t len = fill();
if (len != other->fill()) {
return false;
}
return memcmp(read_cursor_, other->read_cursor_, len) == 0;
}
int lua_writer_into_streambuffer(lua_State *L, const void* bytes, size_t sz, void* sbv) {
StreamBuffer *sb = (StreamBuffer *)sbv;
@@ -334,10 +20,6 @@ static bool streq(const char *str, const char *data) {
return memcmp(str, data, len) == 0;
}
static void write_ztbytes(StreamBuffer *sb, const char *bytes) {
sb->write_bytes(bytes, strlen(bytes));
}
LuaDefine(unittests_streambuffer, "", "some unit tests") {
// An 11-byte fixed-size stream buffer.
StreamBuffer sb11(11, true);
@@ -346,7 +28,7 @@ LuaDefine(unittests_streambuffer, "", "some unit tests") {
LuaAssert(L, sb11.layout_is(0, 0, 11));
// Write a few bytes.
write_ztbytes(&sb11, "abcdef");
sb11.write_bytes("abcdef");
LuaAssert(L, sb11.layout_is(0, 6, 5));
// Try reading some bytes.
@@ -365,17 +47,17 @@ LuaDefine(unittests_streambuffer, "", "some unit tests") {
try {
sb11.read_bytes(1);
LuaAssert(L, false && "This should have thrown an exception");
} catch (const StreamEof &) {}
} catch (const StreamEofOnRead &) {}
LuaAssert(L, sb11.layout_is(6, 0, 5));
// Write some more bytes into the stream, forcing a shift-left
write_ztbytes(&sb11, "ghijkl");
sb11.write_bytes("ghijkl");
LuaAssert(L, sb11.layout_is(0, 6, 5));
// Test buffer wrapping a little more.
LuaAssert(L, streq("ghi", sb11.read_bytes(3)));
LuaAssert(L, sb11.layout_is(3, 3, 5));
write_ztbytes(&sb11, "mnopqr");
sb11.write_bytes("mnopqr");
LuaAssert(L, sb11.layout_is(0, 9, 2));
LuaAssert(L, streq("jklmnopqr", sb11.read_bytes(9)));
LuaAssert(L, sb11.layout_is(9, 0, 2));

View File

@@ -184,27 +184,17 @@
//
// You can use a streambuffer as a lua_Writer, as follows:
//
// lua_dump(L, stream.lua_writer(), stream.lua_writer_ud());
// lua_dump(L, lua_writer_into_streambuffer, &sb);
//
// Anything written to the lua_writer gets appended to the streambuffer, the
// same as if it had been written using write_bytes.
//
// You can use a streambuffer as a lua_Reader, as follows:
// You can't use streambuffer as a lua_Reader directly, but you can get a
// string_view out of it and then use that to construct a lua_Reader, as
// follows:
//
// lua_load (L, stream.lua_reader(), stream.lua_reader_ud(nbytes), ...)
//
// The exact semantics of the lua_reader are tricky, so be careful:
// lua_reader_ud calls 'read_bytes' immediately, and it stores the bytes in a
// "cache of bytes for lua." Then, when the lua_reader gets invoked, the reader
// returns the entire contents of the cache, and it clears the cache. Here are
// some consequences of this design:
//
// 1. The number of bytes read from the stream is always exactly equal to
// nbytes, even if lua never calls the lua_reader.
//
// 2. If the stream doesn't contain nbytes, a StreamEof exception gets thrown
// from lua_reader_ud, not from the lua_Reader. This is good, because it
// means exceptions don't get thrown from inside the lua runtime.
// LuaStringViewReader svr(mystreambuffer.view());
// lua_load (L, svr.lua_reader(), svr.lua_reader_userdata());
//
//////////////////////////////////////////////////////////////
@@ -219,7 +209,7 @@
#include <cstdint>
#include <cassert>
#include "base-writer.hpp"
#include "base-buffer.hpp"
#include "luastack.hpp"
#include "util.hpp"
@@ -229,188 +219,88 @@ public:
virtual char const *what() const { return "General stream exception"; }
};
class StreamEof : public StreamException
class StreamEofOnRead : public StreamException
{
public:
virtual char const *what() const { return "Stream ran out of data"; }
};
class StreamCorruption : public StreamException
class StreamStringTooLong: public StreamException
{
public:
virtual char const *what() const { return "Stream contained invalid data"; }
virtual char const *what() const { return "Stream contained a string that was too long"; }
};
class StreamBuffer : public eng::nevernew, public BaseReader<StreamBuffer>, public BaseWriter<StreamBuffer> {
class StreamIntegerTruncated: public StreamException
{
public:
using read_string_type = eng::string;
virtual char const *what() const { return "You truncated an integer when writing to a stream"; }
};
// Construct an empty buffer.
StreamBuffer();
class StreamCorruption: public StreamException
{
public:
virtual char const *what() const { return "Stream Corruption"; }
};
// Construct an empty buffer, preallocate the specified amount of space.
StreamBuffer(int64_t size, bool fixed_size);
class StreamBufferCore {
protected:
void *basebuffer_malloc(size_t size) { return eng::malloc(size); }
void basebuffer_free(void *p) { eng::free(p); }
void clear_error_flags() { }
void raise_eof_on_read() { throw StreamEofOnRead(); }
void raise_string_too_long() { throw StreamStringTooLong(); }
void raise_integer_truncated() { throw StreamIntegerTruncated(); }
};
// Construct a streambuffer that reads from an external block of bytes.
StreamBuffer(const char *s, int64_t len);
class StreamBuffer : public eng::nevernew, public BaseBuffer<StreamBufferCore, eng::string> {
public:
using BaseBuffer::BaseBuffer;
// Construct a streambuffer that reads from an external block of bytes.
StreamBuffer(std::string_view data);
// Delete a StreamBuffer.
~StreamBuffer();
// Get the total number of bytes ever read from this buffer.
int64_t total_reads() const;
// Get the total number of bytes ever written to this buffer.
int64_t total_writes() const;
// Make the specified amount of space in the buffer for writing.
// Return a pointer to the space.
char *make_space(int64_t bytes) {
int64_t available = buf_hi_ - write_cursor_;
if (available < bytes) make_space_slow(bytes);
return write_cursor_;
void write_xyz(const util::XYZ &xyz) {
write_float(xyz.x);
write_float(xyz.y);
write_float(xyz.z);
}
// Used after calling make_space then filling the space.
void wrote_space(int64_t bytes);
// Amount of data inside the buffer.
int64_t fill() const;
// Get a pointer to the data.
const char *data() const;
// Get entire contents as a string_view
std::string_view view() const;
// Discard all data. Reset total read and write counts.
// Frees up as much space as possible.
void clear();
// Attempt to do a "readline". If there is no newline in
// the buffer, returns empty string. If there is a newline,
// returns a block of text that ends in newline.
eng::string readline();
// Write block of bytes into the buffer.
//
// Caution: this function doesn't write the length!
// It just writes the bytes.
//
void write_bytes(const char *bytes, int64_t len);
void write_bytes(std::string_view s);
// Read a block of bytes from the buffer.
//
// Caution: the pointer returned is a pointer to the stream's buffer. It is
// only valid until you mutate the buffer. Throws StreamEof if the specified
// number of bytes aren't present.
//
const char *read_bytes(int64_t bytes);
// Copy bytes from the StreamBuffer into an external buffer.
//
// Returns true if the bytes were successfully read.
//
void read_bytes_into(char *target, int64_t len);
// Read a string as a string_view.
//
std::string_view read_string_view_limit(uint64_t limit);
std::string_view read_string_view() { return read_string_view_limit(0x1000000); }
// Read and write larger types.
//
// Throws StreamEof if the specified number of bytes aren't present.
// Read string with a length limit will throw 'StreamCorruption' if the
// length is too long.
//
void write_xyz(const util::XYZ &xyz);
void write_dxyz(const util::DXYZ &xyz);
util::XYZ read_xyz();
util::DXYZ read_dxyz();
void write_hashvalue(const util::HashValue &hv);
util::HashValue read_hashvalue();
// Overwrite values previously written to the buffer.
//
// See the comment at the top of this file for an explanation.
//
void overwrite_int8(int64_t write_count_after, int64_t v);
void overwrite_int16(int64_t write_count_after, int64_t v);
void overwrite_int32(int64_t write_count_after, int64_t v);
void overwrite_int64(int64_t write_count_after, int64_t v);
void overwrite_uint8(int64_t write_count_after, uint64_t v);
void overwrite_uint16(int64_t write_count_after, uint64_t v);
void overwrite_uint32(int64_t write_count_after, uint64_t v);
void overwrite_uint64(int64_t write_count_after, uint64_t v);
// This function checks to see if the buffer is empty.
bool empty();
// Verify that the buffer is empty, if not, throw StreamCorruption.
void verify_empty();
// Make sure the specified number of bytes are available to read.
void check_available(int64_t bytes) {
int64_t avail = write_cursor_ - read_cursor_;
if (avail < bytes) {
throw StreamEof();
}
void write_dxyz(const util::DXYZ &xyz) {
write_double(xyz.x);
write_double(xyz.y);
write_double(xyz.z);
}
// Rewind the read cursor to a previous position.
void unread_to(int64_t total_reads);
// Rewind the write cursor to a previous position.
void unwrite_to(int64_t total_writes);
// Copy the entire contents of this streambuffer into another one.
void copy_into(StreamBuffer *sb);
void write_hashvalue(const util::HashValue &h) {
write_uint64(h.first);
write_uint64(h.second);
}
// Transfer the entire contents of this streambuffer into another one.
void transfer_into(StreamBuffer *sb);
util::XYZ read_xyz() {
float x = read_float();
float y = read_float();
float z = read_float();
return util::XYZ(x, y, z);
}
// Compare the contents of this streambuffer to another one.
bool contents_equal(const StreamBuffer *sb) const;
util::DXYZ read_dxyz() {
double x = read_double();
double y = read_double();
double z = read_double();
return util::DXYZ(x, y, z);
}
// Throw a StreamCorruption exception.
void raise_truncated() { throw StreamCorruption(); }
void raise_string_too_long() { throw StreamCorruption(); }
// This is for unit testing.
bool layout_is(int64_t a, int64_t b, int64_t c);
util::HashValue read_hashvalue() {
uint64_t f = read_uint64();
uint64_t s = read_uint64();
return util::HashValue(f, s);
}
private:
// Start and end of the allocated block.
char *buf_lo_;
char *buf_hi_;
bool contents_equal(const StreamBuffer *sb) {
return view() == sb->view();
}
// The write and read cursors.
char *write_cursor_;
char *read_cursor_;
// Number of bytes read before buffer was last aligned.
int64_t pre_read_count_;
// True if we own this buffer.
bool owned_;
// True if we're not allowed to expand this buffer.
bool fixed_size_;
// Initialize with a new buffer.
void init(bool fixed, bool owned, char *buf, int64_t size);
// Function that resizes the buffer during make_space operations.
void make_space_slow(int64_t bytes);
// Implementation for the overwrite_int functions.
char *get_overwrite(int64_t size, int64_t write_count_after);
void copy_into(StreamBuffer *sb) {
sb->write_bytes(view());
}
};
// Use a streambuffer as a lua_writer.