import os import sys import glob import cv2 import numpy as np import argparse import shutil import time import threading from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import List class Cv2BufferedCap: """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly""" def __init__(self, video_path, backend=None): self.video_path = video_path self.cap = cv2.VideoCapture(str(video_path), backend) if not self.cap.isOpened(): raise ValueError(f"Could not open video: {video_path}") # Video properties self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.fps = self.cap.get(cv2.CAP_PROP_FPS) self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Current position tracking self.current_frame = 0 def get_frame(self, frame_number): """Get frame at specific index - always accurate""" # Clamp frame number to valid range frame_number = max(0, min(frame_number, self.total_frames - 1)) # Optimize for sequential reading (next frame) if frame_number == self.current_frame + 1: ret, frame = self.cap.read() else: # Seek for non-sequential access self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = self.cap.read() if ret: self.current_frame = frame_number return frame else: raise ValueError(f"Failed to read frame {frame_number}") def advance_frame(self, frames=1): """Advance by specified number of frames""" new_frame = self.current_frame + frames return self.get_frame(new_frame) def release(self): """Release the video capture""" if self.cap: self.cap.release() def isOpened(self): """Check if capture is opened""" return self.cap and self.cap.isOpened() class MediaGrader: # Configuration constants - matching croppa implementation TARGET_FPS = 80 # Target FPS for speed calculations SPEED_INCREMENT = 0.1 MIN_PLAYBACK_SPEED = 0.05 MAX_PLAYBACK_SPEED = 1.0 # Legacy constants for compatibility KEY_REPEAT_RATE_SEC = 0.5 FAST_SEEK_ACTIVATION_TIME = 2.0 FAST_SEEK_MULTIPLIER = 60 MONITOR_WIDTH = 2560 MONITOR_HEIGHT = 1440 TIMELINE_HEIGHT = 60 TIMELINE_MARGIN = 20 TIMELINE_BAR_HEIGHT = 12 TIMELINE_HANDLE_SIZE = 12 TIMELINE_COLOR_BG = (80, 80, 80) TIMELINE_COLOR_PROGRESS = (0, 120, 255) TIMELINE_COLOR_HANDLE = (255, 255, 255) TIMELINE_COLOR_BORDER = (200, 200, 200) SHIFT_SEEK_MULTIPLIER = 5 CTRL_SEEK_MULTIPLIER = 10 SEGMENT_COUNT = 16 def __init__( self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False ): self.directory = Path(directory) self.seek_frames = seek_frames self.current_index = 0 self.playback_speed = 1.0 self.media_files = [] self.current_cap = None self.is_playing = True self.current_frame = 0 self.total_frames = 0 self.multi_segment_mode = False self.segment_count = self.SEGMENT_COUNT self.segment_caps = [] self.segment_frames = [] self.segment_positions = [] self.segment_end_positions = [] # Track where each segment should loop back to self.timeline_visible = True self.last_seek_time = 0 self.current_seek_key = None self.key_first_press_time = 0 self.is_seeking = False self.fine_seek_frames = 1 self.coarse_seek_frames = self.seek_frames self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER self.current_display_frame = None self.extensions = [ ".png", ".jpg", ".jpeg", ".gif", ".mp4", ".avi", ".mov", ".mkv", ] self.mouse_dragging = False self.timeline_rect = None self.undo_history = [] self.watched_regions = {} self.current_watch_start = None self.last_frame_position = 0 self.last_jump_position = {} self.jump_history = {} self.thread_pool = ThreadPoolExecutor(max_workers=4) def display_with_aspect_ratio(self, frame): """Display frame while maintaining aspect ratio and maximizing screen usage""" if frame is None: return # Get frame dimensions frame_height, frame_width = frame.shape[:2] # Calculate available height (subtract timeline height for videos) timeline_height = self.TIMELINE_HEIGHT if self.is_video(self.media_files[self.current_index]) else 0 available_height = self.MONITOR_HEIGHT - timeline_height # Calculate scale to fit within monitor bounds while maintaining aspect ratio scale_x = self.MONITOR_WIDTH / frame_width scale_y = available_height / frame_height scale = min(scale_x, scale_y) # Calculate display dimensions display_width = int(frame_width * scale) display_height = int(frame_height * scale) # Resize the frame to maintain aspect ratio if scale != 1.0: resized_frame = cv2.resize(frame, (display_width, display_height), interpolation=cv2.INTER_AREA) else: resized_frame = frame # Create canvas with proper dimensions canvas_height = self.MONITOR_HEIGHT canvas_width = self.MONITOR_WIDTH canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8) # Center the resized frame on canvas start_y = (available_height - display_height) // 2 start_x = (self.MONITOR_WIDTH - display_width) // 2 # Ensure frame fits within canvas bounds end_y = min(start_y + display_height, available_height) end_x = min(start_x + display_width, self.MONITOR_WIDTH) actual_height = end_y - start_y actual_width = end_x - start_x if actual_height > 0 and actual_width > 0: canvas[start_y:end_y, start_x:end_x] = resized_frame[:actual_height, :actual_width] # Resize window to full monitor size cv2.resizeWindow("Media Grader", self.MONITOR_WIDTH, self.MONITOR_HEIGHT) # Center the window on screen x_position = 0 y_position = 0 cv2.moveWindow("Media Grader", x_position, y_position) # Display the canvas with properly aspect-ratioed frame cv2.imshow("Media Grader", canvas) def find_media_files(self) -> List[Path]: """Find all media files recursively in the directory""" media_files = [] for ext in self.extensions: pattern = str(self.directory / "**" / f"*{ext}") files = glob.glob(pattern, recursive=True) media_files.extend([Path(f) for f in files]) # Filter out files already in grade directories filtered_files = [] for file in media_files: # Check if file is not in a grade directory (1-5) if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts): print("Adding file: ", file) filtered_files.append(file) return sorted(filtered_files) def is_video(self, file_path: Path) -> bool: """Check if file is a video""" video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"] return file_path.suffix.lower() in video_extensions def calculate_frame_delay(self) -> int: """Calculate frame delay in milliseconds based on playback speed""" # Round to 2 decimals to handle floating point precision issues speed = round(self.playback_speed, 2) if speed >= 1.0: # Speed >= 1: maximum FPS (no delay) return 1 else: # Speed < 1: scale FPS based on speed # Formula: fps = TARGET_FPS * speed, so delay = 1000 / fps target_fps = self.TARGET_FPS * speed delay_ms = int(1000 / target_fps) return max(1, delay_ms) def load_media(self, file_path: Path) -> bool: """Load media file for display""" if self.current_cap: self.current_cap.release() if self.is_video(file_path): try: # Use Cv2BufferedCap for better frame handling self.current_cap = Cv2BufferedCap(file_path) self.total_frames = self.current_cap.total_frames self.current_frame = 0 print(f"Loaded: {file_path.name} | Frames: {self.total_frames} | FPS: {self.current_cap.fps:.2f}") except Exception as e: print(f"Warning: Could not open video file {file_path.name}: {e}") return False else: self.current_cap = None self.total_frames = 1 self.current_frame = 0 # Load initial frame self.load_current_frame() # Start watch tracking session for videos self.start_watch_session() return True def load_current_frame(self): """Load the current frame into display cache""" if self.is_video(self.media_files[self.current_index]): if not self.current_cap: return False try: # Use Cv2BufferedCap to get frame self.current_display_frame = self.current_cap.get_frame(self.current_frame) return True except Exception as e: print(f"Failed to load frame {self.current_frame}: {e}") return False else: frame = cv2.imread(str(self.media_files[self.current_index])) if frame is not None: self.current_display_frame = frame return True return False def start_watch_session(self): """Start tracking a new viewing session""" if self.is_video(self.media_files[self.current_index]): self.current_watch_start = self.current_frame self.last_frame_position = self.current_frame def update_watch_tracking(self): """Update watch tracking based on current frame position""" if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None: return current_file = str(self.media_files[self.current_index]) # If we've moved more than a few frames from last position, record the watched region if abs(self.current_frame - self.last_frame_position) > 5 or \ abs(self.current_frame - self.current_watch_start) > 30: # Record the region we just watched start_frame = min(self.current_watch_start, self.last_frame_position) end_frame = max(self.current_watch_start, self.last_frame_position) if current_file not in self.watched_regions: self.watched_regions[current_file] = [] # Merge with existing regions if they overlap self.add_watched_region(current_file, start_frame, end_frame) # Start new session from current position self.current_watch_start = self.current_frame self.last_frame_position = self.current_frame def add_watched_region(self, file_path, start_frame, end_frame): """Add a watched region, merging with existing overlapping regions""" if file_path not in self.watched_regions: self.watched_regions[file_path] = [] regions = self.watched_regions[file_path] new_region = [start_frame, end_frame] # Merge overlapping regions merged = [] for region in regions: if new_region[1] < region[0] or new_region[0] > region[1]: # No overlap merged.append(region) else: # Overlap, merge new_region[0] = min(new_region[0], region[0]) new_region[1] = max(new_region[1], region[1]) merged.append(tuple(new_region)) self.watched_regions[file_path] = merged def get_sample_points(self): """Get standardized sample points for video navigation""" segments = 8 # Divide video into 8 segments for sampling segment_size = self.total_frames // segments if segment_size == 0: return [] return [ segment_size * 2, # 1/4 through segment_size * 4, # 1/2 through segment_size * 6, # 3/4 through segment_size * 1, # 1/8 through segment_size * 3, # 3/8 through segment_size * 5, # 5/8 through segment_size * 7, # 7/8 through 0 # Beginning ] def jump_to_unwatched_region(self): """Jump to the next unwatched region of the video""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get or initialize jump counter for this file if not hasattr(self, 'jump_counters'): self.jump_counters = {} if current_file not in self.jump_counters: self.jump_counters[current_file] = 0 # Get standardized sample points sample_points = self.get_sample_points() if not sample_points: print("Video too short for sampling") return False current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited! Video fully sampled.") return False target_frame = sample_points[current_jump] target_frame = min(target_frame, self.total_frames - 1) # Track last position for bisection self.last_jump_position[current_file] = self.current_frame # Track jump history for H key undo if current_file not in self.jump_history: self.jump_history[current_file] = [] self.jump_history[current_file].append(self.current_frame) # Jump to the target frame if not self.multi_segment_mode: self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() # Increment jump counter self.jump_counters[current_file] += 1 # Calculate percentage through video percentage = (target_frame / self.total_frames) * 100 print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)") return True def bisect_backwards(self): """Bisect backwards between last position and current position""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) if current_file not in self.last_jump_position: print("No previous position to bisect from. Use L first to establish a reference point.") return False last_pos = self.last_jump_position[current_file] current_pos = self.current_frame if last_pos == current_pos: print("Already at the same position as last jump.") return False # Calculate midpoint if last_pos < current_pos: midpoint = (last_pos + current_pos) // 2 else: midpoint = (current_pos + last_pos) // 2 # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)") return True def bisect_forwards(self): """Bisect forwards between current position and next sample point""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get next sample point if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters: print("No sampling started yet. Use L first to establish sample points.") return False # Use same sampling strategy as L key sample_points = self.get_sample_points() current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited. No forward reference point.") return False next_sample = sample_points[current_jump] next_sample = min(next_sample, self.total_frames - 1) current_pos = self.current_frame # Calculate midpoint between current and next sample midpoint = (current_pos + next_sample) // 2 if midpoint == current_pos: print("Already at or very close to next sample point.") return False # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)") return True def toggle_multi_segment_mode(self): """Toggle between single and multi-segment video mode""" if not self.is_video(self.media_files[self.current_index]): print("Multi-segment mode only works with videos") return False self.multi_segment_mode = not self.multi_segment_mode if self.multi_segment_mode: print(f"Enabling multi-segment mode ({self.segment_count} segments)...") try: self.setup_segment_captures() print("Multi-segment mode enabled successfully") except Exception as e: print(f"Failed to setup multi-segment mode: {e}") import traceback traceback.print_exc() self.multi_segment_mode = False return False else: print("Disabling multi-segment mode...") self.cleanup_segment_captures() # Reload single video self.load_media(self.media_files[self.current_index]) print("Multi-segment mode disabled") return True def toggle_timeline(self): """Toggle timeline visibility""" self.timeline_visible = not self.timeline_visible print(f"Timeline {'visible' if self.timeline_visible else 'hidden'}") return True def setup_segment_captures(self): if not self.is_video(self.media_files[self.current_index]): return # Calculate actual memory usage based on frame dimensions frame_width = int(self.current_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(self.current_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_mb = frame_width * frame_height * 3 / (1024 * 1024) # Memory-based limits (not frame count) if total_mb > 8000: # 8GB limit print(f"Video too large for preloading!") print(f" Resolution: {frame_width}x{frame_height}") print(f" Frames: {self.total_frames} frames would use {total_mb:.1f}GB RAM") print(f"Multi-segment mode not available for videos requiring >8GB RAM") return elif total_mb > 500: # 500MB warning print(f"Large video detected:") print(f" Resolution: {frame_width}x{frame_height}") print(f" Memory: {self.total_frames} frames will use {total_mb:.0f}GB RAM") print("Press any key to continue or 'q' to cancel...") # Note: In a real implementation, you'd want proper input handling here start_time = time.time() print(f"Setting up {self.segment_count} segments with video preloading...") try: print("Cleaning up existing captures...") self.cleanup_segment_captures() current_file = self.media_files[self.current_index] print(f"Working with file: {current_file}") # Initialize arrays print("Initializing arrays...") self.segment_caps = [None] * self.segment_count # Keep for compatibility self.segment_frames = [None] * self.segment_count self.segment_positions = [] self.segment_end_positions = [] self.segment_current_frames = [0] * self.segment_count # Track current frame for each segment # Calculate target positions print("Calculating segment positions...") if self.total_frames <= 1: print("Error: Video has insufficient frames for multi-segment mode") return for i in range(self.segment_count): if self.segment_count <= 1: position_ratio = 0 end_ratio = 1.0 else: position_ratio = i / (self.segment_count - 1) end_ratio = (i + 1) / (self.segment_count - 1) if i < self.segment_count - 1 else 1.0 start_frame = int(position_ratio * (self.total_frames - 1)) end_frame = int(end_ratio * (self.total_frames - 1)) start_frame = max(0, min(start_frame, self.total_frames - 1)) end_frame = max(start_frame + 1, min(end_frame, self.total_frames - 1)) # Ensure at least 1 frame per segment self.segment_positions.append(start_frame) self.segment_end_positions.append(end_frame) self.segment_current_frames[i] = start_frame # Start each segment at its position print(f"Segment positions: {self.segment_positions}") print(f"Segment end positions: {self.segment_end_positions}") # Preload the entire video into memory - simple and fast print("Preloading entire video into memory...") preload_start = time.time() if self.current_cap and self.current_cap.isOpened(): self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Simple, fast sequential read frames = [] frame_count = 0 while frame_count < self.total_frames: ret, frame = self.current_cap.read() if ret and frame is not None: frames.append(frame) frame_count += 1 else: break self.video_frame_cache = frames self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame) else: self.video_frame_cache = [] preload_time = (time.time() - preload_start) * 1000 print(f"Video preloading: {preload_time:.1f}ms ({len(self.video_frame_cache)} frames)") # Initialize segment frames from the preloaded cache print("Initializing segment frames...") for i in range(self.segment_count): if self.segment_current_frames[i] < len(self.video_frame_cache): self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]] except Exception as e: print(f"Error in setup: {e}") import traceback traceback.print_exc() return total_time = time.time() - start_time print(f"Total setup time: {total_time * 1000:.1f}ms") # Report success successful_segments = sum(1 for frame in self.segment_frames if frame is not None) print(f"Successfully preloaded video with {successful_segments}/{self.segment_count} active segments") def cleanup_segment_captures(self): """Clean up all segment video captures and preloaded cache""" for cap in self.segment_caps: if cap: cap.release() self.segment_caps = [] self.segment_frames = [] self.segment_positions = [] self.segment_end_positions = [] if hasattr(self, 'video_frame_cache'): self.video_frame_cache = [] if hasattr(self, 'segment_current_frames'): self.segment_current_frames = [] def update_segment_frames(self): """Update frames for segments - each segment loops within its own range""" if not self.multi_segment_mode or not self.segment_frames or not hasattr(self, 'video_frame_cache'): return for i in range(len(self.segment_frames)): if self.segment_frames[i] is not None and self.video_frame_cache: # Advance to next frame in this segment self.segment_current_frames[i] += 1 # Get the segment boundaries start_frame = self.segment_positions[i] end_frame = self.segment_end_positions[i] # Loop within the segment bounds if self.segment_current_frames[i] > end_frame: # Loop back to start of segment self.segment_current_frames[i] = start_frame # Ensure we don't go beyond the video cache if self.segment_current_frames[i] < len(self.video_frame_cache): # Direct reference - no copy needed for display self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]] def display_current_frame(self): """Display the current cached frame with overlays""" if self.multi_segment_mode: self.display_multi_segment_frame() else: self.display_single_frame() def display_single_frame(self): """Display single frame view""" if self.current_display_frame is None: return frame = self.current_display_frame.copy() # Add info overlay current_file = self.media_files[self.current_index] if self.is_video(self.media_files[self.current_index]): info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, ) cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1 ) # Draw timeline self.draw_timeline(frame) # Display with proper aspect ratio self.display_with_aspect_ratio(frame) def display_multi_segment_frame(self): """Display multi-segment frame view""" if not self.segment_frames or not any(frame is not None for frame in self.segment_frames): return # Calculate grid dimensions (2x2 for 4 segments) grid_rows = int(self.segment_count ** 0.5) grid_cols = int(self.segment_count / grid_rows) # Get reference frame size ref_frame = next((f for f in self.segment_frames if f is not None), None) if ref_frame is None: return frame_height, frame_width = ref_frame.shape[:2] # Calculate segment display size segment_width = frame_width // grid_cols segment_height = frame_height // grid_rows # Create combined display frame combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8) # Place each segment in the grid for i, segment_frame in enumerate(self.segment_frames): if segment_frame is None: continue row = i // grid_cols col = i % grid_cols # Resize segment frame to fit grid cell while maintaining aspect ratio frame_height, frame_width = segment_frame.shape[:2] seg_scale_x = segment_width / frame_width seg_scale_y = segment_height / frame_height seg_scale = min(seg_scale_x, seg_scale_y) new_seg_width = int(frame_width * seg_scale) new_seg_height = int(frame_height * seg_scale) resized_segment = cv2.resize(segment_frame, (new_seg_width, new_seg_height), interpolation=cv2.INTER_AREA) # Center the resized segment in the grid cell y_offset = (segment_height - new_seg_height) // 2 x_offset = (segment_width - new_seg_width) // 2 # Calculate position in combined frame y_start = row * segment_height y_end = y_start + segment_height x_start = col * segment_width x_end = x_start + segment_width # Place segment in combined frame (centered) y_place_start = y_start + y_offset y_place_end = y_place_start + new_seg_height x_place_start = x_start + x_offset x_place_end = x_place_start + new_seg_width # Ensure we don't go out of bounds y_place_end = min(y_place_end, y_end) x_place_end = min(x_place_end, x_end) combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment # Add segment label segment_position = int((self.segment_positions[i] / self.total_frames) * 100) label_text = f"Seg {i+1}: {segment_position}%" cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, ) cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, ) # Draw grid borders cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1) # Add overall info overlay current_file = self.media_files[self.current_index] info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, ) cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1 ) # Draw multi-segment timeline self.draw_multi_segment_timeline(combined_frame) # Display with proper aspect ratio self.display_with_aspect_ratio(combined_frame) def draw_multi_segment_timeline(self, frame): """Draw timeline showing all segment positions""" if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps or not self.timeline_visible: return height, width = frame.shape[:2] # Timeline area - smaller than normal timeline timeline_height = 30 timeline_y = height - timeline_height - 25 # Leave space for info text timeline_margin = 20 timeline_bar_height = 8 # Draw timeline background cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2 bar_x_start = timeline_margin bar_x_end = width - timeline_margin bar_width = bar_x_end - bar_x_start # Draw timeline background bar cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1) # Draw segment markers if self.total_frames > 0: for i, segment_pos in enumerate(self.segment_positions): # Calculate position on timeline progress = segment_pos / max(1, self.total_frames - 1) marker_x = bar_x_start + int(bar_width * progress) # Draw segment marker color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1) # Add segment number cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) def draw_timeline(self, frame): """Draw timeline at the bottom of the frame""" # Only draw timeline for video files in single mode and when visible if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible: return height, width = frame.shape[:2] # Timeline background area timeline_y = height - self.TIMELINE_HEIGHT cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2 bar_x_start = self.TIMELINE_MARGIN bar_x_end = width - self.TIMELINE_MARGIN bar_width = bar_x_end - bar_x_start self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT) # Draw timeline background cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1) # Draw progress for videos if self.total_frames > 0: progress = self.current_frame / max(1, self.total_frames - 1) progress_width = int(bar_width * progress) if progress_width > 0: cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1) # Draw handle handle_x = bar_x_start + progress_width handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2 cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1) cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2) def mouse_callback(self, event, x, y, _, __): """Handle mouse events for timeline interaction""" if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode: return bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect bar_x_end = bar_x_start + bar_width # Check if mouse is over timeline if bar_y <= y <= bar_y + bar_height + 10: # Add some extra height for easier clicking if event == cv2.EVENT_LBUTTONDOWN: if bar_x_start <= x <= bar_x_end: self.mouse_dragging = True self.seek_to_position(x, bar_x_start, bar_width) elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging: if bar_x_start <= x <= bar_x_end: self.seek_to_position(x, bar_x_start, bar_width) elif event == cv2.EVENT_LBUTTONUP: self.mouse_dragging = False def seek_to_position(self, mouse_x, bar_x_start, bar_width): """Seek to position based on mouse click/drag on timeline""" if not self.current_cap or not self.is_video(self.media_files[self.current_index]): return # Calculate position ratio relative_x = mouse_x - bar_x_start position_ratio = max(0, min(1, relative_x / bar_width)) # Calculate target frame target_frame = int(position_ratio * (self.total_frames - 1)) target_frame = max(0, min(target_frame, self.total_frames - 1)) # Seek to target frame self.current_frame = target_frame self.load_current_frame() def advance_frame(self): """Advance to next frame - handles playback speed and marker looping""" if not self.is_playing: return True if self.multi_segment_mode: self.update_segment_frames() return True else: # Always advance by 1 frame - speed is controlled by delay timing new_frame = self.current_frame + 1 # Handle looping bounds if new_frame >= self.total_frames: # Loop to beginning new_frame = 0 # Update current frame and load it self.current_frame = new_frame self.update_watch_tracking() return self.load_current_frame() def seek_video(self, frames_delta: int): """Seek video by specified number of frames""" if not self.is_video(self.media_files[self.current_index]): return if self.multi_segment_mode: return if not self.current_cap: return target_frame = max( 0, min(self.current_frame + frames_delta, self.total_frames - 1) ) self.current_frame = target_frame self.load_current_frame() def process_seek_key(self, key: int) -> bool: """Process seeking keys with proper rate limiting""" current_time = time.time() seek_direction = 0 seek_amount = 0 seek_multiplier = 1 # Default multiplier # Check for A/D keys with modifiers if key == ord("a") or key == ord("A"): seek_direction = -1 # SHIFT+A gives uppercase A if key == ord("A"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == ord("d") or key == ord("D"): seek_direction = 1 # SHIFT+D gives uppercase D if key == ord("D"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == 1: # CTRL+A seek_direction = -1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == 4: # CTRL+D seek_direction = 1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == ord(","): seek_amount = -self.fine_seek_frames elif key == ord("."): seek_amount = self.fine_seek_frames else: if self.current_seek_key is not None: self.current_seek_key = None self.is_seeking = False return False # Handle fine seeking (comma/period) - always immediate if seek_amount != 0: self.seek_video(seek_amount) return True # Handle A/D key seeking with rate limiting and modifiers if seek_direction != 0: if self.current_seek_key != key: self.current_seek_key = key self.key_first_press_time = current_time self.last_seek_time = current_time self.is_seeking = True seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True elif self.is_seeking: time_since_last_seek = current_time - self.last_seek_time time_held = current_time - self.key_first_press_time if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC: self.last_seek_time = current_time if time_held > self.FAST_SEEK_ACTIVATION_TIME: seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier else: seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True return False def grade_media(self, grade: int): """Move current media file to grade directory""" if not self.media_files or grade < 1 or grade > 5: return current_file = self.media_files[self.current_index] grade_dir = self.directory / str(grade) # Create grade directory if it doesn't exist grade_dir.mkdir(exist_ok=True) destination = grade_dir / current_file.name counter = 1 while destination.exists(): stem = current_file.stem suffix = current_file.suffix destination = grade_dir / f"{stem}_{counter}{suffix}" counter += 1 # Track this move for undo functionality BEFORE making changes self.undo_history.append((str(destination), str(current_file), self.current_index)) # Release video capture to unlock the file before moving if self.current_cap: self.current_cap.release() self.current_cap = None # Also release segment captures if in multi-segment mode if self.multi_segment_mode: self.cleanup_segment_captures() try: shutil.move(str(current_file), str(destination)) print(f"Moved {current_file.name} to grade {grade}") self.media_files.pop(self.current_index) if self.current_index >= len(self.media_files): self.current_index = 0 if not self.media_files: print("No more media files to grade!") return False except Exception as e: print(f"Error moving file: {e}") # Remove the undo entry since the move failed self.undo_history.pop() return True def undo_last_action(self): """Undo the last grading action by moving file back and restoring to media list""" if not self.undo_history: print("No actions to undo!") return False # Get the last action moved_file_path, original_file_path, original_index = self.undo_history.pop() # Release video capture to unlock any current file before moving if self.current_cap: self.current_cap.release() self.current_cap = None try: # Move the file back to its original location shutil.move(moved_file_path, original_file_path) # Add the file back to the media list at its original position original_file = Path(original_file_path) # Insert the file back at the appropriate position if original_index <= len(self.media_files): self.media_files.insert(original_index, original_file) else: self.media_files.append(original_file) # Navigate to the restored file print("Navigating to: ", original_index) self.current_index = original_index print(f"Undone: Moved {original_file.name} back from grade folder") return True except Exception as e: print(f"Error undoing action: {e}") # If undo failed, put the action back in history self.undo_history.append((moved_file_path, original_file_path, original_index)) return False def run(self): """Main grading loop""" self.media_files = self.find_media_files() if not self.media_files: print("No media files found in directory!") return print(f"Found {len(self.media_files)} media files") print("Controls:") print(" Space: Pause/Play") print(" A/D: Seek backward/forward (hold for FAST seek)") print(" Shift+A/D: Seek backward/forward (5x multiplier)") print(" Ctrl+A/D: Seek backward/forward (10x multiplier)") print(" , / . : Frame-by-frame seek (fine control)") print(" W/S: Decrease/Increase playback speed") print(" G: Toggle multi-segment mode (videos only)") print(" 1-5: Grade and move file") print(" N: Next file") print(" P: Previous file") print(" U: Undo last grading action") print(" L: Sample video at key points (videos only)") print(" H: Toggle timeline visibility") print(" J: Bisect backwards from current position (videos only, disabled in multi-segment)") print(" K: Bisect forwards toward next sample (videos only, disabled in multi-segment)") print(" Q/ESC: Quit") cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL) cv2.setMouseCallback("Media Grader", self.mouse_callback) # Set initial window size to a reasonable default (will be resized on first frame) cv2.resizeWindow("Media Grader", 1280, 720) while self.media_files and self.current_index < len(self.media_files): current_file = self.media_files[self.current_index] if not self.load_media(current_file): print(f"Could not load {current_file}") self.current_index += 1 continue # Setup multi-segment mode if enabled and this is a video if self.multi_segment_mode and self.is_video(current_file): self.setup_segment_captures() window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})" cv2.setWindowTitle("Media Grader", window_title) while True: # Update display self.display_current_frame() # Calculate appropriate delay based on playback state if self.is_playing and self.is_video(current_file): # Use calculated frame delay for proper playback speed delay_ms = self.calculate_frame_delay() else: # Use minimal delay for immediate responsiveness when not playing delay_ms = 1 # Auto advance frame when playing (videos only) if self.is_playing and self.is_video(current_file): self.advance_frame() # Key capture with appropriate delay key = cv2.waitKey(delay_ms) & 0xFF if key == ord("q") or key == 27: return elif key == ord(" "): self.is_playing = not self.is_playing elif key == ord("s"): # Speed control only for videos if self.is_video(current_file): self.playback_speed = max( self.MIN_PLAYBACK_SPEED, self.playback_speed - self.SPEED_INCREMENT, ) elif key == ord("w"): # Speed control only for videos if self.is_video(current_file): self.playback_speed = min( self.MAX_PLAYBACK_SPEED, self.playback_speed + self.SPEED_INCREMENT, ) elif self.process_seek_key(key): continue elif key == ord("n"): break elif key == ord("p"): self.current_index = max(0, self.current_index - 1) print("Navigating to: ", self.current_index) break elif key == ord("u"): if self.undo_last_action(): # File was restored, reload it break elif key == ord("l"): # Jump to largest unwatched region (works in both modes) self.jump_to_unwatched_region() elif key == ord("j"): if not self.multi_segment_mode: self.bisect_backwards() else: print("Navigation keys (H/J/K/L) disabled in multi-segment mode") elif key == ord("k"): if not self.multi_segment_mode: self.bisect_forwards() else: print("Navigation keys (H/J/K/L) disabled in multi-segment mode") elif key == ord("h"): # Toggle timeline visibility self.toggle_timeline() elif key == ord("g"): self.toggle_multi_segment_mode() elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: grade = int(chr(key)) if not self.grade_media(grade): return break elif key == 255: if self.is_seeking and self.current_seek_key is not None: self.process_seek_key(self.current_seek_key) if key not in [ord("p"), ord("u"), ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: print("Navigating to (pu12345): ", self.current_index) self.current_index += 1 if self.current_cap: self.current_cap.release() self.cleanup_segment_captures() # Cleanup thread pool self.thread_pool.shutdown(wait=True) cv2.destroyAllWindows() print("Grading session complete!") def main(): parser = argparse.ArgumentParser( description="Media Grader - Grade media files by moving them to numbered folders" ) parser.add_argument( "directory", nargs="?", default=".", help="Directory to scan for media files (default: current directory)", ) parser.add_argument( "--seek-frames", type=int, default=30, help="Number of frames to seek when using arrow keys (default: 30)", ) parser.add_argument( "--snap-to-iframe", action="store_true", help="Snap to I-frames when seeking backward for better performance", ) args = parser.parse_args() if not os.path.isdir(args.directory): print(f"Error: {args.directory} is not a valid directory") sys.exit(1) grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe) try: grader.run() except KeyboardInterrupt: print("\nGrading session interrupted") except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()