More work on diff xmit, not finished yet

This commit is contained in:
2021-07-25 19:22:39 -04:00
parent 49a4ec220d
commit 9f342854e1
13 changed files with 304 additions and 100 deletions

View File

@@ -447,23 +447,22 @@ void AnimQueue::deserialize(StreamBuffer *sb) {
}
}
bool AnimQueue::make_patch(const AnimQueue &auth, StreamBuffer *sb) const {
bool AnimQueue::need_patch(const AnimQueue &auth) const {
// Sanity check.
assert(valid());
assert(auth.valid());
// Fast path to detect equivalence.
if (version_number_ == auth.version_number_) {
sb->write_uint8(0);
return false;
}
// Special case: if we're already a perfect match.
if (size_and_steps_equal(auth)) {
sb->write_uint8(0);
return false;
}
// Otherwise, do a direct comparison.
return !size_and_steps_equal(auth);
}
void AnimQueue::diff(const AnimQueue &auth, StreamBuffer *sb) const {
// Write the first element.
sb->write_uint8(auth.steps_.size());
sb->write_uint8(auth.size_limit_);
@@ -502,14 +501,10 @@ bool AnimQueue::make_patch(const AnimQueue &auth, StreamBuffer *sb) const {
}
}
}
return true;
}
void AnimQueue::apply_patch(StreamBuffer *sb) {
void AnimQueue::patch(StreamBuffer *sb) {
int len = sb->read_uint8();
if (len == 0) {
return;
}
size_limit_ = sb->read_uint8();
// Decode the diff, stop at eof.
std::deque<AnimStep> old = std::move(steps_);
@@ -545,8 +540,8 @@ const AnimStep &AnimQueue::back() const {
static bool diff_works(const AnimQueue &master, AnimQueue &sync) {
StreamBuffer sb;
sync.make_patch(master, &sb);
sync.apply_patch(&sb);
sync.diff(master, &sb);
sync.patch(&sb);
return sync.size_and_steps_equal(master);
}
@@ -647,10 +642,8 @@ LuaDefine(unittests_animqueue, "c") {
LuaAssert(L, diff_works(aq, aqds));
// compare again, should be no differences.
sb.clear();
LuaAssert(L, !aqds.make_patch(aq, &sb));
aqds.apply_patch(&sb);
LuaAssert(L, aqds.size_and_steps_equal(aq));
LuaAssert(L, !aqds.need_patch(aq));
LuaAssert(L, diff_works(aq, aqds));
// Discard all but the last action.
aq.set_limit(1);

View File

@@ -186,8 +186,9 @@ public:
void deserialize(StreamBuffer *sb);
// Difference transmission
bool make_patch(const AnimQueue &auth, StreamBuffer *sb) const;
void apply_patch(StreamBuffer *sb);
bool need_patch(const AnimQueue &auth) const;
void diff(const AnimQueue &auth, StreamBuffer *sb) const;
void patch(StreamBuffer *sb);
void update_version(const AnimQueue &auth);
// Get the final resting place after all animations are complete.

View File

@@ -205,17 +205,10 @@ bool IdPlayerPool::valid() const {
return true;
}
bool IdPlayerPool::make_patch(const IdPlayerPool &auth, StreamBuffer *sb) const {
void IdPlayerPool::diff(const IdPlayerPool &auth, StreamBuffer *sb) const {
assert(valid());
assert(auth.valid());
// The fifo capacity cannot be 255, so we use this as special
// to indicate that no changes are present.
if (exactly_equal(auth)) {
sb->write_uint8(255);
return false;
}
// Write the fifo capacity and nranges
sb->write_uint8(auth.fifo_capacity_);
sb->write_uint8(auth.ranges_.size());
@@ -238,15 +231,11 @@ bool IdPlayerPool::make_patch(const IdPlayerPool &auth, StreamBuffer *sb) const
sb->write_uint8(slot);
}
}
return true;
}
void IdPlayerPool::apply_patch(StreamBuffer *sb) {
void IdPlayerPool::patch(StreamBuffer *sb) {
// read the header byte
int fifo_cap = sb->read_uint8();
if (fifo_cap == 255) {
return;
}
fifo_capacity_ = fifo_cap;
int nranges = sb->read_uint8();
std::deque<int64_t> old = std::move(ranges_);
@@ -441,8 +430,8 @@ LuaDefine(unittests_idalloc, "c") {
// Check case: no differences.
sb.clear();
LuaAssert(L, !ppds.make_patch(pp, &sb));
ppds.apply_patch(&sb);
ppds.diff(pp, &sb);
ppds.patch(&sb);
LuaAssert(L, ppds.exactly_equal(pp));
// Add some values to master pool
@@ -451,8 +440,8 @@ LuaDefine(unittests_idalloc, "c") {
// transmit and compare.
sb.clear();
LuaAssert(L, ppds.make_patch(pp, &sb));
ppds.apply_patch(&sb);
ppds.diff(pp, &sb);
ppds.patch(&sb);
LuaAssert(L, ppds.exactly_equal(pp));
// Pop a value from master pool
@@ -461,8 +450,8 @@ LuaDefine(unittests_idalloc, "c") {
// transmit and compare.
sb.clear();
LuaAssert(L, ppds.make_patch(pp, &sb));
ppds.apply_patch(&sb);
ppds.diff(pp, &sb);
ppds.patch(&sb);
LuaAssert(L, ppds.exactly_equal(pp));
return 0;

View File

@@ -181,15 +181,12 @@ public:
// Difference transmission
//
// When there are no differences, make_patch returns false but
// still writes out a valid patch.
//
bool make_patch(const IdPlayerPool &auth, StreamBuffer *sb) const;
void apply_patch(StreamBuffer *sb);
void diff(const IdPlayerPool &auth, StreamBuffer *sb) const;
void patch(StreamBuffer *sb);
// Debug string.
std::string debug_string() const;
private:
IdGlobalPool *global_;
int fifo_capacity_;

View File

@@ -1,5 +1,4 @@
#include "luastack.hpp"
#include "util.hpp"
#include <iostream>
LuaSpecial LuaRegistry(LUA_REGISTRYINDEX);

View File

@@ -160,8 +160,8 @@ int PlaneMap::total_cells() const {
return total;
}
PlaneMap::IdVec PlaneMap::scan_radius(const std::string &plane, double x, double y, double radius, int64_t prepend) const {
PlaneMap::IdVec result;
PlaneMap::IdVector PlaneMap::scan_radius(const std::string &plane, double x, double y, double radius, int64_t prepend) const {
PlaneMap::IdVector result;
if (prepend != 0) {
result.push_back(prepend);
}
@@ -188,6 +188,24 @@ PlaneMap::IdVec PlaneMap::scan_radius(const std::string &plane, double x, double
return result;
}
util::IdVector PlaneMap::sort_union_id_vectors(const IdVector &v1, const IdVector &v2) {
IdVector result(v1.size() + v2.size());
int next = 0;
for (int64_t id : v1) result[next++] = id;
for (int64_t id : v2) result[next++] = id;
std::sort(result.begin(), result.end());
int64_t prev = -1;
int64_t count = 0;
for (int64_t id : result) {
if (id != prev) {
prev = id;
result[count++] = id;
}
}
result.resize(count);
return result;
}
LuaDefine(unittests_planemap, "c") {
double SC = CELL_SCALE;
double E = CELL_SCALE * 0.4;
@@ -196,7 +214,7 @@ LuaDefine(unittests_planemap, "c") {
PlaneMap pm;
PlaneItem pia, pib;
PlaneMap::EltVec elts;
PlaneMap::IdVec ids;
PlaneMap::IdVector ids;
// Simple test.
LuaAssert(L, rect_cell_range(-7*SC, -15*SC, 87*SC, 21*SC).equal(-7, -15, 87, 21));
@@ -298,5 +316,20 @@ LuaDefine(unittests_planemap, "c") {
LuaAssert(L, ids[0] == 123);
LuaAssert(L, ids[1] == 456);
// Test the unioning of ID vectors.
PlaneMap::IdVector idv1,idv2;
idv1.push_back(1);
idv1.push_back(6);
idv1.push_back(4);
idv2.push_back(1);
idv2.push_back(5);
idv2.push_back(6);
PlaneMap::IdVector joined = PlaneMap::sort_union_id_vectors(idv1, idv2);
LuaAssert(L, joined.size() == 4);
LuaAssert(L, joined[0] == 1);
LuaAssert(L, joined[1] == 4);
LuaAssert(L, joined[2] == 5);
LuaAssert(L, joined[3] == 6);
return 0;
}

View File

@@ -76,6 +76,7 @@
#include <cstdint>
#include <vector>
#include <map>
#include "util.hpp"
class PlaneMap;
@@ -118,11 +119,16 @@ private:
void insert(const std::string &plane, int64_t cell, PlaneItem *client);
public:
using IdVec = std::vector<int64_t>;
using IdVector = util::IdVector;
PlaneMap();
~PlaneMap();
IdVec scan_radius(const std::string &plane, double x, double y, double radius, int64_t prepend) const;
// Caution: scan_radius is not deterministically ordered.
IdVector scan_radius(const std::string &plane, double x, double y, double radius, int64_t prepend) const;
// Unions and sorts two ID vectors.
static IdVector sort_union_id_vectors(const IdVector &v1, const IdVector &v2);
private:
// unit testing stuff.
friend int unittests_planemap(lua_State *L);

View File

@@ -396,6 +396,10 @@ void StreamBuffer::unwrite_to(int64_t wr_count) {
write_cursor_ = buf_lo_ + (wr_count - pre_read_count_);
}
void StreamBuffer::copy_into(StreamBuffer *sb) {
sb->write_bytes(read_cursor_, write_cursor_ - read_cursor_);
}
util::HashValue StreamBuffer::hash() const {
uint64_t hash1 = 0x82A7912E7893AC87;
uint64_t hash2 = 0x81D402740DE458F3;

View File

@@ -338,7 +338,7 @@ public:
void overwrite_uint32(int64_t write_count_after, uint64_t v);
void overwrite_uint64(int64_t write_count_after, uint64_t v);
// Return true if the buffer is empty.
// This function checks to see if the buffer is empty.
bool at_eof();
// Verify that the buffer is empty, if not, throw StreamCorruption.
@@ -350,6 +350,9 @@ public:
// Rewind the write cursor to a previous position.
void unwrite_to(int64_t total_writes);
// Copy the entire contents of this streambuffer into another one.
void copy_into(StreamBuffer *sb);
// Calculate a noncryptographic but good hash of what's in the buffer.
util::HashValue hash() const;

View File

@@ -15,7 +15,7 @@
#endif
namespace util {
stringvec split(const std::string &s, char sep) {
stringvec result;
int start = 0;

View File

@@ -21,6 +21,7 @@ enum WorldType {
using stringvec = std::vector<std::string>;
using stringset = std::set<std::string>;
using HashValue = std::pair<uint64_t, uint64_t>;
using IdVector = std::vector<int64_t>;
// Split a string into multiple strings
stringvec split(const std::string &s, char sep);

View File

@@ -98,6 +98,14 @@ Tangible *World::tangible_get(int64_t id) {
}
}
std::vector<Tangible*> World::tangible_get_all(const std::vector<int64_t> &ids) {
std::vector<Tangible*> result(ids.size());
for (int i = 0; i < int(ids.size()); i++) {
result[i] = tangible_get(ids[i]);
}
return result;
}
Tangible *World::tangible_get(lua_State *L, int idx) {
Tangible *result = nullptr;
int top = lua_gettop(L);
@@ -144,16 +152,16 @@ void World::tangible_delete(lua_State *L, int64_t id) {
LS.result();
}
std::vector<int64_t> World::get_near(int64_t player_id, float radius, bool exclude_nowhere) {
util::IdVector World::get_near(int64_t player_id, float radius, bool exclude_nowhere) {
Tangible *player = tangible_get(player_id);
if (player == nullptr) {
return std::vector<int64_t>();
return IdVector();
}
// Find out where's the center of the world.
const AnimStep &aqback = player->anim_queue_.back();
if (exclude_nowhere && (aqback.plane() == "nowhere")) {
return std::vector<int64_t>();
return IdVector();
}
return plane_map_.scan_radius(aqback.plane(), aqback.xyz().x, aqback.xyz().y, radius, player_id);
@@ -486,6 +494,124 @@ void World::rollback() {
deserialize(&snapshot_);
}
void World::difference_transmit(int64_t actor_id, World *master, StreamBuffer *sb) {
StreamBuffer tsb;
diff_actor_essentials(actor_id, master, &tsb);
tsb.copy_into(sb);
patch_actor_essentials(&tsb);
diff_visible_animations(actor_id, master, &tsb);
tsb.copy_into(sb);
patch_visible_animations(&tsb);
}
void World::diff_actor_essentials(int64_t actor_id, World *master, StreamBuffer *sb) {
Tangible *s_actor = tangible_get(actor_id);
Tangible *m_actor = master->tangible_get(actor_id);
assert(s_actor != nullptr);
assert(m_actor != nullptr);
sb->write_int64(actor_id);
s_actor->id_player_pool_.diff(m_actor->id_player_pool_, sb);
s_actor->anim_queue_.diff(m_actor->anim_queue_, sb);
}
void World::patch_actor_essentials(StreamBuffer *sb) {
int64_t actor_id = sb->read_int64();
Tangible *s_actor = tangible_get(actor_id);
s_actor->id_player_pool_.patch(sb);
s_actor->anim_queue_.patch(sb);
s_actor->update_plane_item();
}
void World::diff_visible_animations(int64_t actor_id, World *master, StreamBuffer *sb) {
// Get the list of tangibles visible in either model.
util::IdVector visible = PlaneMap::sort_union_id_vectors(
master->get_near(actor_id, 100.0, true),
this->get_near(actor_id, 100.0, true));
// Some tangibles may be missing in the master, some may be missing in the sync.
std::vector<Tangible *> m_visible = tangible_get_all(visible);
std::vector<Tangible *> s_visible = tangible_get_all(visible);
assert(m_visible.size() == s_visible.size());
// For each tangible missing in the synchronous model, send the
// necessary information to create the tangible.
sb->write_int32(0);
int count_pos = sb->total_writes();
int count = 0;
for (int i = 0; i < int(s_visible.size()); i++) {
Tangible *mt = m_visible[i];
Tangible *st = s_visible[i];
if (st == nullptr) {
count += 1;
sb->write_int64(mt->id());
mt->anim_queue_.serialize(sb);
}
}
sb->overwrite_int32(count_pos, count);
// For each tangible present in the synchronous model that doesn't
// exist in the master model, send command to delete it.
sb->write_int32(0);
count_pos = sb->total_writes();
count = 0;
for (int i = 0; i < int(s_visible.size()); i++) {
Tangible *mt = m_visible[i];
Tangible *st = s_visible[i];
if (mt == nullptr) {
count += 1;
sb->write_int64(st->id());
}
}
sb->overwrite_int32(count_pos, count);
// For each tangible present in both models, compare
// the animation queues.
sb->write_int32(0);
count_pos = sb->total_writes();
count = 0;
for (int i = 0; i < int(s_visible.size()); i++) {
Tangible *mt = m_visible[i];
Tangible *st = s_visible[i];
if ((mt != nullptr) && (st != nullptr)) {
if (st->anim_queue_.need_patch(mt->anim_queue_)) {
count++;
sb->write_int64(st->id());
st->anim_queue_.diff(mt->anim_queue_, sb);
}
}
}
sb->overwrite_int32(count_pos, count);
}
void World::patch_visible_animations(StreamBuffer *sb) {
// Receive create messages.
int count = sb->read_int32();
for (int i = 0; i < count; i++) {
int64_t id = sb->read_int64();
Tangible *t = tangible_make(state(), id, false);
t->anim_queue_.deserialize(sb);
t->update_plane_item();
}
// Receive delete messages
count = sb->read_int32();
for (int i = 0; i < count; i++) {
int64_t id = sb->read_int64();
tangible_delete(state(), id);
}
// Receive update messages
count = sb->read_int32();
for (int i = 0; i < count; i++) {
int64_t id = sb->read_int64();
Tangible *t = tangible_get(id);
assert(t != nullptr);
t->anim_queue_.patch(sb);
}
}
LuaDefine(tangible_animstate, "c") {
LuaArg tanobj;
LuaRet graphic, plane, x, y, z, facing;

View File

@@ -7,6 +7,7 @@
#include "idalloc.hpp"
#include "animqueue.hpp"
#include "invocation.hpp"
#include "streambuffer.hpp"
#include "sched.hpp"
#include "source.hpp"
#include "gui.hpp"
@@ -76,49 +77,10 @@ public:
class World {
public:
// Type of model
util::WorldType world_type_;
// A lua intepreter with snapshot function.
//
LuaSnap lua_snap_;
// The Global ID Pool.
//
IdGlobalPool id_global_pool_;
// Source Database.
//
SourceDB source_db_;
PlaneMap plane_map_;
// Tangibles table.
//
std::unordered_map<int64_t, std::unique_ptr<Tangible>> tangibles_;
// Thread schedule: must include every thread, except
// for the one currently-executing thread.
//
Schedule thread_sched_;
// Serialized snapshot of world model.
StreamBuffer snapshot_;
// Redirects.
//
using IdVector = util::IdVector;
using TangibleVector = std::vector<Tangible*>;
using Redirects = std::map<int64_t, int64_t>;
Redirects redirects_;
void run_scheduled_threads(int64_t clk);
static void store_global_pointer(lua_State *L, World *w);
// Check if main thread has nothing on the stack
bool stack_is_clear() const { return lua_gettop(state()) == 0; }
// Invoke an action plan.
//
void invoke_plan(int64_t actor_id, int64_t place_id, const std::string &action, const InvocationData &idata);
public:
// Constructor.
//
// The constructor also calls 'lua_open' to create a new
@@ -145,7 +107,7 @@ public:
// Get a list of the tangibles that are near the player. If 'exclude_nowhere' is
// true, exclude any tangibles on the nowhere plane.
//
std::vector<int64_t> get_near(int64_t player_id, float radius, bool exclude_nowhere);
IdVector get_near(int64_t player_id, float radius, bool exclude_nowhere);
// Make a tangible.
//
@@ -159,6 +121,10 @@ public:
//
Tangible *tangible_get(int64_t id);
// Get pointers to many tangibles.
//
TangibleVector tangible_get_all(const IdVector &ids);
// Get a pointer to the specified tangible.
//
// The value on the lua stack should be a valid lua tangible. If not,
@@ -209,6 +175,92 @@ public:
//
void snapshot();
void rollback();
// Difference transmission.
//
// This generates diffs and stores them in the specified buffer,
// so that they can be sent to the client. It also applies the diffs
// to this model.
//
void difference_transmit(int64_t actor, World *master, StreamBuffer *sb);
// Apply differences.
//
// Note that difference_transmit applies the differences to the server
// synchronous model, so this is only used by the client synchronous model.
//
void apply_differences(int64_t actor, StreamBuffer *sb);
private:
// Run any threads which according to the scheduler queue are ready.
//
void run_scheduled_threads(int64_t clk);
// Store a pointer to a world model into a lua registry.
//
static void store_global_pointer(lua_State *L, World *w);
// Check that the main thread has nothing on the stack
//
bool stack_is_clear() const { return lua_gettop(state()) == 0; }
// Invoke a plan.
//
void invoke_plan(int64_t actor_id, int64_t place_id, const std::string &action, const InvocationData &idata);
// pass 1 of difference transmission: actor essentials.
//
// Before we do anything else, we need to get the actor in the right place.
// We also update the actor's ID allocation pipeline.
//
void diff_actor_essentials(int64_t actor_id, World *master, StreamBuffer *sb);
void patch_actor_essentials(StreamBuffer *sb);
// Pass 2 of difference transmission: visible animations.
//
// Synchronizes the animation status of every tangible inside the visibility
// radius of either model. Creates missing tangibles and deletes excess tangibles.
//
void diff_visible_animations(int64_t actor_id, World *master, StreamBuffer *sb);
void patch_visible_animations(StreamBuffer *sb);
private:
// Type of model
util::WorldType world_type_;
// A lua intepreter with snapshot function.
//
LuaSnap lua_snap_;
// The Global ID Pool.
//
IdGlobalPool id_global_pool_;
// Source Database.
//
SourceDB source_db_;
PlaneMap plane_map_;
// Tangibles table.
//
std::unordered_map<int64_t, std::unique_ptr<Tangible>> tangibles_;
// Thread schedule: must include every thread, except
// for the one currently-executing thread.
//
Schedule thread_sched_;
// Serialized snapshot of world model.
StreamBuffer snapshot_;
// Redirects.
//
Redirects redirects_;
friend class Tangible;
friend int tangible_animate(lua_State *L);
friend int tangible_build(lua_State *L);
friend int tangible_redirect(lua_State *L);
};
#endif // WORLD_HPP