From d5d4e0a650ad30b47e503ebfcb5e59b291eb787a Mon Sep 17 00:00:00 2001 From: teppy999 Date: Fri, 23 Jun 2023 16:27:23 -0400 Subject: [PATCH] Stub of FLpxSockets is now in place --- Source/Integration/Integration.Build.cs | 4 +- .../Integration/IntegrationGameModeBase.cpp | 106 ++++++++++-------- Source/Integration/IntegrationGameModeBase.h | 16 ++- Source/Integration/LuprexSockets.cpp | 101 +++++++++++++++++ Source/Integration/LuprexSockets.hpp | 43 +++++++ Source/Integration/engineutil.cpp | 1 + 6 files changed, 220 insertions(+), 51 deletions(-) create mode 100644 Source/Integration/LuprexSockets.cpp create mode 100644 Source/Integration/LuprexSockets.hpp diff --git a/Source/Integration/Integration.Build.cs b/Source/Integration/Integration.Build.cs index 0a11a6a7..3e3578b3 100644 --- a/Source/Integration/Integration.Build.cs +++ b/Source/Integration/Integration.Build.cs @@ -14,10 +14,12 @@ public class Integration : ModuleRules // Uncomment if you are using Slate UI // PrivateDependencyModuleNames.AddRange(new string[] { "Slate", "SlateCore" }); - + // Uncomment if you are using online features // PrivateDependencyModuleNames.Add("OnlineSubsystem"); // To include OnlineSubsystemSteam, add it to the plugins section in your uproject file with the Enabled attribute set to true + + AddEngineThirdPartyPrivateStaticDependencies(Target, "OpenSSL"); } } diff --git a/Source/Integration/IntegrationGameModeBase.cpp b/Source/Integration/IntegrationGameModeBase.cpp index c5128fca..ada78bea 100644 --- a/Source/Integration/IntegrationGameModeBase.cpp +++ b/Source/Integration/IntegrationGameModeBase.cpp @@ -6,10 +6,6 @@ #include #include - - -EngineWrapper AIntegrationGameModeBase::Luprex; - //// Main loop. //while (!engw.get_stop_driver(&engw)) { // handle_lua_source(); @@ -27,32 +23,17 @@ AIntegrationGameModeBase::AIntegrationGameModeBase() ThreadStopRequested = false; EngineSeconds = 0.0; NextThreadTrigger = 1.0; + ThreadEvent = nullptr; //PrimaryActorTick.bCanEverTick = true; // Probably wrong //PrimaryActorTick.bTickEvenWhenPaused = true; // Probably wrong //PrimaryActorTick.TickGroup = TG_PrePhysics; // Probably wrong SetActorTickEnabled(true); SetActorTickInterval(0.05f); - ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false); } AIntegrationGameModeBase::~AIntegrationGameModeBase() { - WaitForThread(); - FPlatformProcess::ReturnSynchEventToPool(ThreadEvent); - ThreadEvent = nullptr; -} - -void AIntegrationGameModeBase::HandleLuprexConsoleOutput() -{ - uint32_t ndata; const char* data; - Luprex.get_outgoing(&Luprex, 0, &ndata, &data); - if (ndata == 0) return; - std::string_view src(data, ndata); - int consumed; - std::u16string cps = drvutil::utf8_to_ucs2(src, &consumed); - Luprex.play_sent_outgoing(&Luprex, 0, consumed); - FString fs(cps.size(), (const UCS2CHAR*)(&cps[0])); - ConsoleOutput.Append(fs); + ResetToInitialState(); } // Run routine called by the worker thread. @@ -72,20 +53,53 @@ uint32 AIntegrationGameModeBase::Run() engineutil::DPrint("Thread triggered."); { FScopeLock lk(&LuprexMutex); + Sockets->Update(); Luprex.play_invoke_event_update(&Luprex, EngineSeconds); } } return 0; } -#pragma optimize( "", off ) -void AIntegrationGameModeBase::WaitForThread() +void AIntegrationGameModeBase::ResetToInitialState() { - if (Thread == nullptr) return; - ThreadStopRequested = true; - ThreadEvent->Trigger(); - delete Thread; // This waits for the thread to complete. - Thread = nullptr; + // Shut down the thread and release the ThreadEvent + if (Thread != nullptr) + { + ThreadStopRequested = true; + ThreadEvent->Trigger(); + delete Thread; // This waits for the thread to complete. + Thread = nullptr; + FPlatformProcess::ReturnSynchEventToPool(ThreadEvent); + ThreadEvent = nullptr; + } + ThreadStopRequested = false; + + // Delete the engine. + if (Luprex.release != nullptr) + { + Luprex.release(&Luprex); + } + + // Release and close all sockets. + Sockets.Reset(); + + // Reset the clocks. + EngineSeconds = 0; + NextThreadTrigger = 1.0; +} + + +void AIntegrationGameModeBase::HandleLuprexConsoleOutput() +{ + uint32_t ndata; const char* data; + Luprex.get_outgoing(&Luprex, 0, &ndata, &data); + if (ndata == 0) return; + std::string_view src(data, ndata); + int consumed; + std::u16string cps = drvutil::utf8_to_ucs2(src, &consumed); + Luprex.play_sent_outgoing(&Luprex, 0, consumed); + FString fs(cps.size(), (const UCS2CHAR*)(&cps[0])); + ConsoleOutput.Append(fs); } void AIntegrationGameModeBase::Tick(float DeltaSeconds) @@ -93,8 +107,11 @@ void AIntegrationGameModeBase::Tick(float DeltaSeconds) Super::Tick(DeltaSeconds); { FScopeLock lk(&LuprexMutex); - EngineSeconds += DeltaSeconds; - HandleLuprexConsoleOutput(); + if (Luprex.engine != nullptr) + { + EngineSeconds += DeltaSeconds; + HandleLuprexConsoleOutput(); + } } TArray prints = engineutil::DPrintGetStored(); for (const FString& fs : prints) { @@ -133,31 +150,28 @@ void AIntegrationGameModeBase::BeginPlay() { Super::BeginPlay(); - // There should be no thread at this point. + // Sanity checks. checkf(Thread == nullptr, TEXT("There should be no thread here.")); + checkf(Luprex.engine == nullptr, TEXT("There should be no engine here.")); - // Reinitialize simple state. - EngineSeconds = 0; - NextThreadTrigger = 1.0; - ThreadStopRequested = false; - ThreadEvent->Wait(0); // Clear the event if set. + // Make sure we're starting from a clean slate. + ResetToInitialState(); // Try to initialize the wrapper. if (Luprex.play_initialize == nullptr) { engineutil::init_wrapper(&Luprex); - if (Luprex.play_initialize == nullptr) - { - engineutil::DPrint("Luprex wrapper initialization failed"); - } + } + + // If we failed to initialize the wrapper, print an error message. + if (Luprex.play_initialize == nullptr) + { + engineutil::DPrint("Luprex wrapper initialization failed"); } // If wrapper is initialized, try to initialize the luprex engine. if (Luprex.play_initialize != nullptr) { - Luprex.release(&Luprex); - Luprex.hook_dprint(engineutil::DPrintHook); - drvutil::ostringstream srcpak; std::string srcpakerr = drvutil::package_lua_source("c:\\Luprex", &srcpak); if (!srcpakerr.empty()) @@ -178,16 +192,18 @@ void AIntegrationGameModeBase::BeginPlay() } } - // If we successfully created a Luprex engine, create a worker thread. + // If we successfully created a luprex engine, create a socket system and a worker thread. if (Luprex.engine != nullptr) { + Sockets.Reset(FLpxSockets::Create(&Luprex)); + ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false); Thread = FRunnableThread::Create(this, TEXT("Worker Thread")); } } void AIntegrationGameModeBase::EndPlay(const EEndPlayReason::Type EndPlayReason) { - WaitForThread(); + ResetToInitialState(); } diff --git a/Source/Integration/IntegrationGameModeBase.h b/Source/Integration/IntegrationGameModeBase.h index a537899e..618d2eb2 100644 --- a/Source/Integration/IntegrationGameModeBase.h +++ b/Source/Integration/IntegrationGameModeBase.h @@ -6,6 +6,7 @@ #include "GameFramework/GameModeBase.h" #include "enginewrapper.hpp" #include "engineutil.hpp" +#include "LuprexSockets.hpp" #include "IntegrationGameModeBase.generated.h" /** @@ -23,8 +24,9 @@ public: virtual void Tick(float) override; virtual void EndPlay(const EEndPlayReason::Type EndPlayReason); - // Thread shutdown and cleanup. Called by Blueprint thread. - void WaitForThread(); + // Delete all the state created in BeginPlay. That + // includes: the Luprex engine, the thread, and the socket state. + void ResetToInitialState(); // Method of FRunnable, called by the Luprex thread. virtual uint32 Run() override; @@ -58,9 +60,13 @@ public: // This critical section guards the use of the EngineWrapper. FCriticalSection LuprexMutex; - // The Luprex wrapper and engine. You must claim the LuprexMutex - // before touching any of these variables. - static EngineWrapper Luprex; + // The Luprex wrapper and engine. MUST CLAIM LuprexMutex. + EngineWrapper Luprex; + + // Luprex socket system. Aside from construction, only touched by Luprex thread. + TUniquePtr Sockets; + + // Amount of elapsed time. float EngineSeconds; // When do we next trigger the thread event (relative to EngineSeconds). diff --git a/Source/Integration/LuprexSockets.cpp b/Source/Integration/LuprexSockets.cpp new file mode 100644 index 00000000..e42a1302 --- /dev/null +++ b/Source/Integration/LuprexSockets.cpp @@ -0,0 +1,101 @@ + +#include "LuprexSockets.hpp" + +#define UI UI_ST +THIRD_PARTY_INCLUDES_START +#include +#include +#include +#include +#include +#include +#include +#include +THIRD_PARTY_INCLUDES_END +#undef UI + + +enum EChanState { + CHAN_INACTIVE, + CHAN_PLAINTEXT, + CHAN_SSL_CONNECTING, + CHAN_SSL_ACCEPTING, + CHAN_SSL_READWRITE, +}; + +// A communication socket. +class FLpxChannel +{ + int ChannelID; + FSocket* Socket; + SSL* SSLState; + BIO* RecvBIO; + BIO* SendBIO; + + // If recent_error is set, that means that a recent IO operation generated + // an error. As a special case, EOF on read is considered an error, we use + // the string "EOF" for this case. + std::string RecentError; + + // OpenSSL has a rule: if you try to SSL_write and it returns + // SSL_ERROR_WANT_READ, then you have to retry the write with the same + // number of bytes. In this event, we record how many bytes we + // attempted to write, which will enable us to retry. + int RetryWriteNBytes; + + // True if the channel needs to be advanced. + bool NeedAdvance; + + EChanState State; + uint32_t NBytes; + const char* Bytes; +}; + +// A port-listening socket. +class FLpxListener +{ + int BoundPort; + FSocket* Socket; +}; + +class FLpxSocketsI : public FLpxSockets +{ +public: + // We don't own the wrapper, we just have a pointer to it. + // We require a guarantee that it outlives us. + EngineWrapper* Luprex; + + TArray Channels; + TArray Listeners; + + + SSL_CTX* ServerCTX; + SSL_CTX* ClientSecureCTX; + SSL_CTX* ClientInsecureCTX; + + FLpxSocketsI(EngineWrapper* w); + virtual ~FLpxSocketsI() override; + + virtual void Update() override; +}; + +FLpxSocketsI::FLpxSocketsI(EngineWrapper *w) +{ + Luprex = w; + ServerCTX = nullptr; + ClientSecureCTX = nullptr; + ClientInsecureCTX = nullptr; +} + +FLpxSocketsI::~FLpxSocketsI() +{ +} + +void FLpxSocketsI::Update() +{ +} + +FLpxSockets* FLpxSockets::Create(EngineWrapper* w) +{ + return new FLpxSocketsI(w); +} diff --git a/Source/Integration/LuprexSockets.hpp b/Source/Integration/LuprexSockets.hpp new file mode 100644 index 00000000..5b6b1af5 --- /dev/null +++ b/Source/Integration/LuprexSockets.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "CoreMinimal.h" + +// Class FLpxSockets +// +// This class is responsible for creating outgoing and incoming +// sockets for the Luprex engine. It then relays data into and out +// of the Luprex engine. +// +// It only has one interesting method: Update(). This one method +// does all the communication for the Luprex engine. Note that +// there aren't any methods where Unreal can query the state +// of the FLpxSockets. That's because this class is for +// communication between Luprex and outside servers and clients, +// NOT for communication with Unreal. +// +// The FLpxSockets keeps a pointer to the EngineWrapper. +// The EngineWrapper must outlive the FLpxSockets. +// +// This class is an abstract base class. A derived class +// implements all the behavior. +// + +class FLpxSockets; +struct EngineWrapper; + +class FLpxSockets +{ +protected: + FLpxSockets() {} + +public: + // Create the Luprex socket system. + static FLpxSockets* Create(EngineWrapper *w); + + // Delete the luprex socket system. Cleanly closes all sockets. + virtual ~FLpxSockets() {} + + // The update routine relays data into and out of + // luprex via TCP/IP sockets. + virtual void Update() = 0; +}; \ No newline at end of file diff --git a/Source/Integration/engineutil.cpp b/Source/Integration/engineutil.cpp index 497e8b52..0e2635c8 100644 --- a/Source/Integration/engineutil.cpp +++ b/Source/Integration/engineutil.cpp @@ -10,6 +10,7 @@ void init_wrapper(EngineWrapper* w) { InitFn init = (InitFn)FPlatformProcess::GetDllExport(DLL, TEXT("init_engine_wrapper")); if (init != nullptr) { init(w); + w->hook_dprint(engineutil::DPrintHook); } } }