WIP - Json/Parquet

+ Compute stats for each episode
+ Skeletal for json / parquet functions
This commit is contained in:
JB Briant
2025-05-07 15:43:18 +07:00
parent 58301dd039
commit e130c08975
9 changed files with 376 additions and 82 deletions

View File

@ -7,6 +7,11 @@
#include "LuckyDataTransferSubsystem.h"
#include "Components/TextRenderComponent.h"
#include "Engine/TextRenderActor.h"
#include "Misc/FileHelper.h"
#include "Misc/Paths.h"
#include "Dom/JsonObject.h"
#include "Serialization/JsonWriter.h"
#include "Serialization/JsonSerializer.h"
UEpisodeSubSystem::UEpisodeSubSystem()
{
@ -16,12 +21,15 @@ UEpisodeSubSystem::UEpisodeSubSystem()
void UEpisodeSubSystem::Initialize(FSubsystemCollectionBase& Collection)
{
Super::Initialize(Collection);
if (ULuckyDataTransferSubsystem* DataTransferSubSystem = GetWorld()->GetSubsystem<ULuckyDataTransferSubsystem>())
{
DataTransfer = DataTransferSubSystem;
}
}
void UEpisodeSubSystem::Deinitialize()
{
bTickEnabled = false;
FTSTicker::GetCoreTicker().RemoveTicker(TickHandle);
StopTicking();
Super::Deinitialize();
}
@ -45,16 +53,19 @@ void UEpisodeSubSystem::Tick(float DeltaTime)
{
const bool bIsEpisodeCompleted = CheckEpisodeCompletion();
// Write Image on the disk
if (DataTransfer) DataTransfer->WriteImageToDisk(CurrentRobot->PhysicsSceneProxy->GetMujocoData().time);
EpisodeFrames++;
if (bIsEpisodeCompleted && CapturedEpisodes <= EpisodesToCapture)
{
return StartEpisode();
EndEpisode();
StartEpisode();
}
else
{
EndTraining();
}
// Here shouldn't we rewrite the frames to know if the episode was a success or a failure?
// Maybe this should not be done in the tick but after episode completion
const auto Payload = CreatePayload();
SendEpisodeData(Payload);
}
}
@ -68,6 +79,12 @@ void UEpisodeSubSystem::StartTicking()
TickHandle = FTSTicker::GetCoreTicker().AddTicker(TickDelegate);
}
void UEpisodeSubSystem::StopTicking()
{
bTickEnabled = false;
FTSTicker::GetCoreTicker().RemoveTicker(TickHandle);
}
void UEpisodeSubSystem::UpdateDebugTextActor() const
{
if (!IsValid(DebugTextActor)) return;
@ -77,7 +94,7 @@ void UEpisodeSubSystem::UpdateDebugTextActor() const
TextRender->SetText(FText::FromString(Txt));
}
void UEpisodeSubSystem::StartNewEpisodesSeries(const int32 EpisodesCountIn, FString BaseImageDataPathIn)
void UEpisodeSubSystem::StartTraining(const int32 EpisodesCountIn, FString BaseImageDataPathIn, FString TaskDescriptionIn)
{
// Debug
const auto DebugTextActorPtr = UGameplayStatics::GetActorOfClass(this->GetWorld(), ATextRenderActor::StaticClass());
@ -97,10 +114,17 @@ void UEpisodeSubSystem::StartNewEpisodesSeries(const int32 EpisodesCountIn, FStr
// Data
ConfigureDataCapture();
BaseImageDataPath = BaseImageDataPathIn;
TaskDescription = TaskDescriptionIn;
StartTicking();
}
void UEpisodeSubSystem::EndTraining()
{
StopTicking();
// Create jsonl files
}
void UEpisodeSubSystem::StartEpisode()
{
// Robot should be in its ready state - overriden per PilotComponent
@ -134,6 +158,24 @@ void UEpisodeSubSystem::StartEpisode()
UpdateDebugTextActor();
}
void UEpisodeSubSystem::EndEpisode()
{
// Gather the robot data
const FTrainingEpisodeData TrainingEpisodeData = CurrentRobot->RobotPilotComponent->GetTrainingEpisodeData();
// Create episodes_stats.jsonl single line and append to EpisodeStatLines
CreateEpisodeStatJsonLine(TrainingEpisodeData);
// create a parquet file
CreateEpisodeParquetFile();
// Convert images into video
// TODO Find a good FFMPEG plugin - maybe the Unreal base one is good
// Reset values for the next episode
EpisodeFrames = 0;
}
bool UEpisodeSubSystem::CheckEpisodeCompletion()
{
const auto GeomTransform = CurrentRobot->PhysicsSceneProxy->GetGeometryTransform(EpisodeTargetObject->MainActorBody.GetName());
@ -201,64 +243,76 @@ void UEpisodeSubSystem::InitCameras()
void UEpisodeSubSystem::ConfigureDataCapture()
{
if (ULuckyDataTransferSubsystem* DataTransfer = GetWorld()->GetSubsystem<ULuckyDataTransferSubsystem>())
if (!DataTransfer) return;
DataTransfer->CreateCaptureSessionID();
InitCameras();
for (const auto& Cam : Cameras)
{
//Do this before your tick operation - shouldn't happen on tick
//Connect to websocket and create session id
DataTransfer->ConnectToWebsocket("ws://127.0.0.1:3000", "");
DataTransfer->CreateCaptureSessionID();
InitCameras();
for (const auto& Cam : Cameras)
{
DataTransfer->RegisterSensor(Cam.Get());
Cam->SensorInfo.bActive = true;
}
DataTransfer->RegisterSensor(Cam.Get());
Cam->SensorInfo.bActive = true;
}
}
FObservationPayload UEpisodeSubSystem::CreatePayload()
void UEpisodeSubSystem::CreateEpisodeStatJsonLine(const FTrainingEpisodeData& TrainingEpisodeData)
{
// CurrentRobot->Cameras
// CurrentRobot -> Tell JB what he should expose on the RobotPawn
// const auto TimeStamp = CurrentRobot->PhysicsSceneProxy->GetMujocoData().time;
// const auto So100PilotCmp = Cast<URobotPilotSO100Component>(CurrentRobot->RobotPilotComponent);
// const auto Joints = So100PilotCmp->GetCurrentControlsFromPhysicScene();
// EpisodeStatLines.
const TSharedPtr<FJsonObject> Root = MakeShared<FJsonObject>();
Root->SetNumberField("episode_index", CapturedEpisodes);
// Tick operation
// Create the payload
return FObservationPayload {
// timestamp goes here - FString,
// "observation", //just leave this because this is what ethan and anuj will expect
// enter a message here - FString,
// TMap of FString (Actuator name or index), and Float (value of actuator)
// Camera info struct goes here, don't worry about this for now, just use TArray<FObservationCameraObject>()
// What about episode success? -> can be stated after the result is known
// How to invalidate data
const TSharedPtr<FJsonObject> Stats = MakeShared<FJsonObject>();
Stats->SetObjectField("action", MakeShared<FJsonObject>(TrainingEpisodeData.ControlsStats));
Stats->SetObjectField("observation.state", MakeShared<FJsonObject>(TrainingEpisodeData.JointsStats));
// Anuj -> How many frames do we need to store in a single parquet chunk
// Exact data structure with correct data types
};
}
void UEpisodeSubSystem::SendEpisodeData(const FObservationPayload& Payload) const
{
// PayloadBuffer.Add(Payload)
// Every X frames -> Write parquet chunk
// TODO Once all json and parquet files are written on disk and the PR is merged into main and tested, we will do it
// TODO "observation.images.webcam"
// TODO "timestamp"
// TODO "frame_index"
// TODO "episode_index"
// TODO "index"
// TODO "task_index"
if (ULuckyDataTransferSubsystem* DataTransfer = GetWorld()->GetSubsystem<ULuckyDataTransferSubsystem>())
{
// Here generate the path for each image?
DataTransfer->WriteImageToDisk(CurrentRobot->PhysicsSceneProxy->GetMujocoData().time);
// Don't send data if socket is disconnected
if (!DataTransfer->Socket->IsConnected()) return;
// Send the Data
//Queue and convert the payload to json
DataTransfer->CreateJsonPayload_Observation(Payload);
//Send the payload over websocket
DataTransfer->SendMessage(DataTransfer->ObservationPayloadString);
}
// Append
Root->SetObjectField("stats", Stats);
// Serialize into FString
FString Output;
const TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&Output);
FJsonSerializer::Serialize(Root.ToSharedRef(), Writer);
EpisodeStatLines.Add(Output);
}
void UEpisodeSubSystem::CreateEpisodeParquetFile()
{
// TODO Use Anuj plugin to create one parquet file per episode
}
void UEpisodeSubSystem::ConvertImagesToVideo()
{
// TODO Once every json and parquet tasks are done
}
void UEpisodeSubSystem::CreateEpisodesStatsJsonFile()
{
// TODO Do not use FJsonObject - simply concat the FStrings into a file
// Create a jsonl file and store in the correct directory
// concat TArray<FString> EpisodeStatLines into a single file
// https://huggingface.co/datasets/youliangtan/so100_strawberry_grape/blob/main/meta/episodes_stats.jsonl
}
void UEpisodeSubSystem::CreateEpisodesJsonFile()
{
// Create a jsonl file and store in the correct directory
// https://huggingface.co/datasets/youliangtan/so100_strawberry_grape/blob/main/meta/episodes.jsonl
}
void UEpisodeSubSystem::CreateInfoJsonFile()
{
// https://huggingface.co/datasets/youliangtan/so100_strawberry_grape/blob/main/meta/info.json
}
void UEpisodeSubSystem::CreateTasksJsonFile()
{
// https://huggingface.co/datasets/youliangtan/so100_strawberry_grape/blob/main/meta/tasks.jsonl
}