diff --git a/croppa/main.py b/croppa/main.py index 5017b02..e310edb 100644 --- a/croppa/main.py +++ b/croppa/main.py @@ -11,612 +11,10 @@ import json import threading import queue import subprocess -import ctypes - -class Cv2BufferedCap: - """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly""" - - def __init__(self, video_path, backend=None): - self.video_path = video_path - self.cap = cv2.VideoCapture(str(video_path), backend) - if not self.cap.isOpened(): - raise ValueError(f"Could not open video: {video_path}") - - # Video properties - self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) - self.fps = self.cap.get(cv2.CAP_PROP_FPS) - self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - - # Frame cache - self.frame_cache = {} - self.cache_access_order = [] - self.MAX_CACHE_FRAMES = 3000 - - # Current position tracking - self.current_frame = 0 - - def _manage_cache(self): - """Manage cache size using LRU eviction""" - while len(self.frame_cache) > self.MAX_CACHE_FRAMES: - oldest_frame = self.cache_access_order.pop(0) - if oldest_frame in self.frame_cache: - del self.frame_cache[oldest_frame] - - def _add_to_cache(self, frame_number, frame): - """Add frame to cache""" - self.frame_cache[frame_number] = frame.copy() - if frame_number in self.cache_access_order: - self.cache_access_order.remove(frame_number) - self.cache_access_order.append(frame_number) - self._manage_cache() - - def _get_from_cache(self, frame_number): - """Get frame from cache and update LRU""" - if frame_number in self.frame_cache: - if frame_number in self.cache_access_order: - self.cache_access_order.remove(frame_number) - self.cache_access_order.append(frame_number) - return self.frame_cache[frame_number].copy() - return None - - def get_frame(self, frame_number): - """Get frame at specific index - always accurate""" - # Clamp frame number to valid range - frame_number = max(0, min(frame_number, self.total_frames - 1)) - - # Check cache first - cached_frame = self._get_from_cache(frame_number) - if cached_frame is not None: - self.current_frame = frame_number - return cached_frame - - # Not in cache, seek to frame and read - self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) - ret, frame = self.cap.read() - - if ret: - self._add_to_cache(frame_number, frame) - self.current_frame = frame_number - return frame - else: - raise ValueError(f"Failed to read frame {frame_number}") - - def advance_frame(self, frames=1): - """Advance by specified number of frames""" - new_frame = self.current_frame + frames - return self.get_frame(new_frame) - - def release(self): - """Release the video capture""" - if self.cap: - self.cap.release() - - def isOpened(self): - """Check if capture is opened""" - return self.cap and self.cap.isOpened() - - -class MotionTracker: - """Handles motion tracking for crop and pan operations""" - - def __init__(self): - self.tracking_points = {} # {frame_number: [(x, y), ...]} - self.tracking_enabled = False - self.base_crop_rect = None # Original crop rect when tracking started - self.base_zoom_center = None # Original zoom center when tracking started - - def add_tracking_point(self, frame_number: int, x: int, y: int): - """Add a tracking point at the specified frame and coordinates""" - if frame_number not in self.tracking_points: - self.tracking_points[frame_number] = [] - self.tracking_points[frame_number].append((x, y)) - - def remove_tracking_point(self, frame_number: int, point_index: int): - """Remove a tracking point by frame and index""" - if frame_number in self.tracking_points and 0 <= point_index < len(self.tracking_points[frame_number]): - del self.tracking_points[frame_number][point_index] - if not self.tracking_points[frame_number]: - del self.tracking_points[frame_number] - - def clear_tracking_points(self): - """Clear all tracking points""" - self.tracking_points.clear() - - def get_tracking_points_for_frame(self, frame_number: int) -> List[Tuple[int, int]]: - """Get all tracking points for a specific frame""" - return self.tracking_points.get(frame_number, []) - - def has_tracking_points(self) -> bool: - """Check if any tracking points exist""" - return bool(self.tracking_points) - - def get_interpolated_position(self, frame_number: int) -> Optional[Tuple[float, float]]: - """Get interpolated position for a frame based on tracking points""" - if not self.tracking_points: - return None - - # Get all frames with tracking points - frames = sorted(self.tracking_points.keys()) - - if not frames: - return None - - # If we have a point at this exact frame, return it - if frame_number in self.tracking_points: - points = self.tracking_points[frame_number] - if points: - # Return average of all points at this frame - avg_x = sum(p[0] for p in points) / len(points) - avg_y = sum(p[1] for p in points) / len(points) - return (avg_x, avg_y) - - # If frame is before first tracking point - if frame_number < frames[0]: - points = self.tracking_points[frames[0]] - if points: - avg_x = sum(p[0] for p in points) / len(points) - avg_y = sum(p[1] for p in points) / len(points) - return (avg_x, avg_y) - - # If frame is after last tracking point - if frame_number > frames[-1]: - points = self.tracking_points[frames[-1]] - if points: - avg_x = sum(p[0] for p in points) / len(points) - avg_y = sum(p[1] for p in points) / len(points) - return (avg_x, avg_y) - - # Find the two frames to interpolate between - for i in range(len(frames) - 1): - if frames[i] <= frame_number <= frames[i + 1]: - frame1, frame2 = frames[i], frames[i + 1] - points1 = self.tracking_points[frame1] - points2 = self.tracking_points[frame2] - - if not points1 or not points2: - continue - - # Get average positions for each frame - avg_x1 = sum(p[0] for p in points1) / len(points1) - avg_y1 = sum(p[1] for p in points1) / len(points1) - avg_x2 = sum(p[0] for p in points2) / len(points2) - avg_y2 = sum(p[1] for p in points2) / len(points2) - - # Linear interpolation - t = (frame_number - frame1) / (frame2 - frame1) - interp_x = avg_x1 + t * (avg_x2 - avg_x1) - interp_y = avg_y1 + t * (avg_y2 - avg_y1) - - return (interp_x, interp_y) - - return None - - def get_tracking_offset(self, frame_number: int) -> Tuple[float, float]: - """Get the offset to center the crop on the tracked point""" - if not self.tracking_enabled or not self.base_zoom_center: - return (0.0, 0.0) - - current_pos = self.get_interpolated_position(frame_number) - if not current_pos: - return (0.0, 0.0) - - # Calculate offset to center the crop on the tracked point - # The offset should move the display so the tracked point stays centered - offset_x = current_pos[0] - self.base_zoom_center[0] - offset_y = current_pos[1] - self.base_zoom_center[1] - - return (offset_x, offset_y) - - def start_tracking(self, base_crop_rect: Tuple[int, int, int, int], base_zoom_center: Tuple[int, int]): - """Start motion tracking with base positions""" - self.tracking_enabled = True - self.base_crop_rect = base_crop_rect - self.base_zoom_center = base_zoom_center - - def stop_tracking(self): - """Stop motion tracking""" - self.tracking_enabled = False - self.base_crop_rect = None - self.base_zoom_center = None - - def to_dict(self) -> Dict: - """Convert to dictionary for serialization""" - return { - 'tracking_points': self.tracking_points, - 'tracking_enabled': self.tracking_enabled, - 'base_crop_rect': self.base_crop_rect, - 'base_zoom_center': self.base_zoom_center - } - - def from_dict(self, data: Dict): - """Load from dictionary for deserialization""" - # Convert string keys back to integers for tracking_points - tracking_points_data = data.get('tracking_points', {}) - self.tracking_points = {} - for frame_str, points in tracking_points_data.items(): - frame_num = int(frame_str) # Convert string key to integer - self.tracking_points[frame_num] = points - - self.tracking_enabled = data.get('tracking_enabled', False) - self.base_crop_rect = data.get('base_crop_rect', None) - self.base_zoom_center = data.get('base_zoom_center', None) - - -def get_active_window_title(): - """Get the title of the currently active window""" - try: - # Get handle to foreground window - hwnd = ctypes.windll.user32.GetForegroundWindow() - - # Get window title length - length = ctypes.windll.user32.GetWindowTextLengthW(hwnd) - - # Create buffer and get window title - buffer = ctypes.create_unicode_buffer(length + 1) - ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1) - - return buffer.value - except: - return "" - -class ProjectView: - """Project view that displays videos in current directory with progress bars""" - - # Project view configuration - THUMBNAIL_SIZE = (200, 150) # Width, Height - THUMBNAIL_MARGIN = 20 - PROGRESS_BAR_HEIGHT = 8 - TEXT_HEIGHT = 30 - - # Colors - BG_COLOR = (40, 40, 40) - THUMBNAIL_BG_COLOR = (60, 60, 60) - PROGRESS_BG_COLOR = (80, 80, 80) - PROGRESS_FILL_COLOR = (0, 120, 255) - TEXT_COLOR = (255, 255, 255) - SELECTED_COLOR = (255, 165, 0) - - def __init__(self, directory: Path, video_editor): - self.directory = directory - self.video_editor = video_editor - self.video_files = [] - self.thumbnails = {} - self.progress_data = {} - self.selected_index = 0 - self.scroll_offset = 0 - self.items_per_row = 2 # Default to 2 items per row - self.window_width = 1200 - self.window_height = 800 - - self._load_video_files() - self._load_progress_data() - - def _calculate_thumbnail_size(self, window_width: int) -> tuple: - """Calculate thumbnail size based on items per row and window width""" - available_width = window_width - self.THUMBNAIL_MARGIN - item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row - thumbnail_width = max(50, item_width) # Minimum 50px width - thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0]) # Maintain aspect ratio - return (thumbnail_width, thumbnail_height) - - def _load_video_files(self): - """Load all video files from directory""" - self.video_files = [] - for file_path in self.directory.iterdir(): - if (file_path.is_file() and - file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS): - self.video_files.append(file_path) - self.video_files.sort(key=lambda x: x.name) - - def _load_progress_data(self): - """Load progress data from JSON state files""" - self.progress_data = {} - for video_path in self.video_files: - state_file = video_path.with_suffix('.json') - if state_file.exists(): - try: - with open(state_file, 'r') as f: - state = json.load(f) - current_frame = state.get('current_frame', 0) - - # Get total frames from video - cap = cv2.VideoCapture(str(video_path)) - if cap.isOpened(): - total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - cap.release() - - if total_frames > 0: - progress = current_frame / (total_frames - 1) - self.progress_data[video_path] = { - 'current_frame': current_frame, - 'total_frames': total_frames, - 'progress': progress - } - except Exception as e: - print(f"Error loading progress for {video_path.name}: {e}") - - def refresh_progress_data(self): - """Refresh progress data from JSON files (call when editor state changes)""" - self._load_progress_data() - - def get_progress_for_video(self, video_path: Path) -> float: - """Get progress (0.0 to 1.0) for a video""" - if video_path in self.progress_data: - return self.progress_data[video_path]['progress'] - return 0.0 - - def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray: - """Get thumbnail for a video, generating it if needed""" - if size is None: - size = self.THUMBNAIL_SIZE - - # Cache the original thumbnail by video path only (not size) - if video_path in self.thumbnails: - original_thumbnail = self.thumbnails[video_path] - # Resize the cached thumbnail to the requested size - return cv2.resize(original_thumbnail, size) - - # Generate original thumbnail on demand (only once per video) - try: - cap = cv2.VideoCapture(str(video_path)) - if cap.isOpened(): - total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - if total_frames > 0: - middle_frame = total_frames // 2 - cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame) - ret, frame = cap.read() - if ret: - # Store original thumbnail at original size - original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE) - self.thumbnails[video_path] = original_thumbnail - cap.release() - # Return resized version - return cv2.resize(original_thumbnail, size) - cap.release() - except Exception as e: - print(f"Error generating thumbnail for {video_path.name}: {e}") - - # Return a placeholder if thumbnail generation failed - placeholder = np.full((size[1], size[0], 3), - self.THUMBNAIL_BG_COLOR, dtype=np.uint8) - return placeholder - - def draw(self) -> np.ndarray: - """Draw the project view""" - # Get actual window size dynamically - try: - # Try to get the actual window size from OpenCV - window_rect = cv2.getWindowImageRect("Project View") - if window_rect[2] > 0 and window_rect[3] > 0: # width and height > 0 - actual_width = window_rect[2] - actual_height = window_rect[3] - else: - # Fallback to default size - actual_width = self.window_width - actual_height = self.window_height - except: - # Fallback to default size - actual_width = self.window_width - actual_height = self.window_height - - canvas = np.full((actual_height, actual_width, 3), self.BG_COLOR, dtype=np.uint8) - - if not self.video_files: - # No videos message - text = "No videos found in directory" - font = cv2.FONT_HERSHEY_SIMPLEX - text_size = cv2.getTextSize(text, font, 1.0, 2)[0] - text_x = (actual_width - text_size[0]) // 2 - text_y = (actual_height - text_size[1]) // 2 - cv2.putText(canvas, text, (text_x, text_y), font, 1.0, self.TEXT_COLOR, 2) - return canvas - - # Calculate layout - use fixed items_per_row and calculate thumbnail size to fit - items_per_row = min(self.items_per_row, len(self.video_files)) # Don't exceed number of videos - - # Calculate thumbnail size to fit the desired number of items per row - thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(actual_width) - - # Calculate item height dynamically based on thumbnail size - item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN - - item_width = (actual_width - (items_per_row + 1) * self.THUMBNAIL_MARGIN) // items_per_row - - # Draw videos in grid - for i, video_path in enumerate(self.video_files): - row = i // items_per_row - col = i % items_per_row - - # Skip if scrolled out of view - if row < self.scroll_offset: - continue - if row > self.scroll_offset + (actual_height // item_height): - break - - # Calculate position - x = self.THUMBNAIL_MARGIN + col * (item_width + self.THUMBNAIL_MARGIN) - y = self.THUMBNAIL_MARGIN + (row - self.scroll_offset) * item_height - - # Draw thumbnail background - cv2.rectangle(canvas, - (x, y), - (x + thumbnail_width, y + thumbnail_height), - self.THUMBNAIL_BG_COLOR, -1) - - # Draw selection highlight - if i == self.selected_index: - cv2.rectangle(canvas, - (x - 2, y - 2), - (x + thumbnail_width + 2, y + thumbnail_height + 2), - self.SELECTED_COLOR, 3) - - # Draw thumbnail - thumbnail = self.get_thumbnail_for_video(video_path, (thumbnail_width, thumbnail_height)) - # Thumbnail is already the correct size, no need to resize - resized_thumbnail = thumbnail - - # Ensure thumbnail doesn't exceed canvas bounds - end_y = min(y + thumbnail_height, actual_height) - end_x = min(x + thumbnail_width, actual_width) - thumb_height = end_y - y - thumb_width = end_x - x - - if thumb_height > 0 and thumb_width > 0: - # Resize thumbnail to fit within bounds if necessary - if thumb_height != thumbnail_height or thumb_width != thumbnail_width: - resized_thumbnail = cv2.resize(thumbnail, (thumb_width, thumb_height)) - - canvas[y:end_y, x:end_x] = resized_thumbnail - - # Draw progress bar - progress_y = y + thumbnail_height + 5 - progress_width = thumbnail_width - progress = self.get_progress_for_video(video_path) - - # Progress background - cv2.rectangle(canvas, - (x, progress_y), - (x + progress_width, progress_y + self.PROGRESS_BAR_HEIGHT), - self.PROGRESS_BG_COLOR, -1) - - # Progress fill - if progress > 0: - fill_width = int(progress_width * progress) - cv2.rectangle(canvas, - (x, progress_y), - (x + fill_width, progress_y + self.PROGRESS_BAR_HEIGHT), - self.PROGRESS_FILL_COLOR, -1) - - # Draw filename - filename = video_path.name - # Truncate if too long - if len(filename) > 25: - filename = filename[:22] + "..." - - text_y = progress_y + self.PROGRESS_BAR_HEIGHT + 20 - cv2.putText(canvas, filename, (x, text_y), - cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.TEXT_COLOR, 2) - - # Draw progress percentage - if video_path in self.progress_data: - progress_text = f"{progress * 100:.0f}%" - text_size = cv2.getTextSize(progress_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0] - progress_text_x = x + progress_width - text_size[0] - cv2.putText(canvas, progress_text, (progress_text_x, text_y), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_COLOR, 1) - - # Draw instructions - instructions = [ - "Project View - Videos in current directory", - "WASD: Navigate | E: Open video | Q: Fewer items per row | Y: More items per row | q: Quit | ESC: Back to editor", - f"Showing {len(self.video_files)} videos | {items_per_row} per row | Thumbnail: {thumbnail_width}x{thumbnail_height}" - ] - - for i, instruction in enumerate(instructions): - y_pos = actual_height - 60 + i * 20 - cv2.putText(canvas, instruction, (10, y_pos), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.TEXT_COLOR, 1) - - return canvas - - def handle_key(self, key: int) -> str: - """Handle keyboard input, returns action taken""" - if key == 27: # ESC - return "back_to_editor" - elif key == ord('q'): # lowercase q - Quit - return "quit" - elif key == ord('e') or key == ord('E'): # E - Open video - if self.video_files and 0 <= self.selected_index < len(self.video_files): - return f"open_video:{self.video_files[self.selected_index]}" - elif key == ord('w') or key == ord('W'): # W - Up - current_items_per_row = min(self.items_per_row, len(self.video_files)) - if self.selected_index >= current_items_per_row: - self.selected_index -= current_items_per_row - else: - self.selected_index = 0 - self._update_scroll() - elif key == ord('s') or key == ord('S'): # S - Down - current_items_per_row = min(self.items_per_row, len(self.video_files)) - if self.selected_index + current_items_per_row < len(self.video_files): - self.selected_index += current_items_per_row - else: - self.selected_index = len(self.video_files) - 1 - self._update_scroll() - elif key == ord('a') or key == ord('A'): # A - Left - if self.selected_index > 0: - self.selected_index -= 1 - self._update_scroll() - elif key == ord('d') or key == ord('D'): # D - Right - if self.selected_index < len(self.video_files) - 1: - self.selected_index += 1 - self._update_scroll() - elif key == ord('Q'): # uppercase Q - Fewer items per row (larger thumbnails) - if self.items_per_row > 1: - self.items_per_row -= 1 - print(f"Items per row: {self.items_per_row}") - elif key == ord('y') or key == ord('Y'): # Y - More items per row (smaller thumbnails) - self.items_per_row += 1 - print(f"Items per row: {self.items_per_row}") - - return "none" - - def _update_scroll(self): - """Update scroll offset based on selected item""" - if not self.video_files: - return - - # Use fixed items per row - items_per_row = min(self.items_per_row, len(self.video_files)) - - # Get window dimensions for calculations - try: - window_rect = cv2.getWindowImageRect("Project View") - if window_rect[2] > 0 and window_rect[3] > 0: - window_width = window_rect[2] - window_height = window_rect[3] - else: - window_width = self.window_width - window_height = self.window_height - except: - window_width = self.window_width - window_height = self.window_height - - # Calculate thumbnail size and item height dynamically - thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(window_width) - item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN - - selected_row = self.selected_index // items_per_row - visible_rows = max(1, window_height // item_height) - - # Calculate how many rows we can actually show - total_rows = (len(self.video_files) + items_per_row - 1) // items_per_row - - # If we can show all rows, no scrolling needed - if total_rows <= visible_rows: - self.scroll_offset = 0 - return - - # Update scroll to keep selected item visible - if selected_row < self.scroll_offset: - self.scroll_offset = selected_row - elif selected_row >= self.scroll_offset + visible_rows: - self.scroll_offset = selected_row - visible_rows + 1 - - # Ensure scroll offset doesn't go negative or beyond available content - self.scroll_offset = max(0, min(self.scroll_offset, total_rows - visible_rows)) - -# Use extracted implementations while preserving original definitions above -from .capture import Cv2BufferedCap as _ExtCv2BufferedCap -Cv2BufferedCap = _ExtCv2BufferedCap - -from .tracking import MotionTracker as _ExtMotionTracker -MotionTracker = _ExtMotionTracker - -from .utils import get_active_window_title as _ext_get_active_window_title -get_active_window_title = _ext_get_active_window_title - - +from .capture import Cv2BufferedCap +from .tracking import MotionTracker +from .utils import get_active_window_title +from .project_view import ProjectView class VideoEditor: # Configuration constants