import os import sys import glob import cv2 import numpy as np import argparse import shutil import time import threading from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import List class MediaGrader: # Configuration constants BASE_FRAME_DELAY_MS = 16 # ~30 FPS KEY_REPEAT_RATE_SEC = 0.5 # How often to process key repeats FAST_SEEK_ACTIVATION_TIME = 2.0 # How long to hold before fast seek FRAME_RENDER_TIME_MS = 50 # Time to let frames render between seeks SPEED_INCREMENT = 0.2 MIN_PLAYBACK_SPEED = 0.1 MAX_PLAYBACK_SPEED = 100.0 FAST_SEEK_MULTIPLIER = 60 IMAGE_DISPLAY_DELAY_MS = 100 # Monitor dimensions for full-screen sizing MONITOR_WIDTH = 2560 MONITOR_HEIGHT = 1440 # Timeline configuration TIMELINE_HEIGHT = 60 TIMELINE_MARGIN = 20 TIMELINE_BAR_HEIGHT = 12 TIMELINE_HANDLE_SIZE = 12 TIMELINE_COLOR_BG = (80, 80, 80) TIMELINE_COLOR_PROGRESS = (0, 120, 255) TIMELINE_COLOR_HANDLE = (255, 255, 255) TIMELINE_COLOR_BORDER = (200, 200, 200) # Seek modifiers for A/D keys SHIFT_SEEK_MULTIPLIER = 5 # SHIFT + A/D multiplier CTRL_SEEK_MULTIPLIER = 10 # CTRL + A/D multiplier # Multi-segment mode configuration SEGMENT_COUNT = 16 # Number of video segments (2x2 grid) SEGMENT_OVERLAP_PERCENT = 10 # Percentage overlap between segments def __init__( self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False ): self.directory = Path(directory) self.seek_frames = seek_frames self.current_index = 0 self.playback_speed = 1.0 self.media_files = [] self.current_cap = None self.is_playing = True self.current_frame = 0 self.total_frames = 0 # Multi-segment mode state self.multi_segment_mode = False self.segment_count = self.SEGMENT_COUNT # Use the class constant self.segment_overlap_percent = self.SEGMENT_OVERLAP_PERCENT # Use the class constant self.segment_caps = [] # List of VideoCapture objects for each segment self.segment_frames = [] # List of current frames for each segment self.segment_positions = [] # List of frame positions for each segment # Timeline visibility state self.timeline_visible = True # Improved frame cache for performance self.frame_cache = {} # Dict[frame_number: frame_data] self.cache_size_limit = 200 # Increased cache size self.cache_lock = threading.Lock() # Thread safety for cache # Key repeat tracking with rate limiting self.last_seek_time = 0 self.current_seek_key = None self.key_first_press_time = 0 self.is_seeking = False # Seeking modes self.fine_seek_frames = 1 # Frame-by-frame self.coarse_seek_frames = self.seek_frames # User-configurable self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER # Current frame cache for display self.current_display_frame = None # Supported media extensions self.extensions = [ ".png", ".jpg", ".jpeg", ".gif", ".mp4", ".avi", ".mov", ".mkv", ] # Mouse interaction for timeline self.mouse_dragging = False self.timeline_rect = None self.window_width = 800 self.window_height = 600 # Undo functionality self.undo_history = [] # List of (source_path, destination_path, original_index) tuples # Watch tracking for "good look" feature self.watched_regions = {} # Dict[file_path: List[Tuple[start_frame, end_frame]]] self.current_watch_start = None # Frame where current viewing session started self.last_frame_position = 0 # Track last known frame position # Bisection navigation tracking self.last_jump_position = {} # Dict[file_path: last_frame] for bisection reference # Jump history for H key (undo jump) self.jump_history = {} # Dict[file_path: List[frame_positions]] for jump undo # Performance optimization: Thread pool for parallel operations self.thread_pool = ThreadPoolExecutor(max_workers=4) def display_with_aspect_ratio(self, frame): """Display frame while maintaining aspect ratio and maximizing screen usage""" if frame is None: return # Get frame dimensions frame_height, frame_width = frame.shape[:2] # Calculate aspect ratio frame_aspect_ratio = frame_width / frame_height monitor_aspect_ratio = self.MONITOR_WIDTH / self.MONITOR_HEIGHT # Determine if frame is vertical or horizontal relative to monitor if frame_aspect_ratio < monitor_aspect_ratio: # Frame is more vertical than monitor - maximize height display_height = self.MONITOR_HEIGHT display_width = int(display_height * frame_aspect_ratio) else: # Frame is more horizontal than monitor - maximize width display_width = self.MONITOR_WIDTH display_height = int(display_width / frame_aspect_ratio) # Resize window to calculated dimensions cv2.resizeWindow("Media Grader", display_width, display_height) # Center the window on screen x_position = (self.MONITOR_WIDTH - display_width) // 2 y_position = (self.MONITOR_HEIGHT - display_height) // 2 cv2.moveWindow("Media Grader", x_position, y_position) # Display the frame cv2.imshow("Media Grader", frame) def find_media_files(self) -> List[Path]: """Find all media files recursively in the directory""" media_files = [] for ext in self.extensions: pattern = str(self.directory / "**" / f"*{ext}") files = glob.glob(pattern, recursive=True) media_files.extend([Path(f) for f in files]) # Filter out files already in grade directories filtered_files = [] for file in media_files: # Check if file is not in a grade directory (1-5) if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts): print("Adding file: ", file) filtered_files.append(file) return sorted(filtered_files) def is_video(self, file_path: Path) -> bool: """Check if file is a video""" video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"] return file_path.suffix.lower() in video_extensions def calculate_frame_delay(self) -> int: """Calculate frame delay in milliseconds based on playback speed""" delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed) return max(1, delay_ms) def calculate_frames_to_skip(self) -> int: """Calculate how many frames to skip for high-speed playback""" if self.playback_speed <= 1.0: return 0 elif self.playback_speed <= 2.0: return 0 elif self.playback_speed <= 5.0: return int(self.playback_speed - 1) else: return int(self.playback_speed * 2) def load_media(self, file_path: Path) -> bool: """Load media file for display""" if self.current_cap: self.current_cap.release() if self.is_video(file_path): # Suppress OpenCV error messages for unsupported codecs self.current_cap = cv2.VideoCapture(str(file_path)) if not self.current_cap.isOpened(): print(f"Warning: Could not open video file {file_path.name} (unsupported codec)") return False self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.current_frame = 0 else: self.current_cap = None self.total_frames = 1 self.current_frame = 0 # Load initial frame self.load_current_frame() # Start watch tracking session for videos self.start_watch_session() return True def load_current_frame(self): """Load the current frame into display cache""" if self.is_video(self.media_files[self.current_index]): if not self.current_cap: return False ret, frame = self.current_cap.read() if ret: self.current_display_frame = frame self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) return True return False else: frame = cv2.imread(str(self.media_files[self.current_index])) if frame is not None: self.current_display_frame = frame return True return False def start_watch_session(self): """Start tracking a new viewing session""" if self.is_video(self.media_files[self.current_index]): self.current_watch_start = self.current_frame self.last_frame_position = self.current_frame def update_watch_tracking(self): """Update watch tracking based on current frame position""" if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None: return current_file = str(self.media_files[self.current_index]) # If we've moved more than a few frames from last position, record the watched region if abs(self.current_frame - self.last_frame_position) > 5 or \ abs(self.current_frame - self.current_watch_start) > 30: # Record the region we just watched start_frame = min(self.current_watch_start, self.last_frame_position) end_frame = max(self.current_watch_start, self.last_frame_position) if current_file not in self.watched_regions: self.watched_regions[current_file] = [] # Merge with existing regions if they overlap self.add_watched_region(current_file, start_frame, end_frame) # Start new session from current position self.current_watch_start = self.current_frame self.last_frame_position = self.current_frame def add_watched_region(self, file_path, start_frame, end_frame): """Add a watched region, merging with existing overlapping regions""" if file_path not in self.watched_regions: self.watched_regions[file_path] = [] regions = self.watched_regions[file_path] new_region = [start_frame, end_frame] # Merge overlapping regions merged = [] for region in regions: if new_region[1] < region[0] or new_region[0] > region[1]: # No overlap merged.append(region) else: # Overlap, merge new_region[0] = min(new_region[0], region[0]) new_region[1] = max(new_region[1], region[1]) merged.append(tuple(new_region)) self.watched_regions[file_path] = merged def get_sample_points(self): """Get standardized sample points for video navigation""" segments = 8 # Divide video into 8 segments for sampling segment_size = self.total_frames // segments if segment_size == 0: return [] return [ segment_size * 2, # 1/4 through segment_size * 4, # 1/2 through segment_size * 6, # 3/4 through segment_size * 1, # 1/8 through segment_size * 3, # 3/8 through segment_size * 5, # 5/8 through segment_size * 7, # 7/8 through 0 # Beginning ] def jump_to_unwatched_region(self): """Jump to the next unwatched region of the video""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get or initialize jump counter for this file if not hasattr(self, 'jump_counters'): self.jump_counters = {} if current_file not in self.jump_counters: self.jump_counters[current_file] = 0 # Get standardized sample points sample_points = self.get_sample_points() if not sample_points: print("Video too short for sampling") return False current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited! Video fully sampled.") return False target_frame = sample_points[current_jump] target_frame = min(target_frame, self.total_frames - 1) # Track last position for bisection self.last_jump_position[current_file] = self.current_frame # Track jump history for H key undo if current_file not in self.jump_history: self.jump_history[current_file] = [] self.jump_history[current_file].append(self.current_frame) # Jump to the target frame if self.multi_segment_mode: # In multi-segment mode, reposition all segments relative to the jump target self.reposition_segments_around_frame(target_frame) else: # In single mode, just jump the main capture self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() # Increment jump counter self.jump_counters[current_file] += 1 # Calculate percentage through video percentage = (target_frame / self.total_frames) * 100 print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)") return True def bisect_backwards(self): """Bisect backwards between last position and current position""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) if current_file not in self.last_jump_position: print("No previous position to bisect from. Use L first to establish a reference point.") return False last_pos = self.last_jump_position[current_file] current_pos = self.current_frame if last_pos == current_pos: print("Already at the same position as last jump.") return False # Calculate midpoint if last_pos < current_pos: midpoint = (last_pos + current_pos) // 2 else: midpoint = (current_pos + last_pos) // 2 # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)") return True def bisect_forwards(self): """Bisect forwards between current position and next sample point""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get next sample point if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters: print("No sampling started yet. Use L first to establish sample points.") return False # Use same sampling strategy as L key sample_points = self.get_sample_points() current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited. No forward reference point.") return False next_sample = sample_points[current_jump] next_sample = min(next_sample, self.total_frames - 1) current_pos = self.current_frame # Calculate midpoint between current and next sample midpoint = (current_pos + next_sample) // 2 if midpoint == current_pos: print("Already at or very close to next sample point.") return False # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)") return True def toggle_multi_segment_mode(self): """Toggle between single and multi-segment video mode""" if not self.is_video(self.media_files[self.current_index]): print("Multi-segment mode only works with videos") return False self.multi_segment_mode = not self.multi_segment_mode if self.multi_segment_mode: print(f"Enabled multi-segment mode ({self.segment_count} segments)") self.setup_segment_captures() else: print("Disabled multi-segment mode") self.cleanup_segment_captures() # Reload single video self.load_media(self.media_files[self.current_index]) return True def toggle_timeline(self): """Toggle timeline visibility""" self.timeline_visible = not self.timeline_visible print(f"Timeline {'visible' if self.timeline_visible else 'hidden'}") return True def setup_segment_captures(self): if not self.is_video(self.media_files[self.current_index]): return start_time = time.time() print(f"Setting up {self.segment_count} segments...") # Clean up existing segment captures self.cleanup_segment_captures() current_file = self.media_files[self.current_index] # Initialize arrays self.segment_caps = [None] * self.segment_count self.segment_frames = [None] * self.segment_count self.segment_positions = [] # Calculate target positions for i in range(self.segment_count): position_ratio = i / max(1, self.segment_count - 1) start_frame = int(position_ratio * (self.total_frames - 1)) self.segment_positions.append(start_frame) load_start = time.time() shared_cap_start = time.time() shared_cap = cv2.VideoCapture(str(current_file)) shared_cap_create_time = (time.time() - shared_cap_start) * 1000 print(f"Capture creation: {shared_cap_create_time:.1f}ms") if shared_cap.isOpened(): frames_start = time.time() # Strategy: Read a much smaller subset and interpolate/approximate # Only read 4-6 key frames and generate the rest through approximation key_frames_to_read = min(6, self.segment_count) frames_read = 0 for i in range(key_frames_to_read): target_frame = self.segment_positions[i * (self.segment_count // key_frames_to_read)] seek_start = time.time() shared_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) seek_time = (time.time() - seek_start) * 1000 read_start = time.time() ret, frame = shared_cap.read() read_time = (time.time() - read_start) * 1000 if ret: # Use this frame for multiple segments (approximation) segments_per_key = self.segment_count // key_frames_to_read start_seg = i * segments_per_key end_seg = min(start_seg + segments_per_key, self.segment_count) for seg_idx in range(start_seg, end_seg): self.segment_frames[seg_idx] = frame.copy() frames_read += 1 print(f"Key frame {i}: Frame {target_frame} -> Segments {start_seg}-{end_seg-1} ({seek_time:.1f}ms + {read_time:.1f}ms)") else: print(f"Failed to read key frame {i} at position {target_frame}") # Fill any remaining segments with the last valid frame last_valid_frame = None for frame in self.segment_frames: if frame is not None: last_valid_frame = frame break if last_valid_frame is not None: for i in range(len(self.segment_frames)): if self.segment_frames[i] is None: self.segment_frames[i] = last_valid_frame.copy() frames_time = (time.time() - frames_start) * 1000 print(f"Smart frame reading: {frames_time:.1f}ms ({frames_read} key frames for {self.segment_count} segments)") shared_cap.release() else: print("Failed to create shared capture!") total_time = time.time() - start_time print(f"Total setup time: {total_time * 1000:.1f}ms") # Report success successful_segments = sum(1 for frame in self.segment_frames if frame is not None) print(f"Successfully approximated {successful_segments}/{self.segment_count} segments") def cleanup_segment_captures(self): """Clean up all segment video captures""" for cap in self.segment_caps: if cap: cap.release() self.segment_caps = [] self.segment_frames = [] self.segment_positions = [] # Clear frame cache self.frame_cache.clear() def get_cached_frame(self, frame_number: int): """Get frame from cache or load it if not cached""" # Check cache first (thread-safe) with self.cache_lock: if frame_number in self.frame_cache: return self.frame_cache[frame_number].copy() # Return a copy to avoid modification # Load frame outside of lock to avoid blocking other threads frame = None if self.current_cap: # Create a temporary capture to avoid interfering with main playback current_file = self.media_files[self.current_index] temp_cap = cv2.VideoCapture(str(current_file)) if temp_cap.isOpened(): temp_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = temp_cap.read() temp_cap.release() if ret and frame is not None: # Cache the frame (with size limit) - thread-safe with self.cache_lock: if len(self.frame_cache) >= self.cache_size_limit: # Remove oldest cached frames (remove multiple at once for efficiency) keys_to_remove = sorted(self.frame_cache.keys())[:len(self.frame_cache) // 4] for key in keys_to_remove: del self.frame_cache[key] self.frame_cache[frame_number] = frame.copy() return frame return None def get_segment_capture(self, segment_index): """Get or create a capture for a specific segment (lazy loading)""" if segment_index >= len(self.segment_caps) or self.segment_caps[segment_index] is None: if segment_index < len(self.segment_caps): # Create capture on demand current_file = self.media_files[self.current_index] cap = cv2.VideoCapture(str(current_file)) if cap.isOpened(): cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[segment_index]) self.segment_caps[segment_index] = cap return cap else: return None return None return self.segment_caps[segment_index] def update_segment_frame_parallel(self, segment_index): """Update a single segment frame""" try: cap = self.get_segment_capture(segment_index) if cap and cap.isOpened(): ret, frame = cap.read() if ret: return segment_index, frame else: # Loop back to segment start when reaching end cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[segment_index]) ret, frame = cap.read() if ret: return segment_index, frame else: return segment_index, None return segment_index, None except Exception as e: print(f"Error updating segment {segment_index}: {e}") return segment_index, None def update_segment_frames(self): """Update frames for all segments during playback with parallel processing""" if not self.multi_segment_mode or not self.segment_frames: return # Only update segments that have valid frames loaded active_segments = [i for i, frame in enumerate(self.segment_frames) if frame is not None] if not active_segments: return # Use thread pool for parallel frame updates (but limit to avoid overwhelming) if len(active_segments) <= 4: # For small numbers, use parallel processing futures = [] for i in active_segments: future = self.thread_pool.submit(self.update_segment_frame_parallel, i) futures.append(future) # Collect results for future in futures: segment_index, frame = future.result() if frame is not None: self.segment_frames[segment_index] = frame else: # For larger numbers, process in smaller batches to avoid resource exhaustion batch_size = 4 for batch_start in range(0, len(active_segments), batch_size): batch = active_segments[batch_start:batch_start + batch_size] futures = [] for i in batch: future = self.thread_pool.submit(self.update_segment_frame_parallel, i) futures.append(future) # Collect batch results for future in futures: segment_index, frame = future.result() if frame is not None: self.segment_frames[segment_index] = frame def reposition_segments_around_frame(self, center_frame: int): """Reposition all segments around a center frame while maintaining spacing""" if not self.multi_segment_mode or not self.segment_caps: return # Calculate new segment positions around the center frame # Keep the same relative spacing but center around the new frame segment_spacing = self.total_frames // (self.segment_count + 1) new_positions = [] for i in range(self.segment_count): # Spread segments around center_frame offset = (i - (self.segment_count - 1) / 2) * segment_spacing new_frame = int(center_frame + offset) new_frame = max(0, min(new_frame, self.total_frames - 1)) new_positions.append(new_frame) # Update segment positions and seek all captures self.segment_positions = new_positions for i, cap in enumerate(self.segment_caps): if cap and cap.isOpened(): cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i]) # Load new frame ret, frame = cap.read() if ret: self.segment_frames[i] = frame # Reset position for next read cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i]) def seek_segment_parallel(self, segment_index, frames_delta): """Seek a single segment by the specified number of frames""" try: if segment_index >= len(self.segment_positions): return segment_index, None cap = self.get_segment_capture(segment_index) if cap and cap.isOpened(): current_frame = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) segment_start = self.segment_positions[segment_index] segment_duration = self.total_frames // self.segment_count segment_end = min(self.total_frames - 1, segment_start + segment_duration) target_frame = max(segment_start, min(current_frame + frames_delta, segment_end)) # Try cache first for better performance cached_frame = self.get_cached_frame(target_frame) if cached_frame is not None: cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) return segment_index, cached_frame else: # Fall back to normal seeking cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) ret, frame = cap.read() if ret: return segment_index, frame else: return segment_index, None return segment_index, None except Exception as e: print(f"Error seeking segment {segment_index}: {e}") return segment_index, None def seek_all_segments(self, frames_delta: int): """Seek all segments by the specified number of frames with parallel processing""" if not self.multi_segment_mode or not self.segment_frames: return # Only seek segments that have valid frames loaded active_segments = [i for i, frame in enumerate(self.segment_frames) if frame is not None] if not active_segments: return # Use parallel processing for seeking futures = [] for i in active_segments: future = self.thread_pool.submit(self.seek_segment_parallel, i, frames_delta) futures.append(future) # Collect results for future in futures: segment_index, frame = future.result() if frame is not None: self.segment_frames[segment_index] = frame def display_current_frame(self): """Display the current cached frame with overlays""" if self.multi_segment_mode: self.display_multi_segment_frame() else: self.display_single_frame() def display_single_frame(self): """Display single frame view""" if self.current_display_frame is None: return frame = self.current_display_frame.copy() # Add info overlay current_file = self.media_files[self.current_index] if self.is_video(self.media_files[self.current_index]): info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, ) cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1 ) # Draw timeline self.draw_timeline(frame) # Maintain aspect ratio when displaying self.display_with_aspect_ratio(frame) def display_multi_segment_frame(self): """Display multi-segment frame view""" if not self.segment_frames or not any(frame is not None for frame in self.segment_frames): return # Calculate grid dimensions (2x2 for 4 segments) grid_rows = int(self.segment_count ** 0.5) grid_cols = int(self.segment_count / grid_rows) # Get reference frame size ref_frame = next((f for f in self.segment_frames if f is not None), None) if ref_frame is None: return frame_height, frame_width = ref_frame.shape[:2] # Calculate segment display size segment_width = frame_width // grid_cols segment_height = frame_height // grid_rows # Create combined display frame combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8) # Place each segment in the grid for i, segment_frame in enumerate(self.segment_frames): if segment_frame is None: continue row = i // grid_cols col = i % grid_cols # Resize segment frame to fit grid cell while maintaining aspect ratio frame_height, frame_width = segment_frame.shape[:2] seg_scale_x = segment_width / frame_width seg_scale_y = segment_height / frame_height seg_scale = min(seg_scale_x, seg_scale_y) new_seg_width = int(frame_width * seg_scale) new_seg_height = int(frame_height * seg_scale) resized_segment = cv2.resize(segment_frame, (new_seg_width, new_seg_height), interpolation=cv2.INTER_AREA) # Center the resized segment in the grid cell y_offset = (segment_height - new_seg_height) // 2 x_offset = (segment_width - new_seg_width) // 2 # Calculate position in combined frame y_start = row * segment_height y_end = y_start + segment_height x_start = col * segment_width x_end = x_start + segment_width # Place segment in combined frame (centered) y_place_start = y_start + y_offset y_place_end = y_place_start + new_seg_height x_place_start = x_start + x_offset x_place_end = x_place_start + new_seg_width # Ensure we don't go out of bounds y_place_end = min(y_place_end, y_end) x_place_end = min(x_place_end, x_end) combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment # Add segment label segment_position = int((self.segment_positions[i] / self.total_frames) * 100) label_text = f"Seg {i+1}: {segment_position}%" cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, ) cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, ) # Draw grid borders cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1) # Add overall info overlay current_file = self.media_files[self.current_index] info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, ) cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1 ) # Draw multi-segment timeline self.draw_multi_segment_timeline(combined_frame) # Maintain aspect ratio when displaying self.display_with_aspect_ratio(combined_frame) def draw_multi_segment_timeline(self, frame): """Draw timeline showing all segment positions""" if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps or not self.timeline_visible: return height, width = frame.shape[:2] # Timeline area - smaller than normal timeline timeline_height = 30 timeline_y = height - timeline_height - 25 # Leave space for info text timeline_margin = 20 timeline_bar_height = 8 # Draw timeline background cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2 bar_x_start = timeline_margin bar_x_end = width - timeline_margin bar_width = bar_x_end - bar_x_start # Draw timeline background bar cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1) # Draw segment markers if self.total_frames > 0: for i, segment_pos in enumerate(self.segment_positions): # Calculate position on timeline progress = segment_pos / max(1, self.total_frames - 1) marker_x = bar_x_start + int(bar_width * progress) # Draw segment marker color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1) # Add segment number cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) def draw_timeline(self, frame): """Draw timeline at the bottom of the frame""" # Only draw timeline for video files in single mode and when visible if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible: return height, width = frame.shape[:2] self.window_height = height self.window_width = width # Timeline background area timeline_y = height - self.TIMELINE_HEIGHT cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2 bar_x_start = self.TIMELINE_MARGIN bar_x_end = width - self.TIMELINE_MARGIN bar_width = bar_x_end - bar_x_start self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT) # Draw timeline background cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1) # Draw progress for videos if self.total_frames > 0: progress = self.current_frame / max(1, self.total_frames - 1) progress_width = int(bar_width * progress) if progress_width > 0: cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1) # Draw handle handle_x = bar_x_start + progress_width handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2 cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1) cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2) def mouse_callback(self, event, x, y, flags, param): """Handle mouse events for timeline interaction""" if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode: return bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect bar_x_end = bar_x_start + bar_width # Check if mouse is over timeline if bar_y <= y <= bar_y + bar_height + 10: # Add some extra height for easier clicking if event == cv2.EVENT_LBUTTONDOWN: if bar_x_start <= x <= bar_x_end: self.mouse_dragging = True self.seek_to_position(x, bar_x_start, bar_width) elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging: if bar_x_start <= x <= bar_x_end: self.seek_to_position(x, bar_x_start, bar_width) elif event == cv2.EVENT_LBUTTONUP: self.mouse_dragging = False def seek_to_position(self, mouse_x, bar_x_start, bar_width): """Seek to position based on mouse click/drag on timeline""" if not self.current_cap or not self.is_video(self.media_files[self.current_index]): return # Calculate position ratio relative_x = mouse_x - bar_x_start position_ratio = max(0, min(1, relative_x / bar_width)) # Calculate target frame target_frame = int(position_ratio * (self.total_frames - 1)) target_frame = max(0, min(target_frame, self.total_frames - 1)) # Seek to target frame self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() def advance_frame(self): """Advance to next frame(s) based on playback speed""" if ( not self.is_video(self.media_files[self.current_index]) or not self.is_playing ): return if self.multi_segment_mode: # Update all segment frames self.update_segment_frames() return True else: frames_to_skip = self.calculate_frames_to_skip() for _ in range(frames_to_skip + 1): ret, frame = self.current_cap.read() if not ret: return False self.current_display_frame = frame self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) # Update watch tracking self.update_watch_tracking() return True def seek_video(self, frames_delta: int): """Seek video by specified number of frames""" if not self.is_video(self.media_files[self.current_index]): return if self.multi_segment_mode: self.seek_all_segments(frames_delta) else: if not self.current_cap: return target_frame = max( 0, min(self.current_frame + frames_delta, self.total_frames - 1) ) self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() def process_seek_key(self, key: int) -> bool: """Process seeking keys with proper rate limiting""" current_time = time.time() seek_direction = 0 seek_amount = 0 seek_multiplier = 1 # Default multiplier # Check for A/D keys with modifiers if key == ord("a") or key == ord("A"): seek_direction = -1 # SHIFT+A gives uppercase A if key == ord("A"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == ord("d") or key == ord("D"): seek_direction = 1 # SHIFT+D gives uppercase D if key == ord("D"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == 1: # CTRL+A seek_direction = -1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == 4: # CTRL+D seek_direction = 1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == ord(","): seek_amount = -self.fine_seek_frames elif key == ord("."): seek_amount = self.fine_seek_frames else: if self.current_seek_key is not None: self.current_seek_key = None self.is_seeking = False return False # Handle fine seeking (comma/period) - always immediate if seek_amount != 0: self.seek_video(seek_amount) return True # Handle A/D key seeking with rate limiting and modifiers if seek_direction != 0: if self.current_seek_key != key: self.current_seek_key = key self.key_first_press_time = current_time self.last_seek_time = current_time self.is_seeking = True seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True elif self.is_seeking: time_since_last_seek = current_time - self.last_seek_time time_held = current_time - self.key_first_press_time if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC: self.last_seek_time = current_time if time_held > self.FAST_SEEK_ACTIVATION_TIME: seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier else: seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True return False def grade_media(self, grade: int): """Move current media file to grade directory""" if not self.media_files or grade < 1 or grade > 5: return current_file = self.media_files[self.current_index] grade_dir = self.directory / str(grade) # Create grade directory if it doesn't exist grade_dir.mkdir(exist_ok=True) destination = grade_dir / current_file.name counter = 1 while destination.exists(): stem = current_file.stem suffix = current_file.suffix destination = grade_dir / f"{stem}_{counter}{suffix}" counter += 1 # Track this move for undo functionality BEFORE making changes self.undo_history.append((str(destination), str(current_file), self.current_index)) # Release video capture to unlock the file before moving if self.current_cap: self.current_cap.release() self.current_cap = None # Also release segment captures if in multi-segment mode if self.multi_segment_mode: self.cleanup_segment_captures() try: shutil.move(str(current_file), str(destination)) print(f"Moved {current_file.name} to grade {grade}") self.media_files.pop(self.current_index) if self.current_index >= len(self.media_files): self.current_index = 0 if not self.media_files: print("No more media files to grade!") return False except Exception as e: print(f"Error moving file: {e}") # Remove the undo entry since the move failed self.undo_history.pop() return True def undo_last_action(self): """Undo the last grading action by moving file back and restoring to media list""" if not self.undo_history: print("No actions to undo!") return False # Get the last action moved_file_path, original_file_path, original_index = self.undo_history.pop() # Release video capture to unlock any current file before moving if self.current_cap: self.current_cap.release() self.current_cap = None try: # Move the file back to its original location shutil.move(moved_file_path, original_file_path) # Add the file back to the media list at its original position original_file = Path(original_file_path) # Insert the file back at the appropriate position if original_index <= len(self.media_files): self.media_files.insert(original_index, original_file) else: self.media_files.append(original_file) # Navigate to the restored file print("Navigating to: ", original_index) self.current_index = original_index print(f"Undone: Moved {original_file.name} back from grade folder") return True except Exception as e: print(f"Error undoing action: {e}") # If undo failed, put the action back in history self.undo_history.append((moved_file_path, original_file_path, original_index)) return False def run(self): """Main grading loop""" self.media_files = self.find_media_files() if not self.media_files: print("No media files found in directory!") return print(f"Found {len(self.media_files)} media files") print("Controls:") print(" Space: Pause/Play") print(" A/D: Seek backward/forward (hold for FAST seek)") print(" Shift+A/D: Seek backward/forward (5x multiplier)") print(" Ctrl+A/D: Seek backward/forward (10x multiplier)") print(" , / . : Frame-by-frame seek (fine control)") print(" W/S: Decrease/Increase playback speed") print(" G: Toggle multi-segment mode (videos only)") print(" 1-5: Grade and move file") print(" N: Next file") print(" P: Previous file") print(" U: Undo last grading action") print(" L: Sample video at key points (videos only)") print(" H: Toggle timeline visibility") print(" J: Bisect backwards from current position (videos only, disabled in multi-segment)") print(" K: Bisect forwards toward next sample (videos only, disabled in multi-segment)") print(" Q/ESC: Quit") cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL) cv2.setMouseCallback("Media Grader", self.mouse_callback) # Set initial window size to a reasonable default (will be resized on first frame) cv2.resizeWindow("Media Grader", 1280, 720) while self.media_files and self.current_index < len(self.media_files): current_file = self.media_files[self.current_index] if not self.load_media(current_file): print(f"Could not load {current_file}") self.current_index += 1 continue # Setup multi-segment mode if enabled and this is a video if self.multi_segment_mode and self.is_video(current_file): self.setup_segment_captures() window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})" cv2.setWindowTitle("Media Grader", window_title) while True: self.display_current_frame() if self.is_video(current_file): if self.is_seeking: delay = self.FRAME_RENDER_TIME_MS else: delay = self.calculate_frame_delay() else: delay = self.IMAGE_DISPLAY_DELAY_MS key = cv2.waitKey(delay) & 0xFF if key == ord("q") or key == 27: return elif key == ord(" "): self.is_playing = not self.is_playing elif key == ord("s"): self.playback_speed = max( self.MIN_PLAYBACK_SPEED, self.playback_speed - self.SPEED_INCREMENT, ) elif key == ord("w"): self.playback_speed = min( self.MAX_PLAYBACK_SPEED, self.playback_speed + self.SPEED_INCREMENT, ) elif self.process_seek_key(key): pass elif key == ord("n"): break elif key == ord("p"): self.current_index = max(0, self.current_index - 1) print("Navigating to: ", self.current_index) break elif key == ord("u"): if self.undo_last_action(): # File was restored, reload it break elif key == ord("l"): # Jump to largest unwatched region (works in both modes) self.jump_to_unwatched_region() elif key == ord("j"): if not self.multi_segment_mode: self.bisect_backwards() else: print("Navigation keys (H/J/K/L) disabled in multi-segment mode") elif key == ord("k"): if not self.multi_segment_mode: self.bisect_forwards() else: print("Navigation keys (H/J/K/L) disabled in multi-segment mode") elif key == ord("h"): # Toggle timeline visibility self.toggle_timeline() elif key == ord("g"): self.toggle_multi_segment_mode() elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: grade = int(chr(key)) if not self.grade_media(grade): return break elif key == 255: if self.is_seeking and self.current_seek_key is not None: self.process_seek_key(self.current_seek_key) if ( self.is_playing and self.is_video(current_file) and not self.is_seeking ): if not self.advance_frame(): # Video reached the end, restart it instead of navigating self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0) self.current_frame = 0 self.load_current_frame() if key not in [ord("p"), ord("u"), ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: print("Navigating to (pu12345): ", self.current_index) self.current_index += 1 if self.current_cap: self.current_cap.release() self.cleanup_segment_captures() # Cleanup thread pool self.thread_pool.shutdown(wait=True) cv2.destroyAllWindows() print("Grading session complete!") def main(): parser = argparse.ArgumentParser( description="Media Grader - Grade media files by moving them to numbered folders" ) parser.add_argument( "directory", nargs="?", default=".", help="Directory to scan for media files (default: current directory)", ) parser.add_argument( "--seek-frames", type=int, default=30, help="Number of frames to seek when using arrow keys (default: 30)", ) parser.add_argument( "--snap-to-iframe", action="store_true", help="Snap to I-frames when seeking backward for better performance", ) args = parser.parse_args() if not os.path.isdir(args.directory): print(f"Error: {args.directory} is not a valid directory") sys.exit(1) grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe) try: grader.run() except KeyboardInterrupt: print("\nGrading session interrupted") except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()