diff --git a/croppa/capture.py b/croppa/capture.py
new file mode 100644
index 0000000..28eab56
--- /dev/null
+++ b/croppa/capture.py
@@ -0,0 +1,87 @@
+import cv2
+
+
+class Cv2BufferedCap:
+    """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""
+
+    def __init__(self, video_path, backend=None):
+        self.video_path = video_path
+        self.cap = cv2.VideoCapture(str(video_path), backend)
+        if not self.cap.isOpened():
+            raise ValueError(f"Could not open video: {video_path}")
+
+        # Video properties
+        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
+        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
+        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+
+        # Frame cache
+        self.frame_cache = {}
+        self.cache_access_order = []
+        self.MAX_CACHE_FRAMES = 3000
+
+        # Current position tracking
+        self.current_frame = 0
+
+    def _manage_cache(self):
+        """Manage cache size using LRU eviction"""
+        while len(self.frame_cache) > self.MAX_CACHE_FRAMES:
+            oldest_frame = self.cache_access_order.pop(0)
+            if oldest_frame in self.frame_cache:
+                del self.frame_cache[oldest_frame]
+
+    def _add_to_cache(self, frame_number, frame):
+        """Add frame to cache"""
+        self.frame_cache[frame_number] = frame.copy()
+        if frame_number in self.cache_access_order:
+            self.cache_access_order.remove(frame_number)
+        self.cache_access_order.append(frame_number)
+        self._manage_cache()
+
+    def _get_from_cache(self, frame_number):
+        """Get frame from cache and update LRU"""
+        if frame_number in self.frame_cache:
+            if frame_number in self.cache_access_order:
+                self.cache_access_order.remove(frame_number)
+            self.cache_access_order.append(frame_number)
+            return self.frame_cache[frame_number].copy()
+        return None
+
+    def get_frame(self, frame_number):
+        """Get frame at specific index - always accurate"""
+        # Clamp frame number to valid range
+        frame_number = max(0, min(frame_number, self.total_frames - 1))
+
+        # Check cache first
+        cached_frame = self._get_from_cache(frame_number)
+        if cached_frame is not None:
+            self.current_frame = frame_number
+            return cached_frame
+
+        # Not in cache, seek to frame and read
+        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
+        ret, frame = self.cap.read()
+
+        if ret:
+            self._add_to_cache(frame_number, frame)
+            self.current_frame = frame_number
+            return frame
+        else:
+            raise ValueError(f"Failed to read frame {frame_number}")
+
+    def advance_frame(self, frames=1):
+        """Advance by specified number of frames"""
+        new_frame = self.current_frame + frames
+        return self.get_frame(new_frame)
+
+    def release(self):
+        """Release the video capture"""
+        if self.cap:
+            self.cap.release()
+
+    def isOpened(self):
+        """Check if capture is opened"""
+        return self.cap and self.cap.isOpened()
+
+
diff --git a/croppa/main.py b/croppa/main.py
index cc0264e..5017b02 100644
--- a/croppa/main.py
+++ b/croppa/main.py
@@ -606,6 +606,18 @@ class ProjectView:
         # Ensure scroll offset doesn't go negative or beyond available content
         self.scroll_offset = max(0, min(self.scroll_offset, total_rows - visible_rows))
 
+# Use extracted implementations while preserving original definitions above
+from .capture import Cv2BufferedCap as _ExtCv2BufferedCap
+Cv2BufferedCap = _ExtCv2BufferedCap
+
+from .tracking import MotionTracker as _ExtMotionTracker
+MotionTracker = _ExtMotionTracker
+
+from .utils import get_active_window_title as _ext_get_active_window_title
+get_active_window_title = _ext_get_active_window_title
+
+
+
 class VideoEditor:
     # Configuration constants
     BASE_FRAME_DELAY_MS = 16  # ~60 FPS
diff --git a/croppa/project_view.py b/croppa/project_view.py
new file mode 100644
index 0000000..b0d760d
--- /dev/null
+++ b/croppa/project_view.py
@@ -0,0 +1,110 @@
+from pathlib import Path
+from typing import Dict
+
+import cv2
+import numpy as np
+
+
+class ProjectView:
+    """Project view that displays videos in current directory with progress bars"""
+
+    THUMBNAIL_SIZE = (200, 150)
+    THUMBNAIL_MARGIN = 20
+    PROGRESS_BAR_HEIGHT = 8
+    TEXT_HEIGHT = 30
+
+    BG_COLOR = (40, 40, 40)
+    THUMBNAIL_BG_COLOR = (60, 60, 60)
+    PROGRESS_BG_COLOR = (80, 80, 80)
+    PROGRESS_FILL_COLOR = (0, 120, 255)
+    TEXT_COLOR = (255, 255, 255)
+    SELECTED_COLOR = (255, 165, 0)
+
+    def __init__(self, directory: Path, video_editor):
+        self.directory = directory
+        self.video_editor = video_editor
+        self.video_files = []
+        self.thumbnails: Dict[Path, np.ndarray] = {}
+        self.progress_data = {}
+        self.selected_index = 0
+        self.scroll_offset = 0
+        self.items_per_row = 2
+        self.window_width = 1200
+        self.window_height = 800
+
+        self._load_video_files()
+        self._load_progress_data()
+
+    def _calculate_thumbnail_size(self, window_width: int) -> tuple:
+        available_width = window_width - self.THUMBNAIL_MARGIN
+        item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row
+        thumbnail_width = max(50, item_width)
+        thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0])
+        return (thumbnail_width, thumbnail_height)
+
+    def _load_video_files(self):
+        self.video_files = []
+        for file_path in self.directory.iterdir():
+            if (file_path.is_file() and
+                    file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS):
+                self.video_files.append(file_path)
+        self.video_files.sort(key=lambda x: x.name)
+
+    def _load_progress_data(self):
+        self.progress_data = {}
+        for video_path in self.video_files:
+            state_file = video_path.with_suffix('.json')
+            if state_file.exists():
+                try:
+                    with open(state_file, 'r') as f:
+                        import json
+                        state = json.load(f)
+                    current_frame = state.get('current_frame', 0)
+                    cap = cv2.VideoCapture(str(video_path))
+                    if cap.isOpened():
+                        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+                        cap.release()
+                        if total_frames > 0:
+                            progress = current_frame / (total_frames - 1)
+                            self.progress_data[video_path] = {
+                                'current_frame': current_frame,
+                                'total_frames': total_frames,
+                                'progress': progress
+                            }
+                except Exception as e:  # noqa: BLE001 - preserve original behavior
+                    print(f"Error loading progress for {video_path.name}: {e}")
+
+    def refresh_progress_data(self):
+        self._load_progress_data()
+
+    def get_progress_for_video(self, video_path: Path) -> float:
+        if video_path in self.progress_data:
+            return self.progress_data[video_path]['progress']
+        return 0.0
+
+    def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray:
+        if size is None:
+            size = self.THUMBNAIL_SIZE
+        if video_path in self.thumbnails:
+            original_thumbnail = self.thumbnails[video_path]
+            return cv2.resize(original_thumbnail, size)
+        try:
+            cap = cv2.VideoCapture(str(video_path))
+            if cap.isOpened():
+                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+                if total_frames > 0:
+                    middle_frame = total_frames // 2
+                    cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
+                    ret, frame = cap.read()
+                    if ret:
+                        original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE)
+                        self.thumbnails[video_path] = original_thumbnail
+                        cap.release()
+                        return cv2.resize(original_thumbnail, size)
+            cap.release()
+        except Exception as e:  # noqa: BLE001 - preserve original behavior
+            print(f"Error generating thumbnail for {video_path.name}: {e}")
+        placeholder = np.full((size[1], size[0], 3), self.THUMBNAIL_BG_COLOR, dtype=np.uint8)
+        return placeholder
+
+    # draw() and input handling remain in main editor for now to minimize churn
diff --git a/croppa/tracking.py b/croppa/tracking.py
new file mode 100644
index 0000000..ded1394
--- /dev/null
+++ b/croppa/tracking.py
@@ -0,0 +1,149 @@
+from typing import List, Dict, Tuple, Optional
+
+
+class MotionTracker:
+    """Handles motion tracking for crop and pan operations"""
+
+    def __init__(self):
+        self.tracking_points = {}  # {frame_number: [(x, y), ...]}
+        self.tracking_enabled = False
+        self.base_crop_rect = None  # Original crop rect when tracking started
+        self.base_zoom_center = None  # Original zoom center when tracking started
+
+    def add_tracking_point(self, frame_number: int, x: int, y: int):
+        """Add a tracking point at the specified frame and coordinates"""
+        if frame_number not in self.tracking_points:
+            self.tracking_points[frame_number] = []
+        self.tracking_points[frame_number].append((x, y))
+
+    def remove_tracking_point(self, frame_number: int, point_index: int):
+        """Remove a tracking point by frame and index"""
+        if frame_number in self.tracking_points and 0 <= point_index < len(self.tracking_points[frame_number]):
+            del self.tracking_points[frame_number][point_index]
+            if not self.tracking_points[frame_number]:
+                del self.tracking_points[frame_number]
+
+    def clear_tracking_points(self):
+        """Clear all tracking points"""
+        self.tracking_points.clear()
+
+    def get_tracking_points_for_frame(self, frame_number: int) -> List[Tuple[int, int]]:
+        """Get all tracking points for a specific frame"""
+        return self.tracking_points.get(frame_number, [])
+
+    def has_tracking_points(self) -> bool:
+        """Check if any tracking points exist"""
+        return bool(self.tracking_points)
+
+    def get_interpolated_position(self, frame_number: int) -> Optional[Tuple[float, float]]:
+        """Get interpolated position for a frame based on tracking points"""
+        if not self.tracking_points:
+            return None
+
+        # Get all frames with tracking points
+        frames = sorted(self.tracking_points.keys())
+
+        if not frames:
+            return None
+
+        # If we have a point at this exact frame, return it
+        if frame_number in self.tracking_points:
+            points = self.tracking_points[frame_number]
+            if points:
+                # Return average of all points at this frame
+                avg_x = sum(p[0] for p in points) / len(points)
+                avg_y = sum(p[1] for p in points) / len(points)
+                return (avg_x, avg_y)
+
+        # If frame is before first tracking point
+        if frame_number < frames[0]:
+            points = self.tracking_points[frames[0]]
+            if points:
+                avg_x = sum(p[0] for p in points) / len(points)
+                avg_y = sum(p[1] for p in points) / len(points)
+                return (avg_x, avg_y)
+
+        # If frame is after last tracking point
+        if frame_number > frames[-1]:
+            points = self.tracking_points[frames[-1]]
+            if points:
+                avg_x = sum(p[0] for p in points) / len(points)
+                avg_y = sum(p[1] for p in points) / len(points)
+                return (avg_x, avg_y)
+
+        # Find the two frames to interpolate between
+        for i in range(len(frames) - 1):
+            if frames[i] <= frame_number <= frames[i + 1]:
+                frame1, frame2 = frames[i], frames[i + 1]
+                points1 = self.tracking_points[frame1]
+                points2 = self.tracking_points[frame2]
+
+                if not points1 or not points2:
+                    continue
+
+                # Get average positions for each frame
+                avg_x1 = sum(p[0] for p in points1) / len(points1)
+                avg_y1 = sum(p[1] for p in points1) / len(points1)
+                avg_x2 = sum(p[0] for p in points2) / len(points2)
+                avg_y2 = sum(p[1] for p in points2) / len(points2)
+
+                # Linear interpolation
+                t = (frame_number - frame1) / (frame2 - frame1)
+                interp_x = avg_x1 + t * (avg_x2 - avg_x1)
+                interp_y = avg_y1 + t * (avg_y2 - avg_y1)
+
+                return (interp_x, interp_y)
+
+        return None
+
+    def get_tracking_offset(self, frame_number: int) -> Tuple[float, float]:
+        """Get the offset to center the crop on the tracked point"""
+        if not self.tracking_enabled or not self.base_zoom_center:
+            return (0.0, 0.0)
+
+        current_pos = self.get_interpolated_position(frame_number)
+        if not current_pos:
+            return (0.0, 0.0)
+
+        # Calculate offset to center the crop on the tracked point
+        # The offset should move the display so the tracked point stays centered
+        offset_x = current_pos[0] - self.base_zoom_center[0]
+        offset_y = current_pos[1] - self.base_zoom_center[1]
+
+        return (offset_x, offset_y)
+
+    def start_tracking(self, base_crop_rect: Tuple[int, int, int, int], base_zoom_center: Tuple[int, int]):
+        """Start motion tracking with base positions"""
+        self.tracking_enabled = True
+        self.base_crop_rect = base_crop_rect
+        self.base_zoom_center = base_zoom_center
+
+    def stop_tracking(self):
+        """Stop motion tracking"""
+        self.tracking_enabled = False
+        self.base_crop_rect = None
+        self.base_zoom_center = None
+
+    def to_dict(self) -> Dict:
+        """Convert to dictionary for serialization"""
+        return {
+            'tracking_points': self.tracking_points,
+            'tracking_enabled': self.tracking_enabled,
+            'base_crop_rect': self.base_crop_rect,
+            'base_zoom_center': self.base_zoom_center
+        }
+
+    def from_dict(self, data: Dict):
+        """Load from dictionary for deserialization"""
+        # Convert string keys back to integers for tracking_points
+        tracking_points_data = data.get('tracking_points', {})
+        self.tracking_points = {}
+        for frame_str, points in tracking_points_data.items():
+            frame_num = int(frame_str)  # Convert string key to integer
+            self.tracking_points[frame_num] = points
+
+        self.tracking_enabled = data.get('tracking_enabled', False)
+        self.base_crop_rect = data.get('base_crop_rect', None)
+        self.base_zoom_center = data.get('base_zoom_center', None)
+
+
diff --git a/croppa/utils.py b/croppa/utils.py
new file mode 100644
index 0000000..d3580e2
--- /dev/null
+++ b/croppa/utils.py
@@ -0,0 +1,13 @@
+import ctypes
+
+
+def get_active_window_title():
+    """Get the title of the currently active window"""
+    try:
+        hwnd = ctypes.windll.user32.GetForegroundWindow()
+        length = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
+        buffer = ctypes.create_unicode_buffer(length + 1)
+        ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1)
+        return buffer.value
+    except:  # noqa: E722 - preserve original broad exception style
+        return ""