From d7aa008d7abd8eb6fabe1f73272f32fa3327f372 Mon Sep 17 00:00:00 2001
From: Ethan Clark <eclark715@gmail.com>
Date: Tue, 25 Mar 2025 16:44:47 -0700
Subject: [PATCH] add robot overlay and point cloud frames
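
Teach process_video() to take a URDF path and drive a RobotManager
that poses the robot from the recovered hand parameters and
composites a render of it over each frame. Alongside the new robot
overlay stream, add a point cloud visualization video, rename the
robot parameters output to *_robot_params.mp4, replace the hard
assert on cap.read() with a graceful stop, and give main() a debug
short-circuit plus URDF prompts in the interactive flow.

The output videos are now written for their side effects only, so
process_video() returns None instead of a dict of paths.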

---
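Notes:

* No import hunk is included below, so RobotManager and
  visualize_cloud are assumed to already be importable in main.py.
  The RobotManager interface this patch relies on, reconstructed
  from the call sites (the real class lives elsewhere in the repo),
  is roughly:

      class RobotManager:
          def __init__(self, urdf_path: str, camera_intrinsics: tuple) -> None: ...
          def set_robot_pose(self, position, orientation, gripper_width) -> None: ...
          def render_robot(self, frame, depth_map): ...  # returns a BGR frame

* visualize_cloud(cloud, (width, height)) is expected to return a
  BGR image of shape (height, width, 3). cv2.VideoWriter.write()
  silently drops frames whose size does not match the writer, which
  is why the white fallback frame is sized to the video rather than
  to a fixed 640x480.
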
 open_phantom/main.py | 97 +++++++++++++++++++++++++++++---------------
 1 file changed, 65 insertions(+), 32 deletions(-)
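
* Hand inpainting is not wired up yet: processor.inpaint_hand() and
  the matching writer, write(), and imshow() calls are all left
  commented out, so no empty *_inpainted.mp4 file is produced in the
  meantime.

* The existence-checking prompt loop is duplicated for the video and
  URDF paths in main(); a small helper (a sketch only, not part of
  this patch) would collapse it:

      def prompt_existing_path(prompt: str) -> str:
          path = input(prompt)
          while not os.path.exists(path):
              print("Error: file not found.")
              path = input(prompt)
          return path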

diff --git a/open_phantom/main.py b/open_phantom/main.py
index a135a88..f65ca88 100644
--- a/open_phantom/main.py
+++ b/open_phantom/main.py
@@ -85,7 +85,7 @@ def record_video() -> str | None:
     return video_filename if did_record else None
 
 
-def process_video(video_path: str) -> tuple:
+def process_video(video_path: str, urdf_path: str) -> None:
     assert video_path, "Video path is required."
 
     cap = cv2.VideoCapture(video_path)
@@ -101,6 +101,7 @@ def process_video(video_path: str) -> tuple:
     camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
 
     processor = ProcessHand(camera_intrinsics)
+    robot_manager = RobotManager(urdf_path, camera_intrinsics)  # URDF overlay renderer
 
     base_path = os.path.splitext(video_path)[0]
     segmented_output_path = f"{base_path}_masked.mp4"
@@ -108,22 +109,33 @@ def process_video(video_path: str) -> tuple:
     mesh_output_path = f"{base_path}_mesh.mp4"
     registration_output_path = f"{base_path}_registration.mp4"
     constraints_output_path = f"{base_path}_constraints.mp4"
-    robot_output_path = f"{base_path}_robot.mp4"
+    robot_params_output_path = f"{base_path}_robot_params.mp4"
+    inpainted_output_path = f"{base_path}_inpainted.mp4"
+    robot_overlay_output_path = f"{base_path}_robot_overlay.mp4"
 
     fourcc = cv2.VideoWriter_fourcc(*"mp4v")
     segmented_out = cv2.VideoWriter(segmented_output_path, fourcc, fps, (width, height))
     depth_out = cv2.VideoWriter(depth_output_path, fourcc, fps, (width, height))
     mesh_out = cv2.VideoWriter(mesh_output_path, fourcc, fps, (width, height))
+    cloud_out = cv2.VideoWriter(f"{base_path}_cloud.mp4", fourcc, fps, (width, height))
     reg_out = cv2.VideoWriter(registration_output_path, fourcc, fps, (width, height))
     constraints_out = cv2.VideoWriter(
         constraints_output_path, fourcc, fps, (width, height)
     )
-    robot_out = cv2.VideoWriter(robot_output_path, fourcc, fps, (width, height))
+    robot_params_out = cv2.VideoWriter(
+        robot_params_output_path, fourcc, fps, (width, height)
+    )
+    # inpainted_out = cv2.VideoWriter(inpainted_output_path, fourcc, fps, (width, height))
+    robot_overlay_out = cv2.VideoWriter(
+        robot_overlay_output_path, fourcc, fps, (width, height)
+    )
 
     print(f"Processing video with {total_frames} frames...")
     for _ in tqdm(range(total_frames)):
         success, frame = cap.read()
-        assert success, "Failed to read frame from video."
+        if not success:
+            print("Failed to read frame from video; stopping early.")
+            break
 
         # Convert image to RGB for MediaPipe
         image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
@@ -135,25 +147,31 @@ def process_video(video_path: str) -> tuple:
         segmented_frame = frame.copy()
         depth_frame = np.zeros((height, width, 3), dtype=np.uint8)
         mesh_frame = frame.copy()
+        cloud_frame = (
+            np.ones((height, width, 3), dtype=np.uint8) * 255
+        )  # White background sized to match the cloud_out writer
         reg_frame = (
             np.ones((480, 640, 3), dtype=np.uint8) * 255
         )  # Fixed size, white background
         constraints_frame = frame.copy()
-        robot_frame = frame.copy()
+        robot_params_frame = frame.copy()
+        # inpainted_frame = frame.copy()
+        robot_overlay_frame = frame.copy()
 
         # Process if hand is detected
         if results.multi_hand_landmarks:
             for hand_landmarks in results.multi_hand_landmarks:
-                segmented_mask = processor.create_mask(frame, hand_landmarks)
+                hand_mask = processor.create_mask(frame, hand_landmarks)
                 mask_overlay = frame.copy()
-                mask_overlay[segmented_mask > 0] = [0, 0, 255]  # Red color for mask
+                mask_overlay[hand_mask > 0] = [0, 0, 255]  # Red color for mask
                 segmented_frame = cv2.addWeighted(frame, 0.7, mask_overlay, 0.3, 0)
 
                 depth_map, depth_vis = processor.estimate_depth(frame)
                 depth_colored = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
                 depth_frame = depth_colored.copy()
 
-                cloud = processor.create_cloud(depth_map, segmented_mask)
+                cloud = processor.create_cloud(depth_map, hand_mask)
+                cloud_frame = visualize_cloud(cloud, (width, height))
 
                 hand_vertices, hand_faces = processor.create_mesh(
                     hand_landmarks, (width, height)
@@ -180,7 +198,7 @@ def process_video(video_path: str) -> tuple:
                 position, orientation, gripper_width = processor.get_robot_params(
                     constrained_landmarks
                 )
-                robot_frame = visualize_robot_params(
+                robot_params_frame = visualize_robot_params(
                     frame,
                     position,
                     orientation,
@@ -188,12 +206,20 @@ def process_video(video_path: str) -> tuple:
                     camera_intrinsics,
                 )
 
+                # TODO: inpainted_frame = processor.inpaint_hand(frame, hand_mask)
+
+                # Pose the robot from the hand params and composite the overlay
+                robot_manager.set_robot_pose(position, orientation, gripper_width)
+                robot_overlay_frame = robot_manager.render_robot(frame, depth_map)
         segmented_out.write(segmented_frame)
         depth_out.write(depth_frame)
         mesh_out.write(mesh_frame)
+        cloud_out.write(cloud_frame)
         reg_out.write(reg_frame)
         constraints_out.write(constraints_frame)
-        robot_out.write(robot_frame)
+        robot_params_out.write(robot_params_frame)
+        # inpainted_out.write(inpainted_frame)
+        robot_overlay_out.write(robot_overlay_frame)
 
         display_scale = 0.5
         display_size = (int(width * display_scale), int(height * display_scale))
@@ -202,9 +228,12 @@ def process_video(video_path: str) -> tuple:
         cv2.imshow("Segmented", cv2.resize(segmented_frame, display_size))
         cv2.imshow("Depth", cv2.resize(depth_frame, display_size))
         cv2.imshow("Mesh", cv2.resize(mesh_frame, display_size))
+        cv2.imshow("Cloud", cv2.resize(cloud_frame, display_size))
         cv2.imshow("Registration", cv2.resize(reg_frame, reg_display_size))
         cv2.imshow("Constraints", cv2.resize(constraints_frame, display_size))
-        cv2.imshow("Robot Parameters", cv2.resize(robot_frame, display_size))
+        cv2.imshow("Robot Parameters", cv2.resize(robot_params_frame, display_size))
+        # cv2.imshow("Inpainted", cv2.resize(inpainted_frame, display_size))
+        cv2.imshow("Robot Overlay", cv2.resize(robot_overlay_frame, display_size))
 
         if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
             break
@@ -213,30 +242,24 @@ def process_video(video_path: str) -> tuple:
     segmented_out.release()
     depth_out.release()
     mesh_out.release()
+    cloud_out.release()
     reg_out.release()
     constraints_out.release()
-    robot_out.release()
+    robot_params_out.release()
+    # inpainted_out.release()
+    robot_overlay_out.release()
     cv2.destroyAllWindows()
 
-    print(f"Processing complete. Results saved to:")
-    print(f"- Hand mask: {segmented_output_path}")
-    print(f"- Depth visualization: {depth_output_path}")
-    print(f"- Mesh visualization: {mesh_output_path}")
-    print(f"- Registration visualization: {registration_output_path}")
-    print(f"- Constraints visualization: {constraints_output_path}")
-    print(f"- Robot parameters: {robot_output_path}")
 
-    return {
-        "segmented": segmented_output_path,
-        "depth": depth_output_path,
-        "mesh": mesh_output_path,
-        "registration": registration_output_path,
-        "constraints": constraints_output_path,
-        "robot": robot_output_path,
-    }
+def main(debug: bool = False) -> None:
+    if debug:
+        video_path = "recordings/recorded_video_20250325-141723.mp4"
+        urdf_path = (
+            "urdf/SO_5DOF_ARM100_05d.SLDASM/urdf/SO_5DOF_ARM100_05d.SLDASM_fixed.urdf"
+        )
+        process_video(video_path, urdf_path)
+        return
 
-
-def main():
     record_option = input("Record a new video? (y/n): ")
 
     if record_option.lower() == "y":
@@ -248,16 +271,26 @@ def main():
 
         print(f"Video recorded successfully to {video_path}")
         process_option = input("Process the video now? (y/n): ")
-        if process_option.lower() == "n":
+        if process_option.lower() == "y":
+            urdf_path = input("Enter the path to a robot URDF file: ")
+            while not os.path.exists(urdf_path):
+                print("Error: URDF file not found.")
+                urdf_path = input("Enter the path to a robot URDF file: ")
+        else:
             print("Video processing skipped.")
             return
     else:
         video_path = input("Enter the path to a video file: ")
         while not os.path.exists(video_path):
             print("Error: Video file not found.")
-            video_path = input("Enter the path to the video file: ")
+            video_path = input("Enter the path to a video file: ")
 
-    process_video(video_path)
+        urdf_path = input("Enter the path to a robot URDF file: ")
+        while not os.path.exists(urdf_path):
+            print("Error: URDF file not found.")
+            urdf_path = input("Enter the path to a robot URDF file: ")
+
+    process_video(video_path, urdf_path)
 
 
 if __name__ == "__main__":