diff --git a/open_phantom/main.py b/open_phantom/main.py
index 211dfe9..a135a88 100644
--- a/open_phantom/main.py
+++ b/open_phantom/main.py
@@ -1,19 +1,255 @@
 import os
+import cv2
+import time
+import numpy as np
+from tqdm import tqdm
+from utils.visualizations import *
 from process_hand import ProcessHand
 from robot_manager import RobotManager
 
-# TODO: Move record_video and process_video here
+
+def record_video() -> str | None:
+    output_dir = os.path.join(os.getcwd(), "recordings")
+    os.makedirs(output_dir, exist_ok=True)
+
+    timestamp = time.strftime("%Y%m%d-%H%M%S")
+    video_filename = os.path.join(output_dir, f"recorded_video_{timestamp}.mp4")
+
+    cap = cv2.VideoCapture(0)
+
+    assert cap.isOpened(), "Failed to open camera."
+
+    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+    fps = int(cap.get(cv2.CAP_PROP_FPS))
+    fps = fps if fps > 0 else 30  # Some webcams report 0 FPS; fall back to a default
+
+    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+    out = None
+
+    recording = False
+    did_record = False
+
+    print("Camera is active. Press 'r' to start/stop recording, 'q' to quit.")
+
+    while cap.isOpened():
+        success, frame = cap.read()
+        assert success, "Failed to read from camera."
+
+        # Mirror the image for more intuitive viewing
+        frame = cv2.flip(frame, 1)
+
+        # Create a separate display frame for showing the recording indicator
+        display_frame = frame.copy()
+
+        # Display recording indicator ONLY on the display frame
+        if recording:
+            cv2.circle(display_frame, (30, 30), 15, (0, 0, 255), -1)
+            cv2.putText(
+                display_frame,
+                "RECORDING",
+                (50, 40),
+                cv2.FONT_HERSHEY_SIMPLEX,
+                1,
+                (0, 0, 255),
+                2,
+                cv2.LINE_AA,
+            )
+
+        # Show the display frame (with indicator if recording)
+        cv2.imshow("Video Recording", display_frame)
+
+        # Write the original frame (without indicator) to video file if recording
+        if recording and out is not None:
+            out.write(frame)
+
+        key = cv2.waitKey(1) & 0xFF
+
+        if key == ord("r"):
+            recording = not recording
+            if recording:
+                print(f"Started recording to {video_filename}")
+                out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
+            else:
+                if out is not None:
+                    out.release()
+                print(f"Stopped recording. Video saved to {video_filename}")
+                did_record = True
+                break
+        elif key == ord("q"):  # 'q' quits without saving, as the prompt promises
+            break
+
+    if out is not None:
+        out.release()
+    cap.release()
+    cv2.destroyAllWindows()
+
+    return video_filename if did_record else None
 
 
-def main() -> None:
-    processor = ProcessHand()
+def process_video(video_path: str) -> dict:
+    assert video_path, "Video path is required."
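+    # Per-frame pipeline (implemented below): segment the hand with SAM2,
+    # estimate monocular depth with MiDaS, back-project the masked pixels to
+    # a point cloud, align the MediaPipe hand mesh to it via ICP, apply joint
+    # constraints, then derive robot position, orientation, and gripper width.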
+
+    cap = cv2.VideoCapture(video_path)
+
+    assert cap.isOpened(), f"Failed to open video file {video_path}"
+
+    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+    fps = int(cap.get(cv2.CAP_PROP_FPS))
+    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+
+    # Approximate camera intrinsics from the frame dimensions (no calibration)
+    camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
+
+    processor = ProcessHand(camera_intrinsics)
+
+    base_path = os.path.splitext(video_path)[0]
+    segmented_output_path = f"{base_path}_masked.mp4"
+    depth_output_path = f"{base_path}_depth.mp4"
+    mesh_output_path = f"{base_path}_mesh.mp4"
+    registration_output_path = f"{base_path}_registration.mp4"
+    constraints_output_path = f"{base_path}_constraints.mp4"
+    robot_output_path = f"{base_path}_robot.mp4"
+
+    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+    segmented_out = cv2.VideoWriter(segmented_output_path, fourcc, fps, (width, height))
+    depth_out = cv2.VideoWriter(depth_output_path, fourcc, fps, (width, height))
+    mesh_out = cv2.VideoWriter(mesh_output_path, fourcc, fps, (width, height))
+    reg_out = cv2.VideoWriter(
+        registration_output_path, fourcc, fps, (640, 480)
+    )  # Fixed size to match reg_frame below
+    constraints_out = cv2.VideoWriter(
+        constraints_output_path, fourcc, fps, (width, height)
+    )
+    robot_out = cv2.VideoWriter(robot_output_path, fourcc, fps, (width, height))
+
+    print(f"Processing video with {total_frames} frames...")
+    for _ in tqdm(range(total_frames)):
+        success, frame = cap.read()
+        if not success:
+            break  # CAP_PROP_FRAME_COUNT is only an estimate for some containers
+
+        # Convert image to RGB for MediaPipe
+        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+
+        # Process with MediaPipe Hands
+        results = processor.hands.process(image_rgb)
+
+        # Initialize output frames
+        segmented_frame = frame.copy()
+        depth_frame = np.zeros((height, width, 3), dtype=np.uint8)
+        mesh_frame = frame.copy()
+        reg_frame = (
+            np.ones((480, 640, 3), dtype=np.uint8) * 255
+        )  # Fixed size, white background
+        constraints_frame = frame.copy()
+        robot_frame = frame.copy()
+
+        # Process if hand is detected
+        if results.multi_hand_landmarks:
+            for hand_landmarks in results.multi_hand_landmarks:
+                segmented_mask = processor.create_mask(frame, hand_landmarks)
+                mask_overlay = frame.copy()
+                mask_overlay[segmented_mask > 0] = [0, 0, 255]  # Red color for mask
+                segmented_frame = cv2.addWeighted(frame, 0.7, mask_overlay, 0.3, 0)
+
+                depth_map, depth_vis = processor.estimate_depth(frame)
+                depth_colored = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
+                depth_frame = depth_colored.copy()
+
+                cloud = processor.create_cloud(depth_map, segmented_mask)
+
+                hand_vertices, hand_faces = processor.create_mesh(
+                    hand_landmarks, (width, height)
+                )
+                mesh_frame = visualize_mesh(frame, hand_vertices, hand_faces)
+
+                icp_transform = processor.icp_registration(cloud, hand_vertices)
+                reg_frame = visualize_registration(cloud, hand_vertices, icp_transform)
+
+                refined_landmarks = processor.refine_landmarks(
+                    hand_landmarks, icp_transform, (width, height)
+                )
+
+                original_refined = refined_landmarks.copy()
+
+                constrained_landmarks = processor.apply_constraints(refined_landmarks)
+                constraints_frame = visualize_constraints(
+                    frame,
+                    original_refined,
+                    constrained_landmarks,
+                    camera_intrinsics,
+                )
+
+                position, orientation, gripper_width = processor.get_robot_params(
+                    constrained_landmarks
+                )
+                robot_frame = visualize_robot_params(
+                    frame,
+                    position,
+                    orientation,
+                    gripper_width,
+                    camera_intrinsics,
+                )
+
+        segmented_out.write(segmented_frame)
+        depth_out.write(depth_frame)
+        mesh_out.write(mesh_frame)
+        reg_out.write(reg_frame)
+        constraints_out.write(constraints_frame)
+        robot_out.write(robot_frame)
+
+        display_scale = 0.5
+        display_size = (int(width * display_scale), int(height * display_scale))
+        reg_display_size = (int(640 * display_scale), int(480 * display_scale))
+
+        cv2.imshow("Segmented", cv2.resize(segmented_frame, display_size))
+        cv2.imshow("Depth", cv2.resize(depth_frame, display_size))
+        cv2.imshow("Mesh", cv2.resize(mesh_frame, display_size))
+        cv2.imshow("Registration", cv2.resize(reg_frame, reg_display_size))
+        cv2.imshow("Constraints", cv2.resize(constraints_frame, display_size))
+        cv2.imshow("Robot Parameters", cv2.resize(robot_frame, display_size))
+
+        if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
+            break
+
+    cap.release()
+    segmented_out.release()
+    depth_out.release()
+    mesh_out.release()
+    reg_out.release()
+    constraints_out.release()
+    robot_out.release()
+    cv2.destroyAllWindows()
+
+    print("Processing complete. Results saved to:")
+    print(f"- Hand mask: {segmented_output_path}")
+    print(f"- Depth visualization: {depth_output_path}")
+    print(f"- Mesh visualization: {mesh_output_path}")
+    print(f"- Registration visualization: {registration_output_path}")
+    print(f"- Constraints visualization: {constraints_output_path}")
+    print(f"- Robot parameters: {robot_output_path}")
+
+    return {
+        "segmented": segmented_output_path,
+        "depth": depth_output_path,
+        "mesh": mesh_output_path,
+        "registration": registration_output_path,
+        "constraints": constraints_output_path,
+        "robot": robot_output_path,
+    }
+
+
+def main() -> None:
     record_option = input("Record a new video? (y/n): ")
 
     if record_option.lower() == "y":
         print("Starting video recording...")
-        video_path = processor.record_video()
+        video_path = record_video()
         if not video_path:
             print("Video recording failed.")
             return
@@ -29,7 +265,7 @@ def main() -> None:
         print("Error: Video file not found.")
         video_path = input("Enter the path to the video file: ")
 
-    processor.process_video(video_path)
+    process_video(video_path)
 
 
 if __name__ == "__main__":
diff --git a/open_phantom/process_hand.py b/open_phantom/process_hand.py
index 9aa577b..2c659f8 100644
--- a/open_phantom/process_hand.py
+++ b/open_phantom/process_hand.py
@@ -1,14 +1,10 @@
-import os
-import time
 
 import cv2
 import torch
 import numpy as np
 import open3d as o3d
 import mediapipe as mp
 
-from tqdm import tqdm
 from collections import deque
-from utils.visualizations import *
 
 from sam2.sam2_image_predictor import SAM2ImagePredictor
@@ -20,7 +16,9 @@ MAXIMUM_HAND_WIDTH_MM = 100.0
 
 
 class ProcessHand:
-    def __init__(self) -> None:
+    def __init__(self, camera_intrinsics: tuple) -> None:
+        self.camera_intrinsics = camera_intrinsics
+
         self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
         # Initialize MediaPipe Hands for hand detection
@@ -57,7 +55,7 @@ class ProcessHand:
     """
     Create a segmentation mask over the hand using SAM2 model
    """
-    def _create_mask(self, frame: np.ndarray, landmarks: list) -> np.ndarray:
+    def create_mask(self, frame: np.ndarray, landmarks: list) -> np.ndarray:
         height, width = frame.shape[:2]
         # Convert image to RGB for SAM2 model
         frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
@@ -91,7 +89,7 @@ class ProcessHand:
     """
     Estimate depth map using MiDaS model
     """
-    def _estimate_depth(self, image: np.ndarray) -> tuple:
+    def estimate_depth(self, image: np.ndarray) -> tuple:
         img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
 
         # Transform img for MiDaS model
@@ -121,7 +119,7 @@ class ProcessHand:
     Create point cloud from depth map and mask by
    back-projecting to 3D using camera intrinsics and depth values
     """
-    def _create_cloud(
+    def create_cloud(
         self, depth_map: np.ndarray, segmented_mask: np.ndarray
     ) -> np.ndarray:
         focal_x, focal_y, center_x, center_y = self.camera_intrinsics
@@ -149,7 +147,7 @@ class ProcessHand:
     """
     Create hand mesh from hand landmarks
     """
-    def _create_mesh(self, landmarks: list, image_dims: tuple) -> np.ndarray:
+    def create_mesh(self, landmarks: list, image_dims: tuple) -> np.ndarray:
         width, height = image_dims
 
         # Extract just z values to understand their range
@@ -209,7 +207,7 @@ class ProcessHand:
     """
     Align hand mesh to point cloud using ICP for accurate 3D reconstruction
     """
-    def _icp_registration(self, cloud: np.ndarray, vertices: np.ndarray) -> np.ndarray:
+    def icp_registration(self, cloud: np.ndarray, vertices: np.ndarray) -> np.ndarray:
         source = o3d.geometry.PointCloud()
         source.points = o3d.utility.Vector3dVector(vertices)
@@ -246,7 +244,7 @@ class ProcessHand:
     """
     Refine landmarks based on the icp transformation
     """
-    def _refine_landmarks(
+    def refine_landmarks(
         self, landmarks: list, transform: int, image_dims: tuple
     ) -> list:
         width, height = image_dims
@@ -271,7 +269,7 @@ class ProcessHand:
         return refined_landmarks
 
     # TODO: Implement better constraints that limit last joint of each finger to a single DOF
-    def _apply_constraints(self, landmarks: list):
+    def apply_constraints(self, landmarks: list):
         constrained = np.array(landmarks)
 
         # Define finger joint indices
@@ -313,7 +311,7 @@ class ProcessHand:
     3. Gripper Width: Distance between thumb tip and index tip
     """
-    def _get_robot_params(self, refined_landmarks: list) -> tuple:
+    def get_robot_params(self, refined_landmarks: list) -> tuple:
         landmarks = np.array(refined_landmarks)
 
         # Define indices for specific parts of the hand
@@ -388,235 +386,3 @@ class ProcessHand:
         # rotation_matrix = rotation_in_robot_frame
 
         return position, rotation_matrix, gripper_width
-
-    def record_video(self) -> str:
-        output_dir = os.path.join(os.getcwd(), "recordings")
-        os.makedirs(output_dir, exist_ok=True)
-
-        timestamp = time.strftime("%Y%m%d-%H%M%S")
-        video_filename = os.path.join(output_dir, f"recorded_video_{timestamp}.mp4")
-
-        cap = cv2.VideoCapture(0)
-
-        assert cap.isOpened(), "Failed to open camera."
-
-        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
-        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-        fps = int(cap.get(cv2.CAP_PROP_FPS))
-
-        # Update camera intrinsics based on video dimensions
-        self.camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
-
-        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
-        out = None
-
-        recording = False
-        did_record = False
-
-        print("Camera is active. Press 'r' to start/stop recording, 'q' to quit.")
-
-        while cap.isOpened():
-            success, frame = cap.read()
-            assert success, "Failed to read from camera."
-
-            # Mirror the image for more intuitive viewing
-            frame = cv2.flip(frame, 1)
-
-            # Create a separate display frame for showing the recording indicator
-            display_frame = frame.copy()
-
-            # Display recording indicator ONLY on the display frame
-            if recording:
-                cv2.circle(display_frame, (30, 30), 15, (0, 0, 255), -1)
-                cv2.putText(
-                    display_frame,
-                    "RECORDING",
-                    (50, 40),
-                    cv2.FONT_HERSHEY_SIMPLEX,
-                    1,
-                    (0, 0, 255),
-                    2,
-                    cv2.LINE_AA,
-                )
-
-            # Show the display frame (with indicator if recording)
-            cv2.imshow("Video Recording", display_frame)
-
-            # Write the original frame (without indicator) to video file if recording
-            if recording and out is not None:
-                out.write(frame)
-
-            key = cv2.waitKey(1) & 0xFF
-
-            if key == ord("r"):
-                recording = not recording
-                if recording:
-                    print(f"Started recording to {video_filename}")
-                    out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
-                else:
-                    if out is not None:
-                        out.release()
-                    print(f"Stopped recording. Video saved to {video_filename}")
-                    did_record = True
-                    break
-
-        if out is not None:
-            out.release()
-        cap.release()
-        cv2.destroyAllWindows()
-
-        return video_filename if did_record else None
-
-    def process_video(self, video_path: str) -> tuple:
-        assert video_path, "Video path is required."
-
-        cap = cv2.VideoCapture(video_path)
-
-        assert cap.isOpened(), f"Failed to open video file {video_path}"
-
-        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
-        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-        fps = int(cap.get(cv2.CAP_PROP_FPS))
-        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
-
-        # Update camera intrinsics based on video dimensions
-        self.camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
-
-        base_path = os.path.splitext(video_path)[0]
-        segmented_output_path = f"{base_path}_masked.mp4"
-        depth_output_path = f"{base_path}_depth.mp4"
-        mesh_output_path = f"{base_path}_mesh.mp4"
-        registration_output_path = f"{base_path}_registration.mp4"
-        constraints_output_path = f"{base_path}_constraints.mp4"
-        robot_output_path = f"{base_path}_robot.mp4"
-
-        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
-        segmented_out = cv2.VideoWriter(
-            segmented_output_path, fourcc, fps, (width, height)
-        )
-        depth_out = cv2.VideoWriter(depth_output_path, fourcc, fps, (width, height))
-        mesh_out = cv2.VideoWriter(mesh_output_path, fourcc, fps, (width, height))
-        reg_out = cv2.VideoWriter(
-            registration_output_path, fourcc, fps, (640, 480)
-        )  # Fixed size
-        constraints_out = cv2.VideoWriter(
-            constraints_output_path, fourcc, fps, (width, height)
-        )
-        robot_out = cv2.VideoWriter(robot_output_path, fourcc, fps, (width, height))
-
-        print(f"Processing video with {total_frames} frames...")
-        for _ in tqdm(range(total_frames)):
-            success, frame = cap.read()
-            assert success, "Failed to read frame from video."
-
-            # Convert image to RGB for MediaPipe
-            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-
-            # Process with MediaPipe Hands
-            results = self.hands.process(image_rgb)
-
-            # Initialize output frames
-            segmented_frame = frame.copy()
-            depth_frame = np.zeros((height, width, 3), dtype=np.uint8)
-            mesh_frame = frame.copy()
-            reg_frame = (
-                np.ones((480, 640, 3), dtype=np.uint8) * 255
-            )  # Fixed size, white background
-            constraints_frame = frame.copy()
-            robot_frame = frame.copy()
-
-            # Process if hand is detected
-            if results.multi_hand_landmarks:
-                for hand_landmarks in results.multi_hand_landmarks:
-                    segmented_mask = self._create_mask(frame, hand_landmarks)
-                    mask_overlay = frame.copy()
-                    mask_overlay[segmented_mask > 0] = [0, 0, 255]  # Red color for mask
-                    segmented_frame = cv2.addWeighted(frame, 0.7, mask_overlay, 0.3, 0)
-
-                    depth_map, depth_vis = self._estimate_depth(frame)
-                    depth_colored = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
-                    depth_frame = depth_colored.copy()
-
-                    cloud = self._create_cloud(depth_map, segmented_mask)
-
-                    hand_vertices, hand_faces = self._create_mesh(
-                        hand_landmarks, (width, height)
-                    )
-                    mesh_frame = visualize_mesh(frame, hand_vertices, hand_faces)
-
-                    icp_transform = self._icp_registration(cloud, hand_vertices)
-                    reg_frame = visualize_registration(
-                        cloud, hand_vertices, icp_transform
-                    )
-
-                    refined_landmarks = self._refine_landmarks(
-                        hand_landmarks, icp_transform, (width, height)
-                    )
-
-                    original_refined = refined_landmarks.copy()
-
-                    constrained_landmarks = self._apply_constraints(refined_landmarks)
-                    constraints_frame = visualize_constraints(
-                        frame,
-                        original_refined,
-                        constrained_landmarks,
-                        self.camera_intrinsics,
-                    )
-
-                    position, orientation, gripper_width = self._get_robot_params(
-                        constrained_landmarks
-                    )
-                    robot_frame = visualize_robot_params(
-                        frame,
-                        position,
-                        orientation,
-                        gripper_width,
-                        self.camera_intrinsics,
-                    )
-
-            segmented_out.write(segmented_frame)
-            depth_out.write(depth_frame)
-            mesh_out.write(mesh_frame)
-            reg_out.write(reg_frame)
-            constraints_out.write(constraints_frame)
-            robot_out.write(robot_frame)
-
-            display_scale = 0.5
-            display_size = (int(width * display_scale), int(height * display_scale))
-            reg_display_size = (int(640 * display_scale), int(480 * display_scale))
-
-            cv2.imshow("Segmented", cv2.resize(segmented_frame, display_size))
-            cv2.imshow("Depth", cv2.resize(depth_frame, display_size))
-            cv2.imshow("Mesh", cv2.resize(mesh_frame, display_size))
-            cv2.imshow("Registration", cv2.resize(reg_frame, reg_display_size))
-            cv2.imshow("Constraints", cv2.resize(constraints_frame, display_size))
-            cv2.imshow("Robot Parameters", cv2.resize(robot_frame, display_size))
-
-            if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
-                break
-
-        cap.release()
-        segmented_out.release()
-        depth_out.release()
-        mesh_out.release()
-        reg_out.release()
-        constraints_out.release()
-        robot_out.release()
-        cv2.destroyAllWindows()
-
-        print(f"Processing complete. Results saved to:")
-        print(f"- Hand mask: {segmented_output_path}")
-        print(f"- Depth visualization: {depth_output_path}")
-        print(f"- Mesh visualization: {mesh_output_path}")
-        print(f"- Registration visualization: {registration_output_path}")
-        print(f"- Constraints visualization: {constraints_output_path}")
-        print(f"- Robot parameters: {robot_output_path}")
-
-        return {
-            "segmented": segmented_output_path,
-            "depth": depth_output_path,
-            "mesh": mesh_output_path,
-            "registration": registration_output_path,
-            "constraints": constraints_output_path,
-            "robot": robot_output_path,
-        }