Files
integration/luprex/cpp/core/streambuffer.cpp

519 lines
14 KiB
C++

#include "wrap-string.hpp"
#include "eng-malloc.hpp"
#include "streambuffer.hpp"
#include "spookyv2.hpp"
#include <cassert>
#include <cstring>
// Resets every field so that this object wraps the region [buf, buf + size)
// with nothing read and nothing written yet.
//   fixed - stored in fixed_size_; make_space_slow() never reallocates when set.
//   owned - stored in owned_; the destructor only frees owned storage.
//   buf   - start of the storage (may be null when size is 0).
//   size  - number of bytes in the storage.
void StreamBuffer::init(bool fixed, bool owned, char *buf, int64_t size) {
    // Ownership and growth policy.
    fixed_size_ = fixed;
    owned_ = owned;
    // Storage bounds.
    buf_lo_ = buf;
    buf_hi_ = buf + size;
    // Both cursors start at the bottom: the buffer holds no unread data.
    read_cursor_ = buf;
    write_cursor_ = buf;
    // No bytes have been compacted away yet.
    pre_read_count_ = 0;
}
// Creates an empty, growable, owned buffer with no storage allocated yet.
StreamBuffer::StreamBuffer() {
    init(/*fixed=*/false, /*owned=*/true, /*buf=*/nullptr, /*size=*/0);
}
// Creates an owned buffer backed by `size` freshly allocated bytes.
//   size  - initial capacity in bytes; must not be negative.
//   fixed - when true the buffer is never reallocated (see make_space_slow()).
StreamBuffer::StreamBuffer(int64_t size, bool fixed) {
    assert(size >= 0);
    char *storage = (char*)eng::malloc(size);
    init(fixed, /*owned=*/true, storage, size);
}
// Wraps `size` bytes at `s` as a read-only source: the buffer is fixed-size
// and not owned, so the destructor will not free it. The write cursor is
// placed at the end, which makes the whole region immediately readable.
// The const is cast away only to fit the char* members.
StreamBuffer::StreamBuffer(const char *s, int64_t size) {
    assert(size >= 0);
    char *borrowed = const_cast<char *>(s);
    init(/*fixed=*/true, /*owned=*/false, borrowed, size);
    // Everything in the wrapped region counts as already written.
    write_cursor_ = buf_hi_;
}
// Wraps the bytes referenced by `data` as a read-only source: fixed-size,
// not owned, and fully readable from the start. The view's storage is
// borrowed, not copied.
StreamBuffer::StreamBuffer(std::string_view data) {
    char *borrowed = const_cast<char *>(data.data());
    init(/*fixed=*/true, /*owned=*/false, borrowed, data.size());
    // Everything in the wrapped region counts as already written.
    write_cursor_ = buf_hi_;
}
// Releases the storage, but only when this object owns it and it exists.
StreamBuffer::~StreamBuffer() {
    if (!owned_) return;
    if (buf_lo_ == nullptr) return;
    eng::free(buf_lo_);
}
// Returns the running count of bytes consumed: the bytes consumed in the
// current storage plus those already accounted for in pre_read_count_.
int64_t StreamBuffer::total_reads() const {
    const int64_t consumed_in_buffer = read_cursor_ - buf_lo_;
    return consumed_in_buffer + pre_read_count_;
}
// Returns the running count of bytes written: the write cursor's offset in
// the current storage plus the bytes accounted for in pre_read_count_.
int64_t StreamBuffer::total_writes() const {
    const int64_t written_in_buffer = write_cursor_ - buf_lo_;
    return written_in_buffer + pre_read_count_;
}
// Returns the number of bytes written but not yet read.
int64_t StreamBuffer::fill() const {
    const int64_t unread = write_cursor_ - read_cursor_;
    return unread;
}
// Returns a pointer to the first unread byte; fill() bytes are readable
// from there. Nothing is consumed.
const char *StreamBuffer::data() const {
    const char *first_unread = read_cursor_;
    return first_unread;
}
// Returns a non-owning view over the unread bytes, without consuming them.
// The view refers directly to this buffer's storage.
std::string_view StreamBuffer::view() const {
    const char *first_unread = read_cursor_;
    const auto unread_len = write_cursor_ - read_cursor_;
    return std::string_view(first_unread, unread_len);
}
// Reports whether the storage is currently divided exactly as
//   a bytes already consumed | b bytes unread | c bytes free.
// Used by the unit tests in this file to check cursor positions.
bool StreamBuffer::layout_is(int64_t a, int64_t b, int64_t c) {
    const int64_t consumed = read_cursor_ - buf_lo_;
    const int64_t unread = write_cursor_ - read_cursor_;
    const int64_t vacant = buf_hi_ - write_cursor_;
    return (consumed == a) && (unread == b) && (vacant == c);
}
// Makes room for `bytes` more bytes by discarding the already-consumed
// prefix: the unread data is slid to the bottom of the current storage, or
// copied to the bottom of a larger, newly allocated storage.
// Presumably called by make_space() when the free tail is too small.
//
// The buffer must be owned. A fixed-size buffer is never reallocated, so the
// request must fit once the consumed prefix is reclaimed (asserted).
// total_reads() and total_writes() are unaffected, because the discarded
// prefix is folded into pre_read_count_.
void StreamBuffer::make_space_slow(int64_t bytes) {
    assert(owned_ && "We don't own this buffer, can't grow it");
    // Decide whether the current buffer is big enough.
    const int64_t data_size = (write_cursor_ - read_cursor_);
    const int64_t existing_size = (buf_hi_ - buf_lo_);
    // Growth policy: twice what is needed right now, plus 8 KiB of slack.
    const int64_t desired_size = 8192 + ((data_size + bytes) * 2);
    // The consumed prefix is about to disappear; remember how long it was.
    pre_read_count_ += (read_cursor_ - buf_lo_);
    // Move the data to the beginning of the buffer, or to
    // the beginning of a new buffer.
    if (fixed_size_) {
        assert((data_size + bytes <= existing_size) && "Not enough space in fixed-size buffer");
        // Source and destination share one allocation and may overlap (or be
        // identical), so memmove is required; memcpy would be undefined.
        if (data_size > 0) memmove(buf_lo_, read_cursor_, data_size);
    } else if (existing_size >= desired_size) {
        // Same in-place slide as above: the ranges may overlap.
        if (data_size > 0) memmove(buf_lo_, read_cursor_, data_size);
    } else {
        char *nbuf = (char *)eng::malloc(desired_size);
        // Distinct allocations never overlap, so memcpy is fine here.
        if (data_size > 0) memcpy(nbuf, read_cursor_, data_size);
        if (buf_lo_ != nullptr) eng::free(buf_lo_);
        buf_lo_ = nbuf;
        buf_hi_ = nbuf + desired_size;
    }
    // Update the pointers to the data region.
    read_cursor_ = buf_lo_;
    write_cursor_ = buf_lo_ + data_size;
}
// Commits `bytes` bytes that the caller has stored directly at the write
// cursor. `bytes` must be non-negative and must fit in the free tail of
// the storage (both asserted).
void StreamBuffer::wrote_space(int64_t bytes) {
    const int64_t free_tail = buf_hi_ - write_cursor_;
    assert(bytes >= 0);
    assert(bytes <= free_tail);
    write_cursor_ = write_cursor_ + bytes;
}
// Locates `size` previously written bytes so they can be patched in place.
//   size              - length of the region to overwrite.
//   write_count_after - value total_writes() had just after the region
//                       was originally written.
// The region must not have been read yet and must lie entirely within
// what has been written (both asserted).
// Returns a pointer to the first byte of the region in the current storage.
char *StreamBuffer::get_overwrite(int64_t size, int64_t write_count_after) {
    const int64_t region_begin = write_count_after - size;
    assert(region_begin >= total_reads());
    assert(write_count_after <= total_writes());
    // Convert the absolute stream position into an offset in the storage.
    const int64_t offset_in_buffer = region_begin - pre_read_count_;
    return buf_lo_ + offset_in_buffer;
}
// Discards all contents and resets the read/write counters to zero.
// A growable buffer also releases its storage; a fixed-size buffer keeps
// its storage and simply becomes empty. The buffer must be owned.
void StreamBuffer::clear() {
    assert(owned_);
    if (!fixed_size_) {
        if (buf_lo_ != nullptr) eng::free(buf_lo_);
        buf_lo_ = nullptr;
        buf_hi_ = nullptr;
    }
    // owned_ is deliberately left untouched. It is already true whenever the
    // precondition above holds, and forcing it to true in a build without
    // asserts would make the destructor free storage this object only borrows.
    read_cursor_ = buf_lo_;
    write_cursor_ = buf_lo_;
    pre_read_count_ = 0;
}
// Consumes and returns the next line, including its trailing '\n'.
// If the unread data contains no '\n', nothing is consumed and an empty
// string is returned.
eng::string StreamBuffer::readline() {
    for (char *scan = read_cursor_; scan < write_cursor_; ++scan) {
        if (*scan == '\n') {
            // The line ends just past the newline.
            char *line_end = scan + 1;
            eng::string line(read_cursor_, line_end - read_cursor_);
            read_cursor_ = line_end;
            return line;
        }
    }
    // No complete line is available yet.
    return "";
}
// Appends `len` raw bytes starting at `s` to the buffer.
//   s   - source bytes; may be null only when len is 0.
//   len - number of bytes to append; must not be negative.
void StreamBuffer::write_bytes(const char *s, int64_t len) {
    assert(len >= 0);
    make_space(len);
    // memcpy has undefined behaviour when either pointer is null, even for a
    // zero count. An empty write into a buffer with no storage, or from an
    // empty string_view, would pass null pointers, so skip the call then.
    if (len > 0) {
        memcpy(write_cursor_, s, len);
        write_cursor_ += len;
    }
}
// Appends the raw bytes of `s` to the buffer by forwarding to the
// (pointer, length) overload.
void StreamBuffer::write_bytes(std::string_view s) {
    const char *src = s.data();
    const auto src_len = s.size();
    write_bytes(src, src_len);
}
// Consumes `bytes` bytes and returns a pointer to where they start.
// The pointer refers to this buffer's own storage, not a copy.
// check_available() runs first; the unit test in this file expects a
// StreamEof exception, with the cursors untouched, when too few bytes
// are unread.
const char *StreamBuffer::read_bytes(int64_t bytes) {
    check_available(bytes);
    const char *start = read_cursor_;
    read_cursor_ = read_cursor_ + bytes;
    return start;
}
// Consumes `size` bytes, copying them into the caller's storage at `data`.
//   data - destination; may be null only when size is 0.
//   size - number of bytes to consume.
// check_available() runs first to validate that enough bytes are unread.
void StreamBuffer::read_bytes_into(char *data, int64_t size) {
    check_available(size);
    // memcpy has undefined behaviour when either pointer is null, even for a
    // zero count. A zero-length read from a buffer with no storage would
    // pass a null source, so skip the call in that case.
    if (size > 0) memcpy(data, read_cursor_, size);
    read_cursor_ += size;
}
// Reads a length prefix via read_length(), then consumes that many bytes
// and returns them as a view into this buffer's storage (no copy).
//   limit - largest acceptable length.
// Throws StreamCorruption when the decoded length exceeds `limit`.
// check_available() then validates that the payload is actually present.
std::string_view StreamBuffer::read_string_view_limit(uint64_t limit) {
    const uint64_t length = read_length();
    if (limit < length) {
        throw StreamCorruption();
    }
    check_available(length);
    const char *start = read_cursor_;
    read_cursor_ += length;
    return std::string_view(start, length);
}
// Appends the x, y and z members of `xyz`, in that order, as three raw
// 4-byte values (12 bytes total) in host byte order.
void StreamBuffer::write_xyz(const util::XYZ &xyz) {
    make_space(12);
    // Fetch the cursor only after make_space(), which may relocate the data.
    char *out = write_cursor_;
    memcpy(out + 0, &xyz.x, 4);
    memcpy(out + 4, &xyz.y, 4);
    memcpy(out + 8, &xyz.z, 4);
    write_cursor_ = out + 12;
}
// Appends the x, y and z members of `xyz`, in that order, as three raw
// 8-byte values (24 bytes total) in host byte order.
void StreamBuffer::write_dxyz(const util::DXYZ &xyz) {
    make_space(24);
    // Fetch the cursor only after make_space(), which may relocate the data.
    char *out = write_cursor_;
    memcpy(out + 0, &xyz.x, 8);
    memcpy(out + 8, &xyz.y, 8);
    memcpy(out + 16, &xyz.z, 8);
    write_cursor_ = out + 24;
}
// Consumes 12 bytes and returns them as an XYZ: three raw 4-byte values
// copied into x, y and z in that order (the inverse of write_xyz()).
// check_available() validates that 12 bytes are unread before anything
// is consumed.
util::XYZ StreamBuffer::read_xyz() {
    check_available(12);
    const char *in = read_cursor_;
    util::XYZ value;
    memcpy(&value.x, in + 0, 4);
    memcpy(&value.y, in + 4, 4);
    memcpy(&value.z, in + 8, 4);
    read_cursor_ += 12;
    return value;
}
// Consumes 24 bytes and returns them as a DXYZ: three raw 8-byte values
// copied into x, y and z in that order (the inverse of write_dxyz()).
// check_available() validates that 24 bytes are unread before anything
// is consumed.
util::DXYZ StreamBuffer::read_dxyz() {
    check_available(24);
    const char *in = read_cursor_;
    util::DXYZ value;
    memcpy(&value.x, in + 0, 8);
    memcpy(&value.y, in + 8, 8);
    memcpy(&value.z, in + 16, 8);
    read_cursor_ += 24;
    return value;
}
// Appends a hash value as two uint64 writes: `first`, then `second`.
void StreamBuffer::write_hashvalue(const util::HashValue &hv) {
    const auto &leading_word = hv.first;
    const auto &trailing_word = hv.second;
    write_uint64(leading_word);
    write_uint64(trailing_word);
}
// Consumes two uint64 values and returns them as a hash value
// (the inverse of write_hashvalue()).
util::HashValue StreamBuffer::read_hashvalue() {
    // The two reads are separate statements so their order is guaranteed:
    // `first` was written first.
    const uint64_t leading_word = read_uint64();
    const uint64_t trailing_word = read_uint64();
    return util::HashValue(leading_word, trailing_word);
}
// Consumes every unread byte and returns them as a string copy.
// The buffer is empty afterwards.
eng::string StreamBuffer::read_entire_contents() {
    const int64_t unread = fill();
    eng::string everything(read_cursor_, unread);
    // Mark everything as consumed only once the copy exists.
    read_cursor_ = write_cursor_;
    return everything;
}
// Patches a previously written int8 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in int8_t (asserted).
void StreamBuffer::overwrite_int8(int64_t write_count_after, int64_t vv) {
    const int8_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(int64_t(narrowed) == vv);
    char *slot = get_overwrite(1, write_count_after);
    memcpy(slot, &narrowed, 1);
}
// Patches a previously written int16 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in int16_t (asserted).
void StreamBuffer::overwrite_int16(int64_t write_count_after, int64_t vv) {
    const int16_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(int64_t(narrowed) == vv);
    char *slot = get_overwrite(2, write_count_after);
    memcpy(slot, &narrowed, 2);
}
// Patches a previously written int32 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in int32_t (asserted).
void StreamBuffer::overwrite_int32(int64_t write_count_after, int64_t vv) {
    const int32_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(int64_t(narrowed) == vv);
    char *slot = get_overwrite(4, write_count_after);
    memcpy(slot, &narrowed, 4);
}
// Patches a previously written int64 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value.
void StreamBuffer::overwrite_int64(int64_t write_count_after, int64_t vv) {
    const int64_t value = vv;
    // Always true at full width; kept to mirror the narrower overloads.
    assert(int64_t(value) == vv);
    char *slot = get_overwrite(8, write_count_after);
    memcpy(slot, &value, 8);
}
// Patches a previously written uint8 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in uint8_t (asserted).
void StreamBuffer::overwrite_uint8(int64_t write_count_after, uint64_t vv) {
    const uint8_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(uint64_t(narrowed) == vv);
    char *slot = get_overwrite(1, write_count_after);
    memcpy(slot, &narrowed, 1);
}
// Patches a previously written uint16 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in uint16_t (asserted).
void StreamBuffer::overwrite_uint16(int64_t write_count_after, uint64_t vv) {
    const uint16_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(uint64_t(narrowed) == vv);
    char *slot = get_overwrite(2, write_count_after);
    memcpy(slot, &narrowed, 2);
}
// Patches a previously written uint32 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value; must fit in uint32_t (asserted).
void StreamBuffer::overwrite_uint32(int64_t write_count_after, uint64_t vv) {
    const uint32_t narrowed = vv;
    // The value must survive the round trip through the narrower type.
    assert(uint64_t(narrowed) == vv);
    char *slot = get_overwrite(4, write_count_after);
    memcpy(slot, &narrowed, 4);
}
// Patches a previously written uint64 in place.
//   write_count_after - total_writes() as it was just after the original
//                       value was written.
//   vv                - replacement value.
void StreamBuffer::overwrite_uint64(int64_t write_count_after, uint64_t vv) {
    const uint64_t value = vv;
    // Always true at full width; kept to mirror the narrower overloads.
    assert(uint64_t(value) == vv);
    char *slot = get_overwrite(8, write_count_after);
    memcpy(slot, &value, 8);
}
// Returns true when there are no unread bytes.
bool StreamBuffer::empty() {
    const bool nothing_unread = (write_cursor_ == read_cursor_);
    return nothing_unread;
}
// Throws StreamCorruption if any unread bytes remain; otherwise does nothing.
void StreamBuffer::verify_empty() {
    if (read_cursor_ == write_cursor_) return;
    throw StreamCorruption();
}
// Rewinds the read cursor so that total_reads() becomes `rd_count`.
// `rd_count` is an absolute stream position, not a delta. It must lie
// between pre_read_count_ (the oldest byte still in the storage) and the
// current total_reads(); both bounds are asserted.
void StreamBuffer::unread_to(int64_t rd_count) {
    assert(rd_count >= pre_read_count_);
    assert(rd_count <= total_reads());
    const int64_t offset_in_buffer = rd_count - pre_read_count_;
    read_cursor_ = buf_lo_ + offset_in_buffer;
}
// Rolls the write cursor back so that total_writes() becomes `wr_count`,
// discarding the bytes written after that point.
// `wr_count` is an absolute stream position. It must lie between
// total_reads() and the current total_writes(); both bounds are asserted.
void StreamBuffer::unwrite_to(int64_t wr_count) {
    assert(wr_count >= total_reads());
    assert(wr_count <= total_writes());
    const int64_t offset_in_buffer = wr_count - pre_read_count_;
    write_cursor_ = buf_lo_ + offset_in_buffer;
}
// Appends a copy of this buffer's unread bytes to `sb`.
// This buffer's cursors are not changed.
void StreamBuffer::copy_into(StreamBuffer *sb) {
    const char *unread = read_cursor_;
    const auto unread_len = write_cursor_ - read_cursor_;
    sb->write_bytes(unread, unread_len);
}
// Appends this buffer's unread bytes to `sb`, then marks them all as
// consumed here, leaving this buffer empty.
void StreamBuffer::transfer_into(StreamBuffer *sb) {
    const char *unread = read_cursor_;
    const auto unread_len = write_cursor_ - read_cursor_;
    sb->write_bytes(unread, unread_len);
    // Everything has been handed over.
    read_cursor_ = write_cursor_;
}
// Returns true when this buffer and `other` hold exactly the same unread
// bytes. Only the unread regions are compared; already-consumed bytes and
// the read/write counters play no part.
bool StreamBuffer::contents_equal(const StreamBuffer *other) const {
    const int64_t len = fill();
    if (len != other->fill()) {
        return false;
    }
    // Two empty buffers are equal. Returning here also avoids calling memcmp
    // with null pointers (undefined even for a zero count) when neither
    // buffer has any storage.
    if (len == 0) {
        return true;
    }
    return memcmp(read_cursor_, other->read_cursor_, len) == 0;
}
// Returns a 128-bit hash of the unread bytes as a pair of 64-bit words.
// Nothing is consumed.
util::HashValue StreamBuffer::hash() const {
    // Fixed starting values passed by address to ChainHash128, which
    // presumably treats them as in/out state - confirm in spookyv2.hpp.
    uint64_t word_a = 0x82A7912E7893AC87;
    uint64_t word_b = 0x81D402740DE458F3;
    const auto unread_len = write_cursor_ - read_cursor_;
    SpookyHash::ChainHash128(read_cursor_, unread_len, &word_a, &word_b);
    return std::make_pair(word_a, word_b);
}
class StreamBufferWriter : public std::streambuf, public eng::opnew {
private:
StreamBuffer *target_;
public:
StreamBufferWriter(StreamBuffer *t) : target_(t) {}
virtual int_type overflow(int_type c) {
if (c != EOF) {
target_->write_uint8(c);
}
return c;
}
};
// A std::ostream whose output lands in a StreamBuffer. It owns the
// StreamBufferWriter that does the appending; the StreamBuffer itself is
// borrowed.
class StreamBufferOStream : public std::ostream, public eng::opnew {
private:
    StreamBufferWriter writer_;
public:
    // The base stream is built with no stream buffer, because writer_ (a
    // member) is not constructed until after the base class. Once it
    // exists, it is attached with rdbuf(), which also clears the error
    // state left by the null stream buffer.
    StreamBufferOStream(StreamBuffer *t)
        : std::ostream(nullptr),
          writer_(t)
    {
        this->rdbuf(&writer_);
    }
    virtual ~StreamBufferOStream() {}
};
// Returns a std::ostream that appends to this buffer. The stream is
// created on first use and reused by every later call.
std::ostream &StreamBuffer::ostream() {
    if (ostream_ != nullptr) {
        return *ostream_;
    }
    ostream_.reset(new StreamBufferOStream(this));
    return *ostream_;
}
// Writer callback that appends each chunk it is handed to a StreamBuffer.
//   L     - unused.
//   bytes - chunk to append.
//   sz    - chunk length in bytes.
//   sbv   - the destination StreamBuffer, passed as an opaque pointer.
// Always returns 0.
int lua_writer_into_streambuffer(lua_State *L, const void* bytes, size_t sz, void* sbv) {
    StreamBuffer *sink = static_cast<StreamBuffer *>(sbv);
    // Reserve room, copy straight into the buffer, then commit the bytes.
    void *dest = sink->make_space(sz);
    memcpy(dest, bytes, sz);
    sink->wrote_space(sz);
    return 0;
}
// Test helper: returns true when `data` begins with the bytes of the
// NUL-terminated string `str`. `data` need not be NUL-terminated, but it
// must have at least strlen(str) readable bytes.
static bool streq(const char *str, const char *data) {
    // Keep the length as size_t: storing it in an int would narrow it.
    const std::size_t len = std::strlen(str);
    return std::memcmp(str, data, len) == 0;
}
// Test helper: appends the characters of the NUL-terminated string `bytes`
// to `sb`, without the terminating NUL.
static void write_ztbytes(StreamBuffer *sb, const char *bytes) {
    const auto length = strlen(bytes);
    sb->write_bytes(bytes, length);
}
// Self-test for StreamBuffer, defined through LuaDefine and checked with
// LuaAssert. In order, it exercises: cursor layout after reads, writes and
// unread_to() on an 11-byte fixed-size buffer; compaction when a write does
// not fit in the free tail; 1/2/4/8-byte integer round trips; the
// total_reads()/total_writes() counters; clear(); in-place overwrite;
// string encoding; contents_equal(); and the ostream adapter.
// layout_is(a, b, c) means: a bytes consumed, b bytes unread, c bytes free.
// Returns 0.
LuaDefine(unittests_streambuffer, "", "some unit tests") {
// An 11-byte fixed-size stream buffer.
StreamBuffer sb11(11, true);
// Check the initial state.
LuaAssert(L, sb11.layout_is(0, 0, 11));
// Write a few bytes.
write_ztbytes(&sb11, "abcdef");
LuaAssert(L, sb11.layout_is(0, 6, 5));
// Try reading some bytes.
LuaAssert(L, streq("abcd", sb11.read_bytes(4)));
LuaAssert(L, sb11.layout_is(4, 2, 5));
// Put back two bytes.
// The argument is an absolute read count, not a delta: rewind to the
// point where only 2 bytes had been read.
sb11.unread_to(2);
LuaAssert(L, sb11.layout_is(2, 4, 5));
// Read some more bytes.
LuaAssert(L, streq("cdef", sb11.read_bytes(4)));
LuaAssert(L, sb11.layout_is(6, 0, 5));
// Reading bytes now should raise an EOF and should not alter layout
try {
sb11.read_bytes(1);
LuaAssert(L, false && "This should have thrown an exception");
} catch (const StreamEof &) {}
LuaAssert(L, sb11.layout_is(6, 0, 5));
// Write some more bytes into the stream, forcing a shift-left
// (6 bytes do not fit in the 5 free bytes, so the consumed prefix is
// reclaimed first).
write_ztbytes(&sb11, "ghijkl");
LuaAssert(L, sb11.layout_is(0, 6, 5));
// Test buffer wrapping a little more.
LuaAssert(L, streq("ghi", sb11.read_bytes(3)));
LuaAssert(L, sb11.layout_is(3, 3, 5));
// This time 3 unread bytes ("jkl") have to be slid down before the write.
write_ztbytes(&sb11, "mnopqr");
LuaAssert(L, sb11.layout_is(0, 9, 2));
LuaAssert(L, streq("jklmnopqr", sb11.read_bytes(9)));
LuaAssert(L, sb11.layout_is(9, 0, 2));
// Test 1-byte integer ops.
// clear() on a fixed-size buffer keeps the 11-byte storage and only
// resets the cursors and counters.
sb11.clear();
// Each of the loops below pushes more bytes in total than the buffer
// holds, so the storage has to be reused as the loop runs.
for (int i = 0; i < 10; i++) {
sb11.write_int8(i);
sb11.write_int8(i+100);
LuaAssert(L, sb11.read_int8() == i);
LuaAssert(L, sb11.read_int8() == i+100);
}
// Test 2-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int16(i);
sb11.write_int16(i+10000);
LuaAssert(L, sb11.read_int16() == i);
LuaAssert(L, sb11.read_int16() == i+10000);
}
// Test 4-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int32(i);
sb11.write_int32(i+1000000);
LuaAssert(L, sb11.read_int32() == i);
LuaAssert(L, sb11.read_int32() == i+1000000);
}
// Test 8-byte integer ops.
for (int i = 0; i < 10; i++) {
sb11.write_int64(i + 1000000);
LuaAssert(L, sb11.read_int64() == i + 1000000);
}
// Check the write count and read count accumulator ability.
// The totals must keep counting up (8 bytes per iteration) even though
// the 11-byte storage is reused along the way.
sb11.clear();
LuaAssert(L, sb11.total_writes() == 0);
LuaAssert(L, sb11.total_reads() == 0);
for (int i = 0; i < 10; i++) {
LuaAssert(L, sb11.total_writes() == i * 8);
sb11.write_int32(i);
sb11.write_int32(i+1000000);
LuaAssert(L, sb11.total_reads() == i * 8);
LuaAssert(L, sb11.read_int32() == i);
LuaAssert(L, sb11.read_int32() == i+1000000);
}
// Try clearing the buffer.
sb11.clear();
LuaAssert(L, sb11.layout_is(0, 0, 11));
// Write a number then overwrite.
// wc is captured right after the value to patch (34), so
// overwrite_int16(wc, 90) replaces that value. The second iteration
// starts with 8 bytes already consumed, so it repeats the check with
// positions that no longer start at the bottom of the storage.
for (int i = 0; i < 2; i++) {
sb11.write_int16(12);
sb11.write_int16(34);
int64_t wc = sb11.total_writes();
sb11.write_int16(56);
sb11.write_int16(78);
sb11.overwrite_int16(wc, 90);
LuaAssert(L, sb11.read_int16() == 12);
LuaAssert(L, sb11.read_int16() == 90);
LuaAssert(L, sb11.read_int16() == 56);
LuaAssert(L, sb11.read_int16() == 78);
}
// Try compact string encoding.
// The three strings hold 5 characters in total but occupy 8 bytes, which
// is consistent with one byte of length prefix per string.
sb11.clear();
sb11.write_string("abc");
sb11.write_string("");
sb11.write_string("de");
LuaAssert(L, sb11.layout_is(0, 8, 3));
LuaAssert(L, sb11.read_string() == "abc");
LuaAssert(L, sb11.read_string() == "");
LuaAssert(L, sb11.read_string() == "de");
// Make sure that contents_equal is not obviously broken.
StreamBuffer eqsb1, eqsb2;
eqsb1.write_int32(12);
eqsb1.write_int32(34);
eqsb2.write_int32(34);
LuaAssert(L, !eqsb1.contents_equal(&eqsb2));
// After consuming the 12, only the 34 is unread in eqsb1, which matches
// eqsb2: the comparison looks at unread bytes only.
eqsb1.read_int32();
LuaAssert(L, eqsb1.contents_equal(&eqsb2));
eqsb1.write_int32(34);
LuaAssert(L, !eqsb1.contents_equal(&eqsb2));
// Check the OStream functionality.
// ostream() is called twice; both writes must land in the same buffer.
StreamBuffer ossb;
ossb.ostream() << "Testing.";
ossb.ostream() << "Foo.";
LuaAssertStrEq(L, ossb.read_entire_contents(), "Testing.Foo.");
return 0;
}