Stub of FLpxSockets is now in place

This commit is contained in:
2023-06-23 16:27:23 -04:00
parent 42f3e02ec2
commit d5d4e0a650
6 changed files with 220 additions and 51 deletions

View File

@@ -14,10 +14,12 @@ public class Integration : ModuleRules
// Uncomment if you are using Slate UI
// PrivateDependencyModuleNames.AddRange(new string[] { "Slate", "SlateCore" });
// Uncomment if you are using online features
// PrivateDependencyModuleNames.Add("OnlineSubsystem");
// To include OnlineSubsystemSteam, add it to the plugins section in your uproject file with the Enabled attribute set to true
AddEngineThirdPartyPrivateStaticDependencies(Target, "OpenSSL");
}
}

View File

@@ -6,10 +6,6 @@
#include <string>
#include <string_view>
EngineWrapper AIntegrationGameModeBase::Luprex;
//// Main loop.
//while (!engw.get_stop_driver(&engw)) {
// handle_lua_source();
@@ -27,32 +23,17 @@ AIntegrationGameModeBase::AIntegrationGameModeBase()
ThreadStopRequested = false;
EngineSeconds = 0.0;
NextThreadTrigger = 1.0;
ThreadEvent = nullptr;
//PrimaryActorTick.bCanEverTick = true; // Probably wrong
//PrimaryActorTick.bTickEvenWhenPaused = true; // Probably wrong
//PrimaryActorTick.TickGroup = TG_PrePhysics; // Probably wrong
SetActorTickEnabled(true);
SetActorTickInterval(0.05f);
ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false);
}
AIntegrationGameModeBase::~AIntegrationGameModeBase()
{
WaitForThread();
FPlatformProcess::ReturnSynchEventToPool(ThreadEvent);
ThreadEvent = nullptr;
}
void AIntegrationGameModeBase::HandleLuprexConsoleOutput()
{
uint32_t ndata; const char* data;
Luprex.get_outgoing(&Luprex, 0, &ndata, &data);
if (ndata == 0) return;
std::string_view src(data, ndata);
int consumed;
std::u16string cps = drvutil::utf8_to_ucs2(src, &consumed);
Luprex.play_sent_outgoing(&Luprex, 0, consumed);
FString fs(cps.size(), (const UCS2CHAR*)(&cps[0]));
ConsoleOutput.Append(fs);
ResetToInitialState();
}
// Run routine called by the worker thread.
@@ -72,20 +53,53 @@ uint32 AIntegrationGameModeBase::Run()
engineutil::DPrint("Thread triggered.");
{
FScopeLock lk(&LuprexMutex);
Sockets->Update();
Luprex.play_invoke_event_update(&Luprex, EngineSeconds);
}
}
return 0;
}
#pragma optimize( "", off )
void AIntegrationGameModeBase::WaitForThread()
void AIntegrationGameModeBase::ResetToInitialState()
{
if (Thread == nullptr) return;
ThreadStopRequested = true;
ThreadEvent->Trigger();
delete Thread; // This waits for the thread to complete.
Thread = nullptr;
// Shut down the thread and release the ThreadEvent
if (Thread != nullptr)
{
ThreadStopRequested = true;
ThreadEvent->Trigger();
delete Thread; // This waits for the thread to complete.
Thread = nullptr;
FPlatformProcess::ReturnSynchEventToPool(ThreadEvent);
ThreadEvent = nullptr;
}
ThreadStopRequested = false;
// Delete the engine.
if (Luprex.release != nullptr)
{
Luprex.release(&Luprex);
}
// Release and close all sockets.
Sockets.Reset();
// Reset the clocks.
EngineSeconds = 0;
NextThreadTrigger = 1.0;
}
void AIntegrationGameModeBase::HandleLuprexConsoleOutput()
{
uint32_t ndata; const char* data;
Luprex.get_outgoing(&Luprex, 0, &ndata, &data);
if (ndata == 0) return;
std::string_view src(data, ndata);
int consumed;
std::u16string cps = drvutil::utf8_to_ucs2(src, &consumed);
Luprex.play_sent_outgoing(&Luprex, 0, consumed);
FString fs(cps.size(), (const UCS2CHAR*)(&cps[0]));
ConsoleOutput.Append(fs);
}
void AIntegrationGameModeBase::Tick(float DeltaSeconds)
@@ -93,8 +107,11 @@ void AIntegrationGameModeBase::Tick(float DeltaSeconds)
Super::Tick(DeltaSeconds);
{
FScopeLock lk(&LuprexMutex);
EngineSeconds += DeltaSeconds;
HandleLuprexConsoleOutput();
if (Luprex.engine != nullptr)
{
EngineSeconds += DeltaSeconds;
HandleLuprexConsoleOutput();
}
}
TArray<FString> prints = engineutil::DPrintGetStored();
for (const FString& fs : prints) {
@@ -133,31 +150,28 @@ void AIntegrationGameModeBase::BeginPlay()
{
Super::BeginPlay();
// There should be no thread at this point.
// Sanity checks.
checkf(Thread == nullptr, TEXT("There should be no thread here."));
checkf(Luprex.engine == nullptr, TEXT("There should be no engine here."));
// Reinitialize simple state.
EngineSeconds = 0;
NextThreadTrigger = 1.0;
ThreadStopRequested = false;
ThreadEvent->Wait(0); // Clear the event if set.
// Make sure we're starting from a clean slate.
ResetToInitialState();
// Try to initialize the wrapper.
if (Luprex.play_initialize == nullptr)
{
engineutil::init_wrapper(&Luprex);
if (Luprex.play_initialize == nullptr)
{
engineutil::DPrint("Luprex wrapper initialization failed");
}
}
// If we failed to initialize the wrapper, print an error message.
if (Luprex.play_initialize == nullptr)
{
engineutil::DPrint("Luprex wrapper initialization failed");
}
// If wrapper is initialized, try to initialize the luprex engine.
if (Luprex.play_initialize != nullptr)
{
Luprex.release(&Luprex);
Luprex.hook_dprint(engineutil::DPrintHook);
drvutil::ostringstream srcpak;
std::string srcpakerr = drvutil::package_lua_source("c:\\Luprex", &srcpak);
if (!srcpakerr.empty())
@@ -178,16 +192,18 @@ void AIntegrationGameModeBase::BeginPlay()
}
}
// If we successfully created a Luprex engine, create a worker thread.
// If we successfully created a luprex engine, create a socket system and a worker thread.
if (Luprex.engine != nullptr)
{
Sockets.Reset(FLpxSockets::Create(&Luprex));
ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false);
Thread = FRunnableThread::Create(this, TEXT("Worker Thread"));
}
}
void AIntegrationGameModeBase::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
WaitForThread();
ResetToInitialState();
}

View File

@@ -6,6 +6,7 @@
#include "GameFramework/GameModeBase.h"
#include "enginewrapper.hpp"
#include "engineutil.hpp"
#include "LuprexSockets.hpp"
#include "IntegrationGameModeBase.generated.h"
/**
@@ -23,8 +24,9 @@ public:
virtual void Tick(float) override;
virtual void EndPlay(const EEndPlayReason::Type EndPlayReason);
// Thread shutdown and cleanup. Called by Blueprint thread.
void WaitForThread();
// Delete all the state created in BeginPlay. That
// includes: the Luprex engine, the thread, and the socket state.
void ResetToInitialState();
// Method of FRunnable, called by the Luprex thread.
virtual uint32 Run() override;
@@ -58,9 +60,13 @@ public:
// This critical section guards the use of the EngineWrapper.
FCriticalSection LuprexMutex;
// The Luprex wrapper and engine. You must claim the LuprexMutex
// before touching any of these variables.
static EngineWrapper Luprex;
// The Luprex wrapper and engine. MUST CLAIM LuprexMutex.
EngineWrapper Luprex;
// Luprex socket system. Aside from construction, only touched by Luprex thread.
TUniquePtr<FLpxSockets> Sockets;
// Amount of elapsed time.
float EngineSeconds;
// When do we next trigger the thread event (relative to EngineSeconds).

View File

@@ -0,0 +1,101 @@
#include "LuprexSockets.hpp"
#define UI UI_ST
THIRD_PARTY_INCLUDES_START
#include <openssl/ssl.h>
#include <openssl/rsa.h>
#include <openssl/x509.h>
#include <openssl/evp.h>
#include <openssl/err.h>
#include <openssl/bio.h>
#include <openssl/pem.h>
#include <openssl/conf.h>
THIRD_PARTY_INCLUDES_END
#undef UI
enum EChanState {
CHAN_INACTIVE,
CHAN_PLAINTEXT,
CHAN_SSL_CONNECTING,
CHAN_SSL_ACCEPTING,
CHAN_SSL_READWRITE,
};
// A communication socket.
class FLpxChannel
{
int ChannelID;
FSocket* Socket;
SSL* SSLState;
BIO* RecvBIO;
BIO* SendBIO;
// If recent_error is set, that means that a recent IO operation generated
// an error. As a special case, EOF on read is considered an error, we use
// the string "EOF" for this case.
std::string RecentError;
// OpenSSL has a rule: if you try to SSL_write and it returns
// SSL_ERROR_WANT_READ, then you have to retry the write with the same
// number of bytes. In this event, we record how many bytes we
// attempted to write, which will enable us to retry.
int RetryWriteNBytes;
// True if the channel needs to be advanced.
bool NeedAdvance;
EChanState State;
uint32_t NBytes;
const char* Bytes;
};
// A port-listening socket.
class FLpxListener
{
int BoundPort;
FSocket* Socket;
};
class FLpxSocketsI : public FLpxSockets
{
public:
// We don't own the wrapper, we just have a pointer to it.
// We require a guarantee that it outlives us.
EngineWrapper* Luprex;
TArray<FLpxChannel> Channels;
TArray<FLpxListener> Listeners;
SSL_CTX* ServerCTX;
SSL_CTX* ClientSecureCTX;
SSL_CTX* ClientInsecureCTX;
FLpxSocketsI(EngineWrapper* w);
virtual ~FLpxSocketsI() override;
virtual void Update() override;
};
FLpxSocketsI::FLpxSocketsI(EngineWrapper *w)
{
Luprex = w;
ServerCTX = nullptr;
ClientSecureCTX = nullptr;
ClientInsecureCTX = nullptr;
}
FLpxSocketsI::~FLpxSocketsI()
{
}
void FLpxSocketsI::Update()
{
}
FLpxSockets* FLpxSockets::Create(EngineWrapper* w)
{
return new FLpxSocketsI(w);
}

View File

@@ -0,0 +1,43 @@
#pragma once
#include "CoreMinimal.h"
// Class FLpxSockets
//
// This class is responsible for creating outgoing and incoming
// sockets for the Luprex engine. It then relays data into and out
// of the Luprex engine.
//
// It only has one interesting method: Update(). This one method
// does all the communication for the Luprex engine. Note that
// there aren't any methods where Unreal can query the state
// of the FLpxSockets. That's because this class is for
// communication between Luprex and outside servers and clients,
// NOT for communication with Unreal.
//
// The FLpxSockets keeps a pointer to the EngineWrapper.
// The EngineWrapper must outlive the FLpxSockets.
//
// This class is an abstract base class. A derived class
// implements all the behavior.
//
class FLpxSockets;
struct EngineWrapper;
class FLpxSockets
{
protected:
FLpxSockets() {}
public:
// Create the Luprex socket system.
static FLpxSockets* Create(EngineWrapper *w);
// Delete the luprex socket system. Cleanly closes all sockets.
virtual ~FLpxSockets() {}
// The update routine relays data into and out of
// luprex via TCP/IP sockets.
virtual void Update() = 0;
};

View File

@@ -10,6 +10,7 @@ void init_wrapper(EngineWrapper* w) {
InitFn init = (InitFn)FPlatformProcess::GetDllExport(DLL, TEXT("init_engine_wrapper"));
if (init != nullptr) {
init(w);
w->hook_dprint(engineutil::DPrintHook);
}
}
}