Refactor all thread stuff into its own file

This commit is contained in:
2023-09-04 23:19:10 -04:00
parent 01c37cf2c9
commit cfc536011e
4 changed files with 190 additions and 55 deletions

View File

@@ -11,11 +11,8 @@ using namespace DebugPrint;
AIntegrationGameModeBase::AIntegrationGameModeBase() AIntegrationGameModeBase::AIntegrationGameModeBase()
{ {
Thread = nullptr;
ThreadStopRequested = false;
EngineSeconds = 0.0; EngineSeconds = 0.0;
NextThreadTrigger = 1.0; NextThreadTrigger = 1.0;
ThreadEvent = nullptr;
//PrimaryActorTick.bCanEverTick = true; // Probably wrong //PrimaryActorTick.bCanEverTick = true; // Probably wrong
//PrimaryActorTick.bTickEvenWhenPaused = true; // Probably wrong //PrimaryActorTick.bTickEvenWhenPaused = true; // Probably wrong
//PrimaryActorTick.TickGroup = TG_PrePhysics; // Probably wrong //PrimaryActorTick.TickGroup = TG_PrePhysics; // Probably wrong
@@ -29,43 +26,21 @@ AIntegrationGameModeBase::~AIntegrationGameModeBase()
ResetToInitialState(); ResetToInitialState();
} }
// Run routine called by the worker thread. // This method runs in the background thread,
uint32 AIntegrationGameModeBase::Run() // at the moment we trigger it.
{ //
while (true) uint32 AIntegrationGameModeBase::Run() {
{ FLockedWrapper lockedwrap(LockableWrapper);
bool triggered = ThreadEvent->Wait(3000); Sockets->Update(lockedwrap);
if (ThreadStopRequested) { lockedwrap->play_invoke_event_update(lockedwrap.Get(), EngineSeconds);
DPrint("Thread stopping as requested"); Sockets->Update(lockedwrap);
break;
}
if (!triggered) {
DPrint("Thread waiting a long time...");
continue;
}
{
FLockedWrapper lockedwrap(LockableWrapper);
Sockets->Update(lockedwrap);
lockedwrap->play_invoke_event_update(lockedwrap.Get(), EngineSeconds);
Sockets->Update(lockedwrap);
}
}
return 0; return 0;
} }
void AIntegrationGameModeBase::ResetToInitialState() void AIntegrationGameModeBase::ResetToInitialState()
{ {
// Shut down the thread and release the ThreadEvent // Shut down the thread
if (Thread != nullptr) LuprexUpdateTask.Shutdown();
{
ThreadStopRequested = true;
ThreadEvent->Trigger();
delete Thread; // This waits for the thread to complete.
Thread = nullptr;
FPlatformProcess::ReturnSynchEventToPool(ThreadEvent);
ThreadEvent = nullptr;
}
ThreadStopRequested = false;
// Now that the thread's gone, we should be able to // Now that the thread's gone, we should be able to
// just claim and hold the lock on the wrapper. // just claim and hold the lock on the wrapper.
@@ -123,9 +98,9 @@ void AIntegrationGameModeBase::Tick(float DeltaSeconds)
ConsoleSetOutput(ConsoleOutput.Get()); ConsoleSetOutput(ConsoleOutput.Get());
ConsoleOutput.ClearDirty(); ConsoleOutput.ClearDirty();
} }
if ((Thread != nullptr) && (EngineSeconds >= NextThreadTrigger)) if (EngineSeconds >= NextThreadTrigger)
{ {
ThreadEvent->Trigger(); LuprexUpdateTask.Trigger();
NextThreadTrigger += 0.05; NextThreadTrigger += 0.05;
} }
} }
@@ -162,7 +137,7 @@ void AIntegrationGameModeBase::BeginPlay()
FLockedWrapper w(LockableWrapper); FLockedWrapper w(LockableWrapper);
// Sanity checks. Make sure everything is clean. // Sanity checks. Make sure everything is clean.
checkf(Thread == nullptr, TEXT("There should be no thread here.")); checkf(!LuprexUpdateTask.IsRunning(), TEXT("There should be no thread here."));
checkf(w->engine == nullptr, TEXT("There should be no engine here.")); checkf(w->engine == nullptr, TEXT("There should be no engine here."));
// Try to initialize the wrapper. // Try to initialize the wrapper.
@@ -203,8 +178,7 @@ void AIntegrationGameModeBase::BeginPlay()
Sockets.Reset(FLpxSockets::Create(w)); Sockets.Reset(FLpxSockets::Create(w));
std::string error = Sockets->GetError(); std::string error = Sockets->GetError();
check(error.empty()); check(error.empty());
ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false); LuprexUpdateTask.Startup(this);
Thread = FRunnableThread::Create(this, TEXT("Worker Thread"));
} }
// Create a tangible. // Create a tangible.

View File

@@ -8,8 +8,10 @@
#include "DebugPrint.h" #include "DebugPrint.h"
#include "TangibleManager.h" #include "TangibleManager.h"
#include "LuprexSockets.h" #include "LuprexSockets.h"
#include "TriggeredTask.h"
#include "IntegrationGameModeBase.generated.h" #include "IntegrationGameModeBase.generated.h"
/** /**
* *
*/ */
@@ -29,9 +31,6 @@ public:
// includes: the Luprex engine, the thread, and the socket state. // includes: the Luprex engine, the thread, and the socket state.
void ResetToInitialState(); void ResetToInitialState();
// Method of FRunnable, called by the Luprex thread.
virtual uint32 Run() override;
// Set the entire contents of the console output box. // Set the entire contents of the console output box.
UFUNCTION(BlueprintImplementableEvent) UFUNCTION(BlueprintImplementableEvent)
void ConsoleSetOutput(const FString& text); void ConsoleSetOutput(const FString& text);
@@ -47,28 +46,23 @@ public:
// Transfer console output from the Luprex engine to unreal. // Transfer console output from the Luprex engine to unreal.
void HandleLuprexConsoleOutput(FLockedWrapper &w); void HandleLuprexConsoleOutput(FLockedWrapper &w);
// The run function is called by a background thread
// to update luprex sockets and update luprex itself.
virtual uint32 Run() override;
UPROPERTY() UPROPERTY()
FTangibleManager TangibleManager; FTangibleManager TangibleManager;
// This stores the entire text currently visible in the console. // This stores the entire text currently visible in the console.
FConsoleOutput ConsoleOutput; FConsoleOutput ConsoleOutput;
// The worker thread.
FRunnableThread *Thread;
// Used to tell the worker thread to stop.
bool ThreadStopRequested;
// This event is used to wake up the Luprex thread. Normally,
// this means we want the worker to do one processing step. But
// if ThreadStopRequested is true, it means we want the thread
// to exit.
FEvent* ThreadEvent;
// The Luprex EngineWrapper, with a Mutex to protect it. // The Luprex EngineWrapper, with a Mutex to protect it.
// To access it, construct a FLockedWrapper. // To access it, construct a FLockedWrapper.
FLockableWrapper LockableWrapper; FLockableWrapper LockableWrapper;
// This utility runs the luprex update and socket update in a thread.
FTriggeredTask LuprexUpdateTask;
// Luprex socket system. Aside from construction, only touched by Luprex thread. // Luprex socket system. Aside from construction, only touched by Luprex thread.
TUniquePtr<FLpxSockets> Sockets; TUniquePtr<FLpxSockets> Sockets;

View File

@@ -0,0 +1,63 @@
#include "TriggeredTask.h"
#include "DebugPrint.h"
using namespace DebugPrint;
FTriggeredTask::FTriggeredTask() {
Client = nullptr;
Thread = nullptr;
ThreadStopRequested = false;
ThreadEvent = nullptr;
}
uint32 FTriggeredTask::Run() {
while (true)
{
bool triggered = ThreadEvent->Wait(3000);
if (ThreadStopRequested) {
DPrint("Thread stopping as requested");
break;
}
if (!triggered) {
DPrint("Thread waiting a long time...");
continue;
}
// The payload.
Client->Run();
}
return 0;
}
void FTriggeredTask::Startup(FRunnable *client) {
FScopeLock lock(&Mutex);
if (Thread == nullptr) {
Client = client;
ThreadEvent = FPlatformProcess::GetSynchEventFromPool(false);
Thread = FRunnableThread::Create(this, TEXT("Worker Thread"));
}
}
void FTriggeredTask::Shutdown() {
FScopeLock lock(&Mutex);
if (Thread != nullptr) {
ThreadStopRequested = true;
ThreadEvent->Trigger();
delete Thread; // This waits for the thread to complete.
Thread = nullptr;
FPlatformProcess::ReturnSynchEventToPool(ThreadEvent);
ThreadEvent = nullptr;
}
ThreadStopRequested = false;
}
void FTriggeredTask::Trigger() {
FScopeLock lock(&Mutex);
if (Thread != nullptr) {
ThreadEvent->Trigger();
}
}
bool FTriggeredTask::IsRunning() {
FScopeLock lock(&Mutex);
return (Thread != nullptr);
}

View File

@@ -0,0 +1,104 @@
#include "CoreMinimal.h"
///////////////////////////////////////////////
//
// TRIGGERED TASKS
//
// This is the situation where this class is
// useful:
//
// * You have a function to run in the background.
// * It should start the instant a foreground thread says "NOW".
// * It should be run exactly once for each "NOW".
//
// To use this class, construct an FTriggeredTask,
// and provide a runnable object. The runnable
// object's "Run" method will get executed in
// each time you call 'Trigger' on the FTriggeredTask.
//
// The run method will never get called reentrantly,
// even if you call 'Trigger' rapidly in succession.
//
///////////////////////////////////////////////
class FTriggeredTask : public FRunnable
{
private:
// Mutex used by control routines.
//
FCriticalSection Mutex;
// The worker thread.
//
FRunnableThread* Thread;
// Used to tell the worker thread to stop.
//
bool ThreadStopRequested;
// This event is used to wake up the thread.
//
// Normally, this means we want the worker to run the task
// once. But if ThreadStopRequested is true, it means we
// want the thread to exit.
//
FEvent* ThreadEvent;
// The client whose task we're triggering.
//
FRunnable* Client;
private:
/////////////////////////////////////////////
//
// This section contains routines that are
// executed by the thread itself.
//
/////////////////////////////////////////////
// Method of FRunnable, called by the Luprex thread.
//
virtual uint32 Run() override;
public:
/////////////////////////////////////////////
//
// This section contains thread control routines.
// These are not invoked by the thread, but by the
// outside entity that started the thread.
//
/////////////////////////////////////////////
// Constructor.
//
// The client must outlive the FTriggeredTask.
//
FTriggeredTask();
// Startup.
//
// Bring the system online, put it in ready mode.
//
void Startup(FRunnable* client);
// Shutdown.
//
// Bring the system down, deallocate all threads.
//
void Shutdown();
// Trigger
//
// Run the client's 'RUN' method once, in the
// background. Trigger is a no-op if the system
// is shut down. If you trigger when the task
// is already running, a trigger may get dropped.
//
void Trigger();
// IsRunning
//
bool IsRunning();
};