Harden UE Wingman request handling and numeric property conversion.
Switch the Wingman protocol to null-delimited JSON, rework the server's socket buffering and send logic, and document the bugs found during the review. Also refactor WingProperty's numeric setters into clearer helper paths while preserving the existing conversion rules.
This commit is contained in:
@@ -384,70 +384,157 @@ void UWingServer::CleanupFinishedClients()
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Stuff Performed on the Client Thread
|
||||
// ============================================================
|
||||
|
||||
void UWingServer::ClientThreadFunc(UWingServer* Server, TSharedPtr<FClientConnection> Client)
|
||||
{
|
||||
constexpr int32 MaxRecvBufBytes = 1024 * 1024;
|
||||
constexpr int32 MinUnusedRecvSpace = 4096;
|
||||
|
||||
FSocket* Socket = Client->Socket;
|
||||
FString LineBuffer;
|
||||
uint8 RecvBuf[4096];
|
||||
TArray<uint8> RecvBuf;
|
||||
RecvBuf.SetNumUninitialized(MinUnusedRecvSpace);
|
||||
int32 RecvLen = 0;
|
||||
|
||||
WaitForAssetRegistry();
|
||||
|
||||
while (true)
|
||||
{
|
||||
int32 BytesRead = 0;
|
||||
if (!Socket->Recv(RecvBuf, sizeof(RecvBuf) - 1, BytesRead))
|
||||
FString Request;
|
||||
if (ExtractRequestFromBuffer(RecvBuf, RecvLen, Request))
|
||||
{
|
||||
break; // socket error or closed
|
||||
}
|
||||
if (BytesRead <= 0)
|
||||
{
|
||||
break; // connection closed
|
||||
}
|
||||
|
||||
RecvBuf[BytesRead] = 0;
|
||||
LineBuffer += UTF8_TO_TCHAR((const ANSICHAR*)RecvBuf);
|
||||
|
||||
// Process complete lines
|
||||
int32 NewlineIdx;
|
||||
while (LineBuffer.FindChar(TEXT('\n'), NewlineIdx))
|
||||
{
|
||||
FString Line = LineBuffer.Left(NewlineIdx).TrimEnd();
|
||||
LineBuffer.RightChopInline(NewlineIdx + 1);
|
||||
|
||||
if (Line.IsEmpty()) continue;
|
||||
|
||||
// Wait for the asset registry to finish its initial scan.
|
||||
FString Response;
|
||||
if (!ProcessRequestOnGameThread(Request, Response))
|
||||
{
|
||||
IAssetRegistry& AR = FModuleManager::LoadModuleChecked<FAssetRegistryModule>("AssetRegistry").Get();
|
||||
while (AR.IsLoadingAssets()) FPlatformProcess::Sleep(0.25f);
|
||||
Client->bDone = true;
|
||||
return;
|
||||
}
|
||||
|
||||
// Enqueue the line for game-thread processing
|
||||
TSharedPtr<UWingServer::FPendingMessage> Msg = MakeShared<UWingServer::FPendingMessage>();
|
||||
Msg->Line = Line;
|
||||
TFuture<FString> Future = Msg->Response.GetFuture();
|
||||
|
||||
{
|
||||
FScopeLock Lock(&Server->Mutex);
|
||||
if (Server->bShuttingDown)
|
||||
{
|
||||
Client->bDone = true;
|
||||
return;
|
||||
}
|
||||
Server->PendingMessages.Add(Msg);
|
||||
}
|
||||
|
||||
// Block until the game thread processes this message
|
||||
FString Response = Future.Get();
|
||||
|
||||
// Write the response back, null-terminated (blocking)
|
||||
FTCHARToUTF8 Utf8(*Response);
|
||||
int32 BytesSent = 0;
|
||||
Socket->Send((const uint8*)Utf8.Get(), Utf8.Length() + 1, BytesSent);
|
||||
if (!SendAll(Socket, reinterpret_cast<const uint8*>(Utf8.Get()),
|
||||
Utf8.Length() + 1))
|
||||
{
|
||||
Client->bDone = true;
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!ReceiveMoreBytesIntoBuffer(Socket, RecvBuf, RecvLen))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Client->bDone = true;
|
||||
}
|
||||
|
||||
bool UWingServer::ExtractRequestFromBuffer(
|
||||
TArray<uint8>& RecvBuf, int32& RecvLen, FString& OutRequest)
|
||||
{
|
||||
const uint8* EndOfRequest = static_cast<const uint8*>(
|
||||
memchr(RecvBuf.GetData(), '\0', RecvLen));
|
||||
if (EndOfRequest == nullptr)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
const int32 MessageLen =
|
||||
static_cast<int32>(EndOfRequest - RecvBuf.GetData());
|
||||
OutRequest = FString::ConstructFromPtrSize(
|
||||
reinterpret_cast<const UTF8CHAR*>(RecvBuf.GetData()), MessageLen);
|
||||
const int32 RemainingBytes = RecvLen - (MessageLen + 1);
|
||||
if (RemainingBytes > 0)
|
||||
{
|
||||
FMemory::Memmove(
|
||||
RecvBuf.GetData(),
|
||||
RecvBuf.GetData() + MessageLen + 1,
|
||||
RemainingBytes);
|
||||
}
|
||||
RecvLen = RemainingBytes;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool UWingServer::ReceiveMoreBytesIntoBuffer(
|
||||
FSocket* Socket, TArray<uint8>& RecvBuf, int32& RecvLen)
|
||||
{
|
||||
constexpr int32 MaxRecvBufBytes = 1024 * 1024;
|
||||
constexpr int32 MinUnusedRecvSpace = 4096;
|
||||
|
||||
int32 UnusedSpace = RecvBuf.Num() - RecvLen;
|
||||
if (UnusedSpace < MinUnusedRecvSpace)
|
||||
{
|
||||
if (RecvBuf.Num() >= MaxRecvBufBytes)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
RecvBuf.SetNumUninitialized(RecvBuf.Num() * 2);
|
||||
UnusedSpace = RecvBuf.Num() - RecvLen;
|
||||
}
|
||||
|
||||
int32 BytesRead = 0;
|
||||
if (!Socket->Recv(RecvBuf.GetData() + RecvLen, UnusedSpace, BytesRead))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (BytesRead <= 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
RecvLen += BytesRead;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool UWingServer::SendAll(FSocket* Socket, const uint8* Data, int32 BytesToSend)
|
||||
{
|
||||
while (BytesToSend > 0)
|
||||
{
|
||||
int32 BytesSent = 0;
|
||||
if (!Socket->Send(Data, BytesToSend, BytesSent) || (BytesSent <= 0))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
Data += BytesSent;
|
||||
BytesToSend -= BytesSent;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool UWingServer::ProcessRequestOnGameThread(
|
||||
const FString& Request, FString& Response)
|
||||
{
|
||||
// Enqueue the message for game-thread processing.
|
||||
TSharedPtr<UWingServer::FPendingMessage> Msg =
|
||||
MakeShared<UWingServer::FPendingMessage>();
|
||||
Msg->Line = Request;
|
||||
TFuture<FString> Future = Msg->Response.GetFuture();
|
||||
|
||||
{
|
||||
FScopeLock Lock(&GWingServer->Mutex);
|
||||
if (GWingServer->bShuttingDown)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
GWingServer->PendingMessages.Add(Msg);
|
||||
}
|
||||
|
||||
// Block until the game thread processes this message.
|
||||
Response = Future.Get();
|
||||
return true;
|
||||
}
|
||||
|
||||
void UWingServer::WaitForAssetRegistry()
|
||||
{
|
||||
IAssetRegistry& AR =
|
||||
FModuleManager::LoadModuleChecked<FAssetRegistryModule>(
|
||||
"AssetRegistry").Get();
|
||||
while (AR.IsLoadingAssets()) FPlatformProcess::Sleep(0.25f);
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// BuildWingHandlerRegistry
|
||||
// ============================================================
|
||||
|
||||
Reference in New Issue
Block a user