You've already forked LuckyWorld
287 lines
7.8 KiB
C++
287 lines
7.8 KiB
C++
// Fill out your copyright notice in the Description page of Project Settings.
|
|
|
|
|
|
#include "LuckyDataTransferSubsystem.h"
|
|
|
|
#include "AutomationBlueprintFunctionLibrary.h"
|
|
#include "ImageUtils.h"
|
|
#include "RenderingThread.h"
|
|
#include "RenderUtils.h"
|
|
#include "RenderGraphUtils.h"
|
|
#include "RHI.h"
|
|
#include "RHICommandList.h"
|
|
#include "ImageWriteQueue.h"
|
|
#include "ImageWriteTask.h"
|
|
#include "ImagePixelData.h"
|
|
#include "JsonUtilities.h"
|
|
#include "JsonObjectConverter.h"
|
|
#include "ReviewComments.h"
|
|
#include "WebSocketsModule.h"
|
|
#include "Kismet/KismetStringLibrary.h"
|
|
#include "Camera/CameraActor.h"
|
|
#include "Camera/CameraComponent.h"
|
|
#include "Components/SceneCaptureComponent2D.h"
|
|
#include "Kismet/GameplayStatics.h"
|
|
#include "Kismet/KismetMathLibrary.h"
|
|
#include "Kismet/KismetRenderingLibrary.h"
|
|
#include "Slate/SceneViewport.h"
|
|
#include "Virtualization/VirtualizationTypes.h"
|
|
|
|
ULuckyDataTransferSubsystem::ULuckyDataTransferSubsystem()
|
|
{
|
|
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Initialize(FSubsystemCollectionBase& Collection)
|
|
{
|
|
Super::Initialize(Collection);
|
|
|
|
if (UGameInstance* GI = GetWorld()->GetGameInstance())
|
|
{
|
|
|
|
}
|
|
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Deinitialize()
|
|
{
|
|
Super::Deinitialize();
|
|
|
|
if (Socket.IsValid() && Socket->IsConnected())
|
|
{
|
|
Socket->Close();
|
|
}
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Internal_OpenWebsocket(const FString& URL, const FString& Protocol)
|
|
{
|
|
const FString NewUrl = URL.IsEmpty() ? TEXT("ws://127.0.0.1:3000/ws") : URL;
|
|
const FString NewProtocol = Protocol.IsEmpty() ? TEXT("ws") : Protocol;
|
|
|
|
UE_LOG(LogTemp, Warning, TEXT("Opening WebSocket URL: %s"), *NewUrl);
|
|
|
|
if (!FModuleManager::Get().IsModuleLoaded("WebSockets"))
|
|
{
|
|
FModuleManager::Get().LoadModule("WebSockets");
|
|
}
|
|
|
|
Socket = FWebSocketsModule::Get().CreateWebSocket(NewUrl);
|
|
Socket->Connect();
|
|
|
|
//Set up callbacks
|
|
Socket->OnConnectionError().AddUObject(this, &ULuckyDataTransferSubsystem::Callback_OnConnectionError);
|
|
Socket->OnConnected().AddUObject(this, &ULuckyDataTransferSubsystem::Callback_OnConnected);
|
|
Socket->OnMessage().AddUObject(this, &ULuckyDataTransferSubsystem::Callback_OnMessage);
|
|
}
|
|
|
|
//Callbacks
|
|
void ULuckyDataTransferSubsystem::Callback_OnConnected()
|
|
{
|
|
if (OnSocketReady.IsBound())
|
|
{
|
|
OnSocketReady.Broadcast(true);
|
|
UE_LOG(LogTemp, VeryVerbose, TEXT("WebSocket connected successfully"));
|
|
}
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Callback_OnConnectionError(const FString& Error)
|
|
{
|
|
UE_LOG(LogTemp, VeryVerbose, TEXT("Websocket connection error: %s"), *Error)
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Callback_OnMessage(const FString& Message)
|
|
{
|
|
if (!Message.IsEmpty())
|
|
{
|
|
CommandReady(InterpretData(Message));
|
|
return;
|
|
}
|
|
|
|
UE_LOG(LogTemp, Warning, TEXT("The message received from the websocket is invalid"));
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Internal_OnMessageSent(const FString& Message)
|
|
{
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::Callback_OnConnectionClosed()
|
|
{
|
|
}
|
|
|
|
//Blueprint Exposed Implementation
|
|
void ULuckyDataTransferSubsystem::ConnectToWebsocket(const FString& URL, const FString& Protocol)
|
|
{
|
|
Internal_OpenWebsocket(URL, Protocol);
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::SendMessage(const FString& Message)
|
|
{
|
|
if (Socket.IsValid() && Socket->IsConnected())
|
|
{
|
|
Socket->Send(Message);
|
|
return;
|
|
}
|
|
|
|
UE_LOG(LogTemp, Warning, TEXT("WebSocket outgoing message failed"));
|
|
}
|
|
|
|
FPayload ULuckyDataTransferSubsystem::InterpretData(const FString& Message)
|
|
{
|
|
FPayload Payload = FPayload();
|
|
|
|
if (!Message.IsEmpty())
|
|
{
|
|
TSharedPtr<FJsonObject> JsonObj;
|
|
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Message);
|
|
|
|
if (FJsonSerializer::Deserialize(Reader, JsonObj) && JsonObj.IsValid())
|
|
{
|
|
for (auto& Elem : JsonObj->Values)
|
|
{
|
|
FCommand Command = FCommand();
|
|
Command.Key = FString(Elem.Key);
|
|
Command.Value = Elem.Value->AsNumber();
|
|
|
|
Payload.Commands.Add(Command);
|
|
}
|
|
}
|
|
}
|
|
|
|
return Payload;
|
|
}
|
|
|
|
void IncomingMessage(const FString& Message)
|
|
{
|
|
const FString outMessage = Message.IsEmpty() ? TEXT("no valid message") : Message;
|
|
UE_LOG(LogTemp, Warning, TEXT("Incoming message: %s"), *outMessage);
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::CommandReady(const FPayload& Payload)
|
|
{
|
|
if (OnCommandReady.IsBound())
|
|
{
|
|
OnCommandReady.Broadcast(Payload);
|
|
}
|
|
}
|
|
|
|
//Blueprint Callable function - Use CreateJsonPayload_Observation in C++
|
|
bool ULuckyDataTransferSubsystem::MakeObservationPayload(const FObservationPayload& Data)
|
|
{
|
|
return CreateJsonPayload_Observation(Data);
|
|
}
|
|
|
|
FString ULuckyDataTransferSubsystem::CreateCaptureSessionID()
|
|
{
|
|
SessionID = FGuid::NewGuid().ToString().Left(5);
|
|
return SessionID;
|
|
}
|
|
|
|
bool ULuckyDataTransferSubsystem::CreateJsonPayload_Observation(const FObservationPayload& Data)
|
|
{
|
|
bool bSuccess = false;
|
|
ObservationPayloadString = FString();
|
|
|
|
if (!Data.ObservationState.IsEmpty())
|
|
{
|
|
FJsonObjectConverter::UStructToJsonObjectString(Data, ObservationPayloadString);
|
|
UE_LOG(LogTemp, Warning, TEXT("Payload observation: %s"), *ObservationPayloadString);
|
|
bSuccess = true;
|
|
}
|
|
|
|
return bSuccess;
|
|
}
|
|
|
|
void ULuckyDataTransferSubsystem::RegisterSensor(ALuckySensorPawnBase* Sensor)
|
|
{
|
|
if (IsValid(Sensor))
|
|
{
|
|
SensorPawns.Add(Sensor);
|
|
}
|
|
}
|
|
|
|
bool ULuckyDataTransferSubsystem::WriteImageToDisk(const FString& inPath, const double Timestamp, const FString& Comment)
|
|
{
|
|
if (SessionID.IsEmpty())
|
|
{
|
|
UE_LOG(LogTemp, Warning, TEXT("The Capture Session ID is empty"));
|
|
return false;
|
|
}
|
|
|
|
FString Path = inPath.IsEmpty() ? TEXT("C:/LuckyRobotsImages") : inPath;
|
|
|
|
if (!SensorPawns.IsEmpty())
|
|
{
|
|
for (const ALuckySensorPawnBase* Sensor : SensorPawns)
|
|
{
|
|
if (IsValid(Sensor) && Sensor->SensorInfo.bActive && Sensor->GetCamera() && Sensor->GetCaptureComponent())
|
|
{
|
|
FString Robot = TEXT("Robot_Name");
|
|
|
|
FString Episode = SessionID;
|
|
|
|
ENQUEUE_RENDER_COMMAND(ReadPixelsAsync)([Sensor, Path, Timestamp, Comment, Episode](FRHICommandListImmediate& RHICmdList)
|
|
{
|
|
FTextureResource* Resource = Sensor->RenderTarget->GetResource();
|
|
FRHITexture* ResourceRHI = Resource->GetTexture2DRHI();
|
|
|
|
TArray<FColor> OutPixels;
|
|
|
|
if (ensure(ResourceRHI))
|
|
{
|
|
OutPixels.SetNum(Resource->GetSizeX() * Resource->GetSizeY());
|
|
RHICmdList.ReadSurfaceData(
|
|
ResourceRHI,
|
|
FIntRect(0, 0, Resource->GetSizeX(), Resource->GetSizeY()),
|
|
OutPixels,
|
|
FReadSurfaceDataFlags()
|
|
);
|
|
|
|
UE_LOG(LogTemp, Warning, TEXT("Logged pixels: %d"), OutPixels.Num())
|
|
|
|
FImageWriteTask* ImageTask = new FImageWriteTask();
|
|
|
|
if (Path.Right(1) == "/")
|
|
{
|
|
//Path = Path.Left(Path.Len() - 1);
|
|
}
|
|
|
|
FString Robot = TEXT("Robot_Name");
|
|
|
|
const FString Filename = FString::Printf(
|
|
TEXT("%s/%s/%s/%s/%s_%s_%s_%d"),
|
|
*Path,
|
|
*Robot,
|
|
*Episode,
|
|
*Sensor->SensorInfo.Name,
|
|
*Robot,
|
|
*Sensor->SensorInfo.Name,
|
|
*Comment,
|
|
FMath::Floor<int32>(Timestamp * 1000)
|
|
);
|
|
|
|
//UE_LOG(LogTemp, Warning, TEXT("Evan requested a longer string describing the inner workings of the following string which describes in great detail the file path for the image you've just written to disk. It is: %s"), *Filename);
|
|
|
|
ImageTask->Format = EImageFormat::PNG;
|
|
ImageTask->Filename = Filename;
|
|
ImageTask->PixelData = MakeUnique<TImagePixelData<FColor>>(FIntPoint(Sensor->RenderTarget->GetSurfaceWidth(), Sensor->RenderTarget->GetSurfaceHeight()), TArray64<FColor>(OutPixels));
|
|
ImageTask->bOverwriteFile = true;
|
|
ImageTask->CompressionQuality = (int32)EImageCompressionQuality::Default;
|
|
|
|
//Add to write queue (async)
|
|
FModuleManager::LoadModuleChecked<IImageWriteQueueModule>("ImageWriteQueue").GetWriteQueue().Enqueue(TUniquePtr<IImageWriteTaskBase>(ImageTask));
|
|
|
|
ImageTask->OnCompleted = [](bool bSuccess) {
|
|
UE_LOG(LogTemp, Warning, TEXT("Image write completed: %s"), bSuccess ? TEXT("Success") : TEXT("Failed"));
|
|
};
|
|
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
return false;
|
|
} |