import os import sys import glob import cv2 import numpy as np import argparse import shutil import time import threading from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import List class MediaGrader: BASE_FRAME_DELAY_MS = 16 KEY_REPEAT_RATE_SEC = 0.5 FAST_SEEK_ACTIVATION_TIME = 2.0 FRAME_RENDER_TIME_MS = 50 SPEED_INCREMENT = 0.2 MIN_PLAYBACK_SPEED = 0.1 MAX_PLAYBACK_SPEED = 100.0 FAST_SEEK_MULTIPLIER = 60 IMAGE_DISPLAY_DELAY_MS = 100 MONITOR_WIDTH = 2560 MONITOR_HEIGHT = 1440 TIMELINE_HEIGHT = 60 TIMELINE_MARGIN = 20 TIMELINE_BAR_HEIGHT = 12 TIMELINE_HANDLE_SIZE = 12 TIMELINE_COLOR_BG = (80, 80, 80) TIMELINE_COLOR_PROGRESS = (0, 120, 255) TIMELINE_COLOR_HANDLE = (255, 255, 255) TIMELINE_COLOR_BORDER = (200, 200, 200) SHIFT_SEEK_MULTIPLIER = 5 CTRL_SEEK_MULTIPLIER = 10 SEGMENT_COUNT = 16 SEGMENT_OVERLAP_PERCENT = 10 def __init__( self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False ): self.directory = Path(directory) self.seek_frames = seek_frames self.current_index = 0 self.playback_speed = 1.0 self.media_files = [] self.current_cap = None self.is_playing = True self.current_frame = 0 self.total_frames = 0 self.multi_segment_mode = False self.segment_count = self.SEGMENT_COUNT self.segment_overlap_percent = self.SEGMENT_OVERLAP_PERCENT self.segment_caps = [] self.segment_frames = [] self.segment_positions = [] self.timeline_visible = True self.last_seek_time = 0 self.current_seek_key = None self.key_first_press_time = 0 self.is_seeking = False self.fine_seek_frames = 1 self.coarse_seek_frames = self.seek_frames self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER self.current_display_frame = None self.extensions = [ ".png", ".jpg", ".jpeg", ".gif", ".mp4", ".avi", ".mov", ".mkv", ] self.mouse_dragging = False self.timeline_rect = None self.window_width = 800 self.window_height = 600 self.undo_history = [] self.watched_regions = {} self.current_watch_start = None self.last_frame_position = 
0 self.last_jump_position = {} self.jump_history = {} self.thread_pool = ThreadPoolExecutor(max_workers=4) def display_with_aspect_ratio(self, frame): """Display frame while maintaining aspect ratio and maximizing screen usage""" if frame is None: return # Get frame dimensions frame_height, frame_width = frame.shape[:2] # Calculate aspect ratio frame_aspect_ratio = frame_width / frame_height monitor_aspect_ratio = self.MONITOR_WIDTH / self.MONITOR_HEIGHT # Determine if frame is vertical or horizontal relative to monitor if frame_aspect_ratio < monitor_aspect_ratio: # Frame is more vertical than monitor - maximize height display_height = self.MONITOR_HEIGHT display_width = int(display_height * frame_aspect_ratio) else: # Frame is more horizontal than monitor - maximize width display_width = self.MONITOR_WIDTH display_height = int(display_width / frame_aspect_ratio) # Resize window to calculated dimensions cv2.resizeWindow("Media Grader", display_width, display_height) # Center the window on screen x_position = (self.MONITOR_WIDTH - display_width) // 2 y_position = (self.MONITOR_HEIGHT - display_height) // 2 cv2.moveWindow("Media Grader", x_position, y_position) # Display the frame cv2.imshow("Media Grader", frame) def find_media_files(self) -> List[Path]: """Find all media files recursively in the directory""" media_files = [] for ext in self.extensions: pattern = str(self.directory / "**" / f"*{ext}") files = glob.glob(pattern, recursive=True) media_files.extend([Path(f) for f in files]) # Filter out files already in grade directories filtered_files = [] for file in media_files: # Check if file is not in a grade directory (1-5) if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts): print("Adding file: ", file) filtered_files.append(file) return sorted(filtered_files) def is_video(self, file_path: Path) -> bool: """Check if file is a video""" video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"] return file_path.suffix.lower() in 
video_extensions def calculate_frame_delay(self) -> int: """Calculate frame delay in milliseconds based on playback speed""" delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed) return max(1, delay_ms) def calculate_frames_to_skip(self) -> int: """Calculate how many frames to skip for high-speed playback""" if self.playback_speed <= 1.0: return 0 elif self.playback_speed <= 2.0: return 0 elif self.playback_speed <= 5.0: return int(self.playback_speed - 1) else: return int(self.playback_speed * 2) def load_media(self, file_path: Path) -> bool: """Load media file for display""" if self.current_cap: self.current_cap.release() if self.is_video(file_path): # Try different backends for better performance # For video files: FFmpeg is usually best, DirectShow is for cameras backends_to_try = [] if hasattr(cv2, 'CAP_FFMPEG'): # FFmpeg - best for video files backends_to_try.append(cv2.CAP_FFMPEG) if hasattr(cv2, 'CAP_DSHOW'): # DirectShow - usually for cameras, but try as fallback backends_to_try.append(cv2.CAP_DSHOW) backends_to_try.append(cv2.CAP_ANY) # Final fallback self.current_cap = None for backend in backends_to_try: try: self.current_cap = cv2.VideoCapture(str(file_path), backend) if self.current_cap.isOpened(): # Optimize buffer settings for better performance self.current_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize buffer to reduce latency # Try to set hardware acceleration if available if hasattr(cv2, 'CAP_PROP_HW_ACCELERATION'): self.current_cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY) break self.current_cap.release() except: continue if not self.current_cap or not self.current_cap.isOpened(): print(f"Warning: Could not open video file {file_path.name} (unsupported codec)") return False self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.current_frame = 0 # Get codec information for debugging fourcc = int(self.current_cap.get(cv2.CAP_PROP_FOURCC)) codec = "".join([chr((fourcc >> 8 * i) & 0xFF) for i in 
range(4)]) backend = self.current_cap.getBackendName() print(f"Loaded: {file_path.name} | Codec: {codec} | Backend: {backend} | Frames: {self.total_frames}") else: self.current_cap = None self.total_frames = 1 self.current_frame = 0 # Load initial frame self.load_current_frame() # Start watch tracking session for videos self.start_watch_session() return True def load_current_frame(self): """Load the current frame into display cache""" if self.is_video(self.media_files[self.current_index]): if not self.current_cap: return False ret, frame = self.current_cap.read() if ret: self.current_display_frame = frame self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) return True return False else: frame = cv2.imread(str(self.media_files[self.current_index])) if frame is not None: self.current_display_frame = frame return True return False def start_watch_session(self): """Start tracking a new viewing session""" if self.is_video(self.media_files[self.current_index]): self.current_watch_start = self.current_frame self.last_frame_position = self.current_frame def update_watch_tracking(self): """Update watch tracking based on current frame position""" if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None: return current_file = str(self.media_files[self.current_index]) # If we've moved more than a few frames from last position, record the watched region if abs(self.current_frame - self.last_frame_position) > 5 or \ abs(self.current_frame - self.current_watch_start) > 30: # Record the region we just watched start_frame = min(self.current_watch_start, self.last_frame_position) end_frame = max(self.current_watch_start, self.last_frame_position) if current_file not in self.watched_regions: self.watched_regions[current_file] = [] # Merge with existing regions if they overlap self.add_watched_region(current_file, start_frame, end_frame) # Start new session from current position self.current_watch_start = self.current_frame 
self.last_frame_position = self.current_frame def add_watched_region(self, file_path, start_frame, end_frame): """Add a watched region, merging with existing overlapping regions""" if file_path not in self.watched_regions: self.watched_regions[file_path] = [] regions = self.watched_regions[file_path] new_region = [start_frame, end_frame] # Merge overlapping regions merged = [] for region in regions: if new_region[1] < region[0] or new_region[0] > region[1]: # No overlap merged.append(region) else: # Overlap, merge new_region[0] = min(new_region[0], region[0]) new_region[1] = max(new_region[1], region[1]) merged.append(tuple(new_region)) self.watched_regions[file_path] = merged def get_sample_points(self): """Get standardized sample points for video navigation""" segments = 8 # Divide video into 8 segments for sampling segment_size = self.total_frames // segments if segment_size == 0: return [] return [ segment_size * 2, # 1/4 through segment_size * 4, # 1/2 through segment_size * 6, # 3/4 through segment_size * 1, # 1/8 through segment_size * 3, # 3/8 through segment_size * 5, # 5/8 through segment_size * 7, # 7/8 through 0 # Beginning ] def jump_to_unwatched_region(self): """Jump to the next unwatched region of the video""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get or initialize jump counter for this file if not hasattr(self, 'jump_counters'): self.jump_counters = {} if current_file not in self.jump_counters: self.jump_counters[current_file] = 0 # Get standardized sample points sample_points = self.get_sample_points() if not sample_points: print("Video too short for sampling") return False current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited! 
Video fully sampled.") return False target_frame = sample_points[current_jump] target_frame = min(target_frame, self.total_frames - 1) # Track last position for bisection self.last_jump_position[current_file] = self.current_frame # Track jump history for H key undo if current_file not in self.jump_history: self.jump_history[current_file] = [] self.jump_history[current_file].append(self.current_frame) # Jump to the target frame if not self.multi_segment_mode: self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() # Increment jump counter self.jump_counters[current_file] += 1 # Calculate percentage through video percentage = (target_frame / self.total_frames) * 100 print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)") return True def bisect_backwards(self): """Bisect backwards between last position and current position""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) if current_file not in self.last_jump_position: print("No previous position to bisect from. 
Use L first to establish a reference point.") return False last_pos = self.last_jump_position[current_file] current_pos = self.current_frame if last_pos == current_pos: print("Already at the same position as last jump.") return False # Calculate midpoint if last_pos < current_pos: midpoint = (last_pos + current_pos) // 2 else: midpoint = (current_pos + last_pos) // 2 # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)") return True def bisect_forwards(self): """Bisect forwards between current position and next sample point""" if not self.is_video(self.media_files[self.current_index]): return False current_file = str(self.media_files[self.current_index]) # Get next sample point if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters: print("No sampling started yet. Use L first to establish sample points.") return False # Use same sampling strategy as L key sample_points = self.get_sample_points() current_jump = self.jump_counters[current_file] if current_jump >= len(sample_points): print("All sample points visited. 
No forward reference point.") return False next_sample = sample_points[current_jump] next_sample = min(next_sample, self.total_frames - 1) current_pos = self.current_frame # Calculate midpoint between current and next sample midpoint = (current_pos + next_sample) // 2 if midpoint == current_pos: print("Already at or very close to next sample point.") return False # Update last position for further bisection self.last_jump_position[current_file] = current_pos # Jump to midpoint self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint) self.load_current_frame() percentage = (midpoint / self.total_frames) * 100 print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)") return True def toggle_multi_segment_mode(self): """Toggle between single and multi-segment video mode""" if not self.is_video(self.media_files[self.current_index]): print("Multi-segment mode only works with videos") return False self.multi_segment_mode = not self.multi_segment_mode if self.multi_segment_mode: print(f"Enabling multi-segment mode ({self.segment_count} segments)...") try: self.setup_segment_captures() print("Multi-segment mode enabled successfully") except Exception as e: print(f"Failed to setup multi-segment mode: {e}") import traceback traceback.print_exc() self.multi_segment_mode = False return False else: print("Disabling multi-segment mode...") self.cleanup_segment_captures() # Reload single video self.load_media(self.media_files[self.current_index]) print("Multi-segment mode disabled") return True def toggle_timeline(self): """Toggle timeline visibility""" self.timeline_visible = not self.timeline_visible print(f"Timeline {'visible' if self.timeline_visible else 'hidden'}") return True def setup_segment_captures(self): if not self.is_video(self.media_files[self.current_index]): return start_time = time.time() print(f"Setting up {self.segment_count} segments with video preloading...") try: # Clean up existing segment captures print("Cleaning up existing 
captures...") self.cleanup_segment_captures() current_file = self.media_files[self.current_index] print(f"Working with file: {current_file}") # Initialize arrays print("Initializing arrays...") self.segment_caps = [None] * self.segment_count # Keep for compatibility self.segment_frames = [None] * self.segment_count self.segment_positions = [] self.segment_current_frames = [0] * self.segment_count # Track current frame for each segment # Calculate target positions print("Calculating segment positions...") if self.total_frames <= 1: print("Error: Video has insufficient frames for multi-segment mode") return # Use conservative frame range safe_frame_count = max(1, int(self.total_frames * 0.6)) print(f"Using safe frame count: {safe_frame_count} (60% of reported {self.total_frames})") for i in range(self.segment_count): if self.segment_count <= 1: position_ratio = 0 else: position_ratio = i / (self.segment_count - 1) start_frame = int(position_ratio * (safe_frame_count - 1)) start_frame = max(0, min(start_frame, safe_frame_count - 1)) self.segment_positions.append(start_frame) self.segment_current_frames[i] = start_frame # Start each segment at its position print(f"Segment positions: {self.segment_positions}") # Preload the entire video into memory print("Preloading entire video into memory...") preload_start = time.time() self.video_frame_cache = [] if self.current_cap and self.current_cap.isOpened(): self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Pre-allocate list for better performance self.video_frame_cache = [None] * safe_frame_count frame_count = 0 while frame_count < safe_frame_count: ret, frame = self.current_cap.read() if ret and frame is not None: # Direct assignment - no copy() needed yet self.video_frame_cache[frame_count] = frame frame_count += 1 else: # Truncate list to actual size self.video_frame_cache = self.video_frame_cache[:frame_count] break self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame) preload_time = (time.time() - 
preload_start) * 1000 print(f"Video preloading: {preload_time:.1f}ms ({len(self.video_frame_cache)} frames)") # Initialize segment frames from the preloaded cache print("Initializing segment frames...") for i in range(self.segment_count): if self.segment_current_frames[i] < len(self.video_frame_cache): self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]].copy() except Exception as e: print(f"Error in setup: {e}") import traceback traceback.print_exc() return total_time = time.time() - start_time print(f"Total setup time: {total_time * 1000:.1f}ms") # Report success successful_segments = sum(1 for frame in self.segment_frames if frame is not None) print(f"Successfully preloaded video with {successful_segments}/{self.segment_count} active segments") def cleanup_segment_captures(self): """Clean up all segment video captures and preloaded cache""" for cap in self.segment_caps: if cap: cap.release() self.segment_caps = [] self.segment_frames = [] self.segment_positions = [] if hasattr(self, 'video_frame_cache'): self.video_frame_cache = [] if hasattr(self, 'segment_current_frames'): self.segment_current_frames = [] def update_segment_frames(self): """Update frames for segments using the preloaded video array - smooth playback!""" if not self.multi_segment_mode or not self.segment_frames or not hasattr(self, 'video_frame_cache'): return for i in range(len(self.segment_frames)): if self.segment_frames[i] is not None and self.video_frame_cache: self.segment_current_frames[i] += 1 if self.segment_current_frames[i] >= len(self.video_frame_cache): self.segment_current_frames[i] = 0 self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]].copy() def display_current_frame(self): """Display the current cached frame with overlays""" if self.multi_segment_mode: self.display_multi_segment_frame() else: self.display_single_frame() def display_single_frame(self): """Display single frame view""" if self.current_display_frame is None: 
return frame = self.current_display_frame.copy() # Add info overlay current_file = self.media_files[self.current_index] if self.is_video(self.media_files[self.current_index]): info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, ) cv2.putText( frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1 ) # Draw timeline self.draw_timeline(frame) # Maintain aspect ratio when displaying self.display_with_aspect_ratio(frame) def display_multi_segment_frame(self): """Display multi-segment frame view""" if not self.segment_frames or not any(frame is not None for frame in self.segment_frames): return # Calculate grid dimensions (2x2 for 4 segments) grid_rows = int(self.segment_count ** 0.5) grid_cols = int(self.segment_count / grid_rows) # Get reference frame size ref_frame = next((f for f in self.segment_frames if f is not None), None) if ref_frame is None: return frame_height, frame_width = ref_frame.shape[:2] # Calculate segment display size segment_width = frame_width // grid_cols segment_height = frame_height // grid_rows # Create combined display frame combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8) # Place each segment in the grid for i, segment_frame in enumerate(self.segment_frames): if segment_frame is None: continue row = i // grid_cols col = i % grid_cols # Resize segment frame to fit grid cell while maintaining aspect ratio frame_height, frame_width = segment_frame.shape[:2] seg_scale_x = segment_width / frame_width seg_scale_y = segment_height / frame_height seg_scale = min(seg_scale_x, seg_scale_y) new_seg_width = int(frame_width * seg_scale) new_seg_height = int(frame_height * seg_scale) resized_segment = cv2.resize(segment_frame, (new_seg_width, new_seg_height), 
interpolation=cv2.INTER_AREA) # Center the resized segment in the grid cell y_offset = (segment_height - new_seg_height) // 2 x_offset = (segment_width - new_seg_width) // 2 # Calculate position in combined frame y_start = row * segment_height y_end = y_start + segment_height x_start = col * segment_width x_end = x_start + segment_width # Place segment in combined frame (centered) y_place_start = y_start + y_offset y_place_end = y_place_start + new_seg_height x_place_start = x_start + x_offset x_place_end = x_place_start + new_seg_width # Ensure we don't go out of bounds y_place_end = min(y_place_end, y_end) x_place_end = min(x_place_end, x_end) combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment # Add segment label segment_position = int((self.segment_positions[i] / self.total_frames) * 100) label_text = f"Seg {i+1}: {segment_position}%" cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, ) cv2.putText( combined_frame, label_text, (x_place_start + 5, y_place_start + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, ) # Draw grid borders cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1) # Add overall info overlay current_file = self.media_files[self.current_index] info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, ) cv2.putText( combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1 ) # Draw multi-segment timeline self.draw_multi_segment_timeline(combined_frame) # Maintain aspect ratio when displaying self.display_with_aspect_ratio(combined_frame) def draw_multi_segment_timeline(self, frame): """Draw timeline showing all segment positions""" 
if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps or not self.timeline_visible: return height, width = frame.shape[:2] # Timeline area - smaller than normal timeline timeline_height = 30 timeline_y = height - timeline_height - 25 # Leave space for info text timeline_margin = 20 timeline_bar_height = 8 # Draw timeline background cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2 bar_x_start = timeline_margin bar_x_end = width - timeline_margin bar_width = bar_x_end - bar_x_start # Draw timeline background bar cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1) # Draw segment markers if self.total_frames > 0: for i, segment_pos in enumerate(self.segment_positions): # Calculate position on timeline progress = segment_pos / max(1, self.total_frames - 1) marker_x = bar_x_start + int(bar_width * progress) # Draw segment marker color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1) cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1) # Add segment number cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1) def draw_timeline(self, frame): """Draw timeline at the bottom of the frame""" # Only draw timeline for video files in single mode and when visible if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible: return height, width = frame.shape[:2] self.window_height = height self.window_width = width # Timeline background area timeline_y = height - self.TIMELINE_HEIGHT 
cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1) # Calculate timeline bar position bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2 bar_x_start = self.TIMELINE_MARGIN bar_x_end = width - self.TIMELINE_MARGIN bar_width = bar_x_end - bar_x_start self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT) # Draw timeline background cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1) cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1) # Draw progress for videos if self.total_frames > 0: progress = self.current_frame / max(1, self.total_frames - 1) progress_width = int(bar_width * progress) if progress_width > 0: cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1) # Draw handle handle_x = bar_x_start + progress_width handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2 cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1) cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2) def mouse_callback(self, event, x, y, flags, param): """Handle mouse events for timeline interaction""" if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode: return bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect bar_x_end = bar_x_start + bar_width # Check if mouse is over timeline if bar_y <= y <= bar_y + bar_height + 10: # Add some extra height for easier clicking if event == cv2.EVENT_LBUTTONDOWN: if bar_x_start <= x <= bar_x_end: self.mouse_dragging = True self.seek_to_position(x, bar_x_start, bar_width) elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging: if bar_x_start <= x <= bar_x_end: self.seek_to_position(x, bar_x_start, 
bar_width) elif event == cv2.EVENT_LBUTTONUP: self.mouse_dragging = False def seek_to_position(self, mouse_x, bar_x_start, bar_width): """Seek to position based on mouse click/drag on timeline""" if not self.current_cap or not self.is_video(self.media_files[self.current_index]): return # Calculate position ratio relative_x = mouse_x - bar_x_start position_ratio = max(0, min(1, relative_x / bar_width)) # Calculate target frame target_frame = int(position_ratio * (self.total_frames - 1)) target_frame = max(0, min(target_frame, self.total_frames - 1)) # Seek to target frame self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() def advance_frame(self): """Advance to next frame(s) based on playback speed""" if ( not self.is_video(self.media_files[self.current_index]) or not self.is_playing ): return if self.multi_segment_mode: self.update_segment_frames() return True else: frames_to_skip = self.calculate_frames_to_skip() for _ in range(frames_to_skip + 1): ret, frame = self.current_cap.read() if not ret: actual_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) if actual_frame < self.total_frames - 5: print(f"Frame count mismatch! 
Reported: {self.total_frames}, Actual: {actual_frame}") self.total_frames = actual_frame return False self.current_display_frame = frame self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) self.update_watch_tracking() return True def seek_video(self, frames_delta: int): """Seek video by specified number of frames""" if not self.is_video(self.media_files[self.current_index]): return if self.multi_segment_mode: return if not self.current_cap: return target_frame = max( 0, min(self.current_frame + frames_delta, self.total_frames - 1) ) self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.load_current_frame() def process_seek_key(self, key: int) -> bool: """Process seeking keys with proper rate limiting""" current_time = time.time() seek_direction = 0 seek_amount = 0 seek_multiplier = 1 # Default multiplier # Check for A/D keys with modifiers if key == ord("a") or key == ord("A"): seek_direction = -1 # SHIFT+A gives uppercase A if key == ord("A"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == ord("d") or key == ord("D"): seek_direction = 1 # SHIFT+D gives uppercase D if key == ord("D"): seek_multiplier = self.SHIFT_SEEK_MULTIPLIER elif key == 1: # CTRL+A seek_direction = -1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == 4: # CTRL+D seek_direction = 1 seek_multiplier = self.CTRL_SEEK_MULTIPLIER elif key == ord(","): seek_amount = -self.fine_seek_frames elif key == ord("."): seek_amount = self.fine_seek_frames else: if self.current_seek_key is not None: self.current_seek_key = None self.is_seeking = False return False # Handle fine seeking (comma/period) - always immediate if seek_amount != 0: self.seek_video(seek_amount) return True # Handle A/D key seeking with rate limiting and modifiers if seek_direction != 0: if self.current_seek_key != key: self.current_seek_key = key self.key_first_press_time = current_time self.last_seek_time = current_time self.is_seeking = True seek_amount = seek_direction * 
self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True elif self.is_seeking: time_since_last_seek = current_time - self.last_seek_time time_held = current_time - self.key_first_press_time if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC: self.last_seek_time = current_time if time_held > self.FAST_SEEK_ACTIVATION_TIME: seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier else: seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier self.seek_video(seek_amount) return True return False def grade_media(self, grade: int): """Move current media file to grade directory""" if not self.media_files or grade < 1 or grade > 5: return current_file = self.media_files[self.current_index] grade_dir = self.directory / str(grade) # Create grade directory if it doesn't exist grade_dir.mkdir(exist_ok=True) destination = grade_dir / current_file.name counter = 1 while destination.exists(): stem = current_file.stem suffix = current_file.suffix destination = grade_dir / f"{stem}_{counter}{suffix}" counter += 1 # Track this move for undo functionality BEFORE making changes self.undo_history.append((str(destination), str(current_file), self.current_index)) # Release video capture to unlock the file before moving if self.current_cap: self.current_cap.release() self.current_cap = None # Also release segment captures if in multi-segment mode if self.multi_segment_mode: self.cleanup_segment_captures() try: shutil.move(str(current_file), str(destination)) print(f"Moved {current_file.name} to grade {grade}") self.media_files.pop(self.current_index) if self.current_index >= len(self.media_files): self.current_index = 0 if not self.media_files: print("No more media files to grade!") return False except Exception as e: print(f"Error moving file: {e}") # Remove the undo entry since the move failed self.undo_history.pop() return True def undo_last_action(self): """Undo the last grading action by moving file back and restoring to media 
list""" if not self.undo_history: print("No actions to undo!") return False # Get the last action moved_file_path, original_file_path, original_index = self.undo_history.pop() # Release video capture to unlock any current file before moving if self.current_cap: self.current_cap.release() self.current_cap = None try: # Move the file back to its original location shutil.move(moved_file_path, original_file_path) # Add the file back to the media list at its original position original_file = Path(original_file_path) # Insert the file back at the appropriate position if original_index <= len(self.media_files): self.media_files.insert(original_index, original_file) else: self.media_files.append(original_file) # Navigate to the restored file print("Navigating to: ", original_index) self.current_index = original_index print(f"Undone: Moved {original_file.name} back from grade folder") return True except Exception as e: print(f"Error undoing action: {e}") # If undo failed, put the action back in history self.undo_history.append((moved_file_path, original_file_path, original_index)) return False def run(self): """Main grading loop""" self.media_files = self.find_media_files() if not self.media_files: print("No media files found in directory!") return print(f"Found {len(self.media_files)} media files") print("Controls:") print(" Space: Pause/Play") print(" A/D: Seek backward/forward (hold for FAST seek)") print(" Shift+A/D: Seek backward/forward (5x multiplier)") print(" Ctrl+A/D: Seek backward/forward (10x multiplier)") print(" , / . 
def run(self):
    """Run the interactive grading session.

    Collects the media files via ``find_media_files()``, prints the key
    bindings, opens the "Media Grader" window and then, file by file, shows
    frames and dispatches key presses. The session ends when the user
    presses Q/ESC, when ``grade_media`` reports that no files remain, or
    when ``current_index`` moves past the last file.

    Capture, segment-capture, thread-pool and window cleanup happens in a
    ``finally`` clause, so it also runs when the loop is left through an
    early ``return`` or an exception.
    """
    self.media_files = self.find_media_files()
    if not self.media_files:
        print("No media files found in directory!")
        return

    print(f"Found {len(self.media_files)} media files")
    controls = (
        "Controls:",
        " Space: Pause/Play",
        " A/D: Seek backward/forward (hold for FAST seek)",
        " Shift+A/D: Seek backward/forward (5x multiplier)",
        " Ctrl+A/D: Seek backward/forward (10x multiplier)",
        " , / . : Frame-by-frame seek (fine control)",
        # W raises and S lowers the speed (see the key handling below).
        " W/S: Increase/Decrease playback speed",
        " G: Toggle multi-segment mode (videos only)",
        " 1-5: Grade and move file",
        " N: Next file",
        " P: Previous file",
        " U: Undo last grading action",
        " L: Sample video at key points (videos only)",
        " H: Toggle timeline visibility",
        " J: Bisect backwards from current position (videos only, disabled in multi-segment)",
        " K: Bisect forwards toward next sample (videos only, disabled in multi-segment)",
        " Q/ESC: Quit",
    )
    for line in controls:
        print(line)

    # Keys whose handlers already chose the next index themselves; after
    # any other key that ends a file, the loop advances to the next file.
    index_already_set = tuple(ord(c) for c in "pu12345")
    grade_keys = tuple(ord(c) for c in "12345")

    try:
        cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
        cv2.setMouseCallback("Media Grader", self.mouse_callback)
        # Initial default window size
        cv2.resizeWindow("Media Grader", 1280, 720)

        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            # Setup multi-segment mode if enabled and this is a video
            if self.multi_segment_mode and self.is_video(current_file):
                self.setup_segment_captures()

            window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
            cv2.setWindowTitle("Media Grader", window_title)

            while True:
                self.display_current_frame()

                if not self.is_video(current_file):
                    delay = self.IMAGE_DISPLAY_DELAY_MS
                elif self.is_seeking:
                    delay = self.FRAME_RENDER_TIME_MS
                else:
                    delay = self.calculate_frame_delay()

                # After masking, 255 means "no key pressed" within the delay
                key = cv2.waitKey(delay) & 0xFF

                if key == ord("q") or key == 27:
                    return
                elif key == ord(" "):
                    self.is_playing = not self.is_playing
                elif key == ord("s"):
                    self.playback_speed = max(
                        self.MIN_PLAYBACK_SPEED,
                        self.playback_speed - self.SPEED_INCREMENT,
                    )
                elif key == ord("w"):
                    self.playback_speed = min(
                        self.MAX_PLAYBACK_SPEED,
                        self.playback_speed + self.SPEED_INCREMENT,
                    )
                elif self.process_seek_key(key):
                    # A seek was handled; redraw without advancing playback
                    continue
                elif key == ord("n"):
                    break
                elif key == ord("p"):
                    self.current_index = max(0, self.current_index - 1)
                    print("Navigating to: ", self.current_index)
                    break
                elif key == ord("u"):
                    if self.undo_last_action():
                        # File was restored, reload it
                        break
                elif key == ord("l"):
                    # Jump to largest unwatched region (works in both modes)
                    self.jump_to_unwatched_region()
                elif key == ord("j"):
                    if not self.multi_segment_mode:
                        self.bisect_backwards()
                    else:
                        print("Bisect keys (J/K) disabled in multi-segment mode")
                elif key == ord("k"):
                    if not self.multi_segment_mode:
                        self.bisect_forwards()
                    else:
                        print("Bisect keys (J/K) disabled in multi-segment mode")
                elif key == ord("h"):
                    # Toggle timeline visibility
                    self.toggle_timeline()
                elif key == ord("g"):
                    self.toggle_multi_segment_mode()
                elif key in grade_keys:
                    if not self.grade_media(int(chr(key))):
                        return
                    break
                elif key == 255:
                    # No key this tick: keep a held seek key repeating
                    if self.is_seeking and self.current_seek_key is not None:
                        self.process_seek_key(self.current_seek_key)

                if (
                    self.is_playing
                    and self.is_video(current_file)
                    and not self.is_seeking
                ):
                    if not self.advance_frame():
                        # Video reached the end, restart it instead of navigating
                        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        self.current_frame = 0
                        self.load_current_frame()

            if key not in index_already_set:
                print("Navigating to (pu12345): ", self.current_index)
                self.current_index += 1
    finally:
        # Runs on normal completion, on Q/ESC, when grading empties the
        # list, and on errors, so resources are never left open.
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None
        self.cleanup_segment_captures()
        self.thread_pool.shutdown(wait=True)
        cv2.destroyAllWindows()

    print("Grading session complete!")
def main(argv=None):
    """Parse command-line arguments and start a grading session.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read the process arguments, so ``main()`` behaves as before.

    Exits with status 2 (via ``parser.error``) when ``--seek-frames`` is
    not positive, and with status 1 when the directory does not exist or
    the session raises an unexpected exception. Error messages are written
    to stderr.
    """
    parser = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to scan for media files (default: current directory)",
    )
    parser.add_argument(
        "--seek-frames",
        type=int,
        default=30,
        help="Number of frames to seek with the A/D keys (default: 30)",
    )
    parser.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )

    args = parser.parse_args(argv)

    # Zero would make seeking a no-op and a negative value would invert the
    # seek direction, so reject both up front.
    if args.seek_frames < 1:
        parser.error("--seek-frames must be a positive integer")

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory", file=sys.stderr)
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)

    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()