move video handling into main

This commit is contained in:
Ethan Clark 2025-03-24 16:32:07 -07:00
parent ececd98449
commit d6c61b2bb4
2 changed files with 244 additions and 250 deletions

main.py

@@ -1,19 +1,247 @@
import os
import cv2
import time
import numpy as np
from tqdm import tqdm
from utils.visualizations import *
from process_hand import ProcessHand
from robot_manager import RobotManager
# TODO: Move record_video and process_video here
def record_video() -> str | None:
    output_dir = os.path.join(os.getcwd(), "recordings")
    os.makedirs(output_dir, exist_ok=True)
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    video_filename = os.path.join(output_dir, f"recorded_video_{timestamp}.mp4")
    cap = cv2.VideoCapture(0)
    assert cap.isOpened(), "Failed to open camera."
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30  # some cameras report 0 FPS; assume 30
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = None
    recording = False
    did_record = False
    print("Camera is active. Press 'r' to start/stop recording, 'q' to quit.")
    while cap.isOpened():
        success, frame = cap.read()
        assert success, "Failed to read from camera."
        # Mirror the image for more intuitive viewing
        frame = cv2.flip(frame, 1)
        # Create a separate display frame for showing the recording indicator
        display_frame = frame.copy()
        # Display recording indicator ONLY on the display frame
        if recording:
            cv2.circle(display_frame, (30, 30), 15, (0, 0, 255), -1)
            cv2.putText(
                display_frame,
                "RECORDING",
                (50, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 255),
                2,
                cv2.LINE_AA,
            )
        # Show the display frame (with indicator if recording)
        cv2.imshow("Video Recording", display_frame)
        # Write the original frame (without indicator) to video file if recording
        if recording and out is not None:
            out.write(frame)
        key = cv2.waitKey(1) & 0xFF
        if key == ord("r"):
            recording = not recording
            if recording:
                print(f"Started recording to {video_filename}")
                out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
            else:
                if out is not None:
                    out.release()
                    print(f"Stopped recording. Video saved to {video_filename}")
                    did_record = True
                    break
        elif key == ord("q"):  # quit without recording, as the prompt advertises
            break
    if out is not None:
        out.release()
    cap.release()
    cv2.destroyAllWindows()
    return video_filename if did_record else None
def main() -> None:
    processor = ProcessHand()
def process_video(video_path: str) -> dict:
    assert video_path, "Video path is required."
    cap = cv2.VideoCapture(video_path)
    assert cap.isOpened(), f"Failed to open video file {video_path}"
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Update camera intrinsics based on video dimensions
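    # Assumed pinhole model: (focal_x, focal_y, center_x, center_y), with a rough
    # 0.8 * dimension focal-length heuristic standing in for a real calibration.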
    camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
    processor = ProcessHand(camera_intrinsics)
    base_path = os.path.splitext(video_path)[0]
    segmented_output_path = f"{base_path}_masked.mp4"
    depth_output_path = f"{base_path}_depth.mp4"
    mesh_output_path = f"{base_path}_mesh.mp4"
    registration_output_path = f"{base_path}_registration.mp4"
    constraints_output_path = f"{base_path}_constraints.mp4"
    robot_output_path = f"{base_path}_robot.mp4"
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    segmented_out = cv2.VideoWriter(segmented_output_path, fourcc, fps, (width, height))
    depth_out = cv2.VideoWriter(depth_output_path, fourcc, fps, (width, height))
    mesh_out = cv2.VideoWriter(mesh_output_path, fourcc, fps, (width, height))
    reg_out = cv2.VideoWriter(
        registration_output_path, fourcc, fps, (640, 480)
    )  # Fixed size to match the 640x480 reg_frame below
    constraints_out = cv2.VideoWriter(
        constraints_output_path, fourcc, fps, (width, height)
    )
    robot_out = cv2.VideoWriter(robot_output_path, fourcc, fps, (width, height))
    print(f"Processing video with {total_frames} frames...")
    for _ in tqdm(range(total_frames)):
        success, frame = cap.read()
        assert success, "Failed to read frame from video."
        # Convert image to RGB for MediaPipe
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Process with MediaPipe Hands
        results = processor.hands.process(image_rgb)
        # Initialize output frames
        segmented_frame = frame.copy()
        depth_frame = np.zeros((height, width, 3), dtype=np.uint8)
        mesh_frame = frame.copy()
        reg_frame = (
            np.ones((480, 640, 3), dtype=np.uint8) * 255
        )  # Fixed size, white background
        constraints_frame = frame.copy()
        robot_frame = frame.copy()
        # Process if hand is detected
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
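                # Note: if MediaPipe reports several hands, each later hand simply
                # overwrites these frames, so only the last detection is written out.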
                segmented_mask = processor.create_mask(frame, hand_landmarks)
                mask_overlay = frame.copy()
                mask_overlay[segmented_mask > 0] = [0, 0, 255]  # Red color for mask
                segmented_frame = cv2.addWeighted(frame, 0.7, mask_overlay, 0.3, 0)
                depth_map, depth_vis = processor.estimate_depth(frame)
                depth_colored = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
                depth_frame = depth_colored.copy()
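                # Back-project masked pixels into a 3D point cloud. Per the
                # ProcessHand docstring this uses the pinhole intrinsics, i.e.
                # X = (u - cx) * z / fx and Y = (v - cy) * z / fy (assumed form).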
                cloud = processor.create_cloud(depth_map, segmented_mask)
                hand_vertices, hand_faces = processor.create_mesh(
                    hand_landmarks, (width, height)
                )
                mesh_frame = visualize_mesh(frame, hand_vertices, hand_faces)
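                # ICP rigidly aligns the landmark-derived mesh to the depth cloud;
                # the returned transform is reused below to refine the landmarks.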
                icp_transform = processor.icp_registration(cloud, hand_vertices)
                reg_frame = visualize_registration(cloud, hand_vertices, icp_transform)
                refined_landmarks = processor.refine_landmarks(
                    hand_landmarks, icp_transform, (width, height)
                )
                original_refined = refined_landmarks.copy()
                constrained_landmarks = processor.apply_constraints(refined_landmarks)
                constraints_frame = visualize_constraints(
                    frame,
                    original_refined,
                    constrained_landmarks,
                    camera_intrinsics,
                )
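                # Per the ProcessHand docstring, this yields an end-effector position,
                # an orientation (rotation matrix), and a gripper width measured as
                # the thumb-tip-to-index-tip distance.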
                position, orientation, gripper_width = processor.get_robot_params(
                    constrained_landmarks
                )
                robot_frame = visualize_robot_params(
                    frame,
                    position,
                    orientation,
                    gripper_width,
                    camera_intrinsics,
                )
        segmented_out.write(segmented_frame)
        depth_out.write(depth_frame)
        mesh_out.write(mesh_frame)
        reg_out.write(reg_frame)
        constraints_out.write(constraints_frame)
        robot_out.write(robot_frame)
        display_scale = 0.5
        display_size = (int(width * display_scale), int(height * display_scale))
        reg_display_size = (int(640 * display_scale), int(480 * display_scale))
        cv2.imshow("Segmented", cv2.resize(segmented_frame, display_size))
        cv2.imshow("Depth", cv2.resize(depth_frame, display_size))
        cv2.imshow("Mesh", cv2.resize(mesh_frame, display_size))
        cv2.imshow("Registration", cv2.resize(reg_frame, reg_display_size))
        cv2.imshow("Constraints", cv2.resize(constraints_frame, display_size))
        cv2.imshow("Robot Parameters", cv2.resize(robot_frame, display_size))
        if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
            break
    cap.release()
    segmented_out.release()
    depth_out.release()
    mesh_out.release()
    reg_out.release()
    constraints_out.release()
    robot_out.release()
    cv2.destroyAllWindows()
    print("Processing complete. Results saved to:")
    print(f"- Hand mask: {segmented_output_path}")
    print(f"- Depth visualization: {depth_output_path}")
    print(f"- Mesh visualization: {mesh_output_path}")
    print(f"- Registration visualization: {registration_output_path}")
    print(f"- Constraints visualization: {constraints_output_path}")
    print(f"- Robot parameters: {robot_output_path}")
    return {
        "segmented": segmented_output_path,
        "depth": depth_output_path,
        "mesh": mesh_output_path,
        "registration": registration_output_path,
        "constraints": constraints_output_path,
        "robot": robot_output_path,
    }
def main() -> None:
    record_option = input("Record a new video? (y/n): ")
    if record_option.lower() == "y":
        print("Starting video recording...")
        video_path = processor.record_video()
        video_path = record_video()
        if not video_path:
            print("Video recording failed.")
            return
@@ -29,7 +257,7 @@ def main() -> None:
        print("Error: Video file not found.")
        video_path = input("Enter the path to the video file: ")
    processor.process_video(video_path)
    process_video(video_path)
if __name__ == "__main__":

process_hand.py

@@ -1,14 +1,10 @@
import os
import time
import cv2
import torch
import numpy as np
import open3d as o3d
import mediapipe as mp
from tqdm import tqdm
from collections import deque
from utils.visualizations import *
from sam2.sam2_image_predictor import SAM2ImagePredictor
@@ -20,7 +16,9 @@ MAXIMUM_HAND_WIDTH_MM = 100.0
class ProcessHand:
    def __init__(self) -> None:
    def __init__(self, camera_intrinsics: tuple) -> None:
        self.camera_intrinsics = camera_intrinsics
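        # Intrinsics are now injected by the caller (process_video derives them
        # from the frame size) instead of being recomputed inside each method.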
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Initialize MediaPipe Hands for hand detection
@@ -57,7 +55,7 @@ class ProcessHand:
    Create a segmentation mask over the hand using SAM2 model
    """
    def _create_mask(self, frame: np.ndarray, landmarks: list) -> np.ndarray:
    def create_mask(self, frame: np.ndarray, landmarks: list) -> np.ndarray:
        height, width = frame.shape[:2]
        # Convert image to RGB for SAM2 model
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
@@ -91,7 +89,7 @@ class ProcessHand:
    Estimate depth map using MiDaS model
    """
    def _estimate_depth(self, image: np.ndarray) -> tuple:
    def estimate_depth(self, image: np.ndarray) -> tuple:
        img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Transform img for MiDaS model
@@ -121,7 +119,7 @@ class ProcessHand:
    by back-projecting to 3D using camera intrinsics and depth values
    """
    def _create_cloud(
    def create_cloud(
        self, depth_map: np.ndarray, segmented_mask: np.ndarray
    ) -> np.ndarray:
        focal_x, focal_y, center_x, center_y = self.camera_intrinsics
@@ -149,7 +147,7 @@ class ProcessHand:
    Create hand mesh from hand landmarks
    """
    def _create_mesh(self, landmarks: list, image_dims: tuple) -> np.ndarray:
    def create_mesh(self, landmarks: list, image_dims: tuple) -> np.ndarray:
        width, height = image_dims
        # Extract just z values to understand their range
@@ -209,7 +207,7 @@ class ProcessHand:
    Align hand mesh to point cloud using ICP for accurate 3D reconstruction
    """
    def _icp_registration(self, cloud: np.ndarray, vertices: np.ndarray) -> np.ndarray:
    def icp_registration(self, cloud: np.ndarray, vertices: np.ndarray) -> np.ndarray:
        source = o3d.geometry.PointCloud()
        source.points = o3d.utility.Vector3dVector(vertices)
@@ -246,7 +244,7 @@ class ProcessHand:
    Refine landmarks based on the ICP transformation
    """
    def _refine_landmarks(
    def refine_landmarks(
        self, landmarks: list, transform: np.ndarray, image_dims: tuple
    ) -> list:
        width, height = image_dims
@@ -271,7 +269,7 @@ class ProcessHand:
        return refined_landmarks
    # TODO: Implement better constraints that limit last joint of each finger to a single DOF
    def _apply_constraints(self, landmarks: list):
    def apply_constraints(self, landmarks: list):
        constrained = np.array(landmarks)
        # Define finger joint indices
@@ -313,7 +311,7 @@ class ProcessHand:
    3. Gripper Width: Distance between thumb tip and index tip
    """
    def _get_robot_params(self, refined_landmarks: list) -> tuple:
    def get_robot_params(self, refined_landmarks: list) -> tuple:
        landmarks = np.array(refined_landmarks)
        # Define indices for specific parts of the hand
@@ -388,235 +386,3 @@ class ProcessHand:
        # rotation_matrix = rotation_in_robot_frame
        return position, rotation_matrix, gripper_width
    def record_video(self) -> str:
        output_dir = os.path.join(os.getcwd(), "recordings")
        os.makedirs(output_dir, exist_ok=True)
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        video_filename = os.path.join(output_dir, f"recorded_video_{timestamp}.mp4")
        cap = cv2.VideoCapture(0)
        assert cap.isOpened(), "Failed to open camera."
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        # Update camera intrinsics based on video dimensions
        self.camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = None
        recording = False
        did_record = False
        print("Camera is active. Press 'r' to start/stop recording, 'q' to quit.")
        while cap.isOpened():
            success, frame = cap.read()
            assert success, "Failed to read from camera."
            # Mirror the image for more intuitive viewing
            frame = cv2.flip(frame, 1)
            # Create a separate display frame for showing the recording indicator
            display_frame = frame.copy()
            # Display recording indicator ONLY on the display frame
            if recording:
                cv2.circle(display_frame, (30, 30), 15, (0, 0, 255), -1)
                cv2.putText(
                    display_frame,
                    "RECORDING",
                    (50, 40),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1,
                    (0, 0, 255),
                    2,
                    cv2.LINE_AA,
                )
            # Show the display frame (with indicator if recording)
            cv2.imshow("Video Recording", display_frame)
            # Write the original frame (without indicator) to video file if recording
            if recording and out is not None:
                out.write(frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("r"):
                recording = not recording
                if recording:
                    print(f"Started recording to {video_filename}")
                    out = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
                else:
                    if out is not None:
                        out.release()
                        print(f"Stopped recording. Video saved to {video_filename}")
                        did_record = True
                        break
        if out is not None:
            out.release()
        cap.release()
        cv2.destroyAllWindows()
        return video_filename if did_record else None
    def process_video(self, video_path: str) -> tuple:
        assert video_path, "Video path is required."
        cap = cv2.VideoCapture(video_path)
        assert cap.isOpened(), f"Failed to open video file {video_path}"
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Update camera intrinsics based on video dimensions
        self.camera_intrinsics = (width * 0.8, height * 0.8, width / 2, height / 2)
        base_path = os.path.splitext(video_path)[0]
        segmented_output_path = f"{base_path}_masked.mp4"
        depth_output_path = f"{base_path}_depth.mp4"
        mesh_output_path = f"{base_path}_mesh.mp4"
        registration_output_path = f"{base_path}_registration.mp4"
        constraints_output_path = f"{base_path}_constraints.mp4"
        robot_output_path = f"{base_path}_robot.mp4"
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        segmented_out = cv2.VideoWriter(
            segmented_output_path, fourcc, fps, (width, height)
        )
        depth_out = cv2.VideoWriter(depth_output_path, fourcc, fps, (width, height))
        mesh_out = cv2.VideoWriter(mesh_output_path, fourcc, fps, (width, height))
        reg_out = cv2.VideoWriter(
            registration_output_path, fourcc, fps, (640, 480)
        )  # Fixed size
        constraints_out = cv2.VideoWriter(
            constraints_output_path, fourcc, fps, (width, height)
        )
        robot_out = cv2.VideoWriter(robot_output_path, fourcc, fps, (width, height))
        print(f"Processing video with {total_frames} frames...")
        for _ in tqdm(range(total_frames)):
            success, frame = cap.read()
            assert success, "Failed to read frame from video."
            # Convert image to RGB for MediaPipe
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Process with MediaPipe Hands
            results = self.hands.process(image_rgb)
            # Initialize output frames
            segmented_frame = frame.copy()
            depth_frame = np.zeros((height, width, 3), dtype=np.uint8)
            mesh_frame = frame.copy()
            reg_frame = (
                np.ones((480, 640, 3), dtype=np.uint8) * 255
            )  # Fixed size, white background
            constraints_frame = frame.copy()
            robot_frame = frame.copy()
            # Process if hand is detected
            if results.multi_hand_landmarks:
                for hand_landmarks in results.multi_hand_landmarks:
                    segmented_mask = self._create_mask(frame, hand_landmarks)
                    mask_overlay = frame.copy()
                    mask_overlay[segmented_mask > 0] = [0, 0, 255]  # Red color for mask
                    segmented_frame = cv2.addWeighted(frame, 0.7, mask_overlay, 0.3, 0)
                    depth_map, depth_vis = self._estimate_depth(frame)
                    depth_colored = cv2.applyColorMap(depth_vis, cv2.COLORMAP_JET)
                    depth_frame = depth_colored.copy()
                    cloud = self._create_cloud(depth_map, segmented_mask)
                    hand_vertices, hand_faces = self._create_mesh(
                        hand_landmarks, (width, height)
                    )
                    mesh_frame = visualize_mesh(frame, hand_vertices, hand_faces)
                    icp_transform = self._icp_registration(cloud, hand_vertices)
                    reg_frame = visualize_registration(
                        cloud, hand_vertices, icp_transform
                    )
                    refined_landmarks = self._refine_landmarks(
                        hand_landmarks, icp_transform, (width, height)
                    )
                    original_refined = refined_landmarks.copy()
                    constrained_landmarks = self._apply_constraints(refined_landmarks)
                    constraints_frame = visualize_constraints(
                        frame,
                        original_refined,
                        constrained_landmarks,
                        self.camera_intrinsics,
                    )
                    position, orientation, gripper_width = self._get_robot_params(
                        constrained_landmarks
                    )
                    robot_frame = visualize_robot_params(
                        frame,
                        position,
                        orientation,
                        gripper_width,
                        self.camera_intrinsics,
                    )
            segmented_out.write(segmented_frame)
            depth_out.write(depth_frame)
            mesh_out.write(mesh_frame)
            reg_out.write(reg_frame)
            constraints_out.write(constraints_frame)
            robot_out.write(robot_frame)
            display_scale = 0.5
            display_size = (int(width * display_scale), int(height * display_scale))
            reg_display_size = (int(640 * display_scale), int(480 * display_scale))
            cv2.imshow("Segmented", cv2.resize(segmented_frame, display_size))
            cv2.imshow("Depth", cv2.resize(depth_frame, display_size))
            cv2.imshow("Mesh", cv2.resize(mesh_frame, display_size))
            cv2.imshow("Registration", cv2.resize(reg_frame, reg_display_size))
            cv2.imshow("Constraints", cv2.resize(constraints_frame, display_size))
            cv2.imshow("Robot Parameters", cv2.resize(robot_frame, display_size))
            if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
                break
        cap.release()
        segmented_out.release()
        depth_out.release()
        mesh_out.release()
        reg_out.release()
        constraints_out.release()
        robot_out.release()
        cv2.destroyAllWindows()
        print(f"Processing complete. Results saved to:")
        print(f"- Hand mask: {segmented_output_path}")
        print(f"- Depth visualization: {depth_output_path}")
        print(f"- Mesh visualization: {mesh_output_path}")
        print(f"- Registration visualization: {registration_output_path}")
        print(f"- Constraints visualization: {constraints_output_path}")
        print(f"- Robot parameters: {robot_output_path}")
        return {
            "segmented": segmented_output_path,
            "depth": depth_output_path,
            "mesh": mesh_output_path,
            "registration": registration_output_path,
            "constraints": constraints_output_path,
            "robot": robot_output_path,
        }