import os
import sys
import glob
import cv2
import numpy as np
import argparse
import shutil
import time
from pathlib import Path
from typing import List


class MediaGrader:
    """Interactive media reviewer: plays videos/images from a directory tree
    and sorts them into numbered grade subdirectories (1-5) via hotkeys.

    Also provides timeline scrubbing, watch-region tracking, sample-point
    jumping (L/H/J/K), a multi-segment grid preview mode, and undo.
    """

    # --- Playback configuration -------------------------------------------
    BASE_FRAME_DELAY_MS = 16            # ~30 FPS at 1.0x speed
    KEY_REPEAT_RATE_SEC = 0.5           # How often to process key repeats
    FAST_SEEK_ACTIVATION_TIME = 2.0     # How long to hold before fast seek
    FRAME_RENDER_TIME_MS = 50           # Time to let frames render between seeks
    SPEED_INCREMENT = 0.2
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 100.0
    FAST_SEEK_MULTIPLIER = 60
    IMAGE_DISPLAY_DELAY_MS = 100

    # --- Timeline configuration -------------------------------------------
    TIMELINE_HEIGHT = 60
    TIMELINE_MARGIN = 20
    TIMELINE_BAR_HEIGHT = 12
    TIMELINE_HANDLE_SIZE = 12
    TIMELINE_COLOR_BG = (80, 80, 80)
    TIMELINE_COLOR_PROGRESS = (0, 120, 255)
    TIMELINE_COLOR_HANDLE = (255, 255, 255)
    TIMELINE_COLOR_BORDER = (200, 200, 200)

    # --- Seek modifiers for A/D keys --------------------------------------
    SHIFT_SEEK_MULTIPLIER = 5           # SHIFT + A/D multiplier
    CTRL_SEEK_MULTIPLIER = 10           # CTRL + A/D multiplier

    # --- Multi-segment mode configuration ---------------------------------
    MULTI_SEGMENT_MODE = False
    SEGMENT_COUNT = 16                  # Number of video segments (4x4 grid)
    SEGMENT_OVERLAP_PERCENT = 10        # Percentage overlap between segments

    def __init__(
        self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
    ):
        """Initialize grader state.

        Args:
            directory: Root directory searched recursively for media files.
            seek_frames: Base number of frames for a coarse A/D seek.
            snap_to_iframe: Accepted for interface compatibility; currently
                unused by this implementation.
        """
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.current_index = 0
        self.playback_speed = 1.0
        self.media_files = []
        self.current_cap = None
        self.is_playing = True
        self.current_frame = 0
        self.total_frames = 0

        # Multi-segment mode state
        self.multi_segment_mode = False
        self.segment_count = self.SEGMENT_COUNT
        self.segment_overlap_percent = self.SEGMENT_OVERLAP_PERCENT
        self.segment_caps = []        # VideoCapture objects, one per segment
        self.segment_frames = []      # Latest decoded frame per segment
        self.segment_positions = []   # Start frame per segment

        # Key repeat tracking with rate limiting
        self.last_seek_time = 0
        self.current_seek_key = None
        self.key_first_press_time = 0
        self.is_seeking = False

        # Seeking modes
        self.fine_seek_frames = 1                  # Frame-by-frame (, / .)
        self.coarse_seek_frames = self.seek_frames # User-configurable (A/D)
        self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER

        # Current frame cache for display
        self.current_display_frame = None

        # Supported media extensions
        self.extensions = [
            ".png",
            ".jpg",
            ".jpeg",
            ".gif",
            ".mp4",
            ".avi",
            ".mov",
            ".mkv",
        ]

        # Mouse interaction for timeline
        self.mouse_dragging = False
        self.timeline_rect = None
        self.window_width = 800
        self.window_height = 600

        # Undo functionality: (destination_path, source_path, original_index)
        # NOTE: the original __init__ initialized this block of attributes
        # twice verbatim; the duplicate has been removed.
        self.undo_history = []

        # Watch tracking for "good look" feature:
        # file_path -> list of (start_frame, end_frame) watched regions
        self.watched_regions = {}
        self.current_watch_start = None   # Frame where current session started
        self.last_frame_position = 0      # Last known frame position

        # Bisection navigation: file_path -> last jump reference frame
        self.last_jump_position = {}
        # Jump history for H key (undo jump): file_path -> [frame, ...]
        self.jump_history = {}

    def find_media_files(self) -> List[Path]:
        """Find all media files recursively, skipping grade directories 1-5."""
        media_files = []
        for ext in self.extensions:
            pattern = str(self.directory / "**" / f"*{ext}")
            files = glob.glob(pattern, recursive=True)
            media_files.extend([Path(f) for f in files])

        # Filter out files already sorted into a grade directory.
        # NOTE(review): this excludes ANY path containing a component named
        # "1".."5", not just grade dirs directly under self.directory.
        filtered_files = []
        for file in media_files:
            if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts):
                print("Adding file: ", file)
                filtered_files.append(file)
        return sorted(filtered_files)

    def is_video(self, file_path: Path) -> bool:
        """Check if file is a video (GIFs are treated as videos)."""
        video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"]
        return file_path.suffix.lower() in video_extensions

    def calculate_frame_delay(self) -> int:
        """Calculate frame delay in milliseconds based on playback speed."""
        delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
        return max(1, delay_ms)

    def calculate_frames_to_skip(self) -> int:
        """How many extra frames to skip per tick for high-speed playback.

        Up to 2.0x we rely purely on a shorter frame delay; beyond that we
        also drop frames so decoding can keep up.
        """
        if self.playback_speed <= 1.0:
            return 0
        elif self.playback_speed <= 2.0:
            return 0
        elif self.playback_speed <= 5.0:
            return int(self.playback_speed - 1)
        else:
            return int(self.playback_speed * 2)

    def load_media(self, file_path: Path) -> bool:
        """Load a media file for display. Returns False if it can't be opened."""
        if self.current_cap:
            self.current_cap.release()

        if self.is_video(file_path):
            self.current_cap = cv2.VideoCapture(str(file_path))
            if not self.current_cap.isOpened():
                print(f"Warning: Could not open video file {file_path.name} (unsupported codec)")
                return False
            self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            self.current_frame = 0
        else:
            self.current_cap = None
            self.total_frames = 1
            self.current_frame = 0

        self.load_current_frame()
        # Begin a watch-tracking session (no-op for still images).
        self.start_watch_session()
        return True

    def load_current_frame(self):
        """Decode the current frame (video) or image into the display cache."""
        if self.is_video(self.media_files[self.current_index]):
            if not self.current_cap:
                return False
            ret, frame = self.current_cap.read()
            if ret:
                self.current_display_frame = frame
                self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
                return True
            return False
        else:
            frame = cv2.imread(str(self.media_files[self.current_index]))
            if frame is not None:
                self.current_display_frame = frame
                return True
            return False

    def start_watch_session(self):
        """Start tracking a new viewing session (videos only)."""
        if self.is_video(self.media_files[self.current_index]):
            self.current_watch_start = self.current_frame
            self.last_frame_position = self.current_frame

    def update_watch_tracking(self):
        """Record the region just watched once the position moves far enough."""
        if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None:
            return

        current_file = str(self.media_files[self.current_index])
        # A jump of >5 frames, or >30 frames of continuous viewing, closes
        # out the current region and starts a new session.
        if abs(self.current_frame - self.last_frame_position) > 5 or \
           abs(self.current_frame - self.current_watch_start) > 30:
            start_frame = min(self.current_watch_start, self.last_frame_position)
            end_frame = max(self.current_watch_start, self.last_frame_position)
            if current_file not in self.watched_regions:
                self.watched_regions[current_file] = []
            self.add_watched_region(current_file, start_frame, end_frame)

            self.current_watch_start = self.current_frame
            self.last_frame_position = self.current_frame

    def add_watched_region(self, file_path, start_frame, end_frame):
        """Add a watched region, merging with existing overlapping regions.

        BUGFIX: the previous implementation never stored the new region when
        it overlapped nothing, and appended an intermediate merged copy once
        per overlapping region. Regions are now merged correctly and kept
        sorted by start frame.
        """
        regions = self.watched_regions.setdefault(file_path, [])
        new_start, new_end = start_frame, end_frame

        merged = []
        for region in regions:
            if new_end < region[0] or new_start > region[1]:
                # Disjoint — keep the existing region untouched.
                merged.append(region)
            else:
                # Overlapping — absorb it into the new region.
                new_start = min(new_start, region[0])
                new_end = max(new_end, region[1])
        merged.append((new_start, new_end))
        merged.sort(key=lambda r: r[0])
        self.watched_regions[file_path] = merged

    def find_largest_unwatched_region(self):
        """Find the largest unwatched (start, end) gap in the current video."""
        if not self.is_video(self.media_files[self.current_index]):
            return None

        current_file = str(self.media_files[self.current_index])
        watched = self.watched_regions.get(current_file, [])
        if not watched:
            # Nothing watched yet: suggest the first quarter.
            return (0, self.total_frames // 4)

        watched.sort(key=lambda x: x[0])

        gaps = []
        if watched[0][0] > 0:
            gaps.append((0, watched[0][0]))
        for i in range(len(watched) - 1):
            gap_start = watched[i][1]
            gap_end = watched[i + 1][0]
            if gap_end > gap_start:
                gaps.append((gap_start, gap_end))
        if watched[-1][1] < self.total_frames:
            gaps.append((watched[-1][1], self.total_frames))

        if not gaps:
            return None
        return max(gaps, key=lambda x: x[1] - x[0])

    def get_sample_points(self):
        """Standardized sample frames for video navigation.

        The video is divided into 8 segments; points are ordered so the
        first jumps hit 1/4, 1/2, 3/4, then fill in the eighths, ending at 0.
        """
        segments = 8
        segment_size = self.total_frames // segments
        if segment_size == 0:
            return []
        return [
            segment_size * 2,  # 1/4 through
            segment_size * 4,  # 1/2 through
            segment_size * 6,  # 3/4 through
            segment_size * 1,  # 1/8 through
            segment_size * 3,  # 3/8 through
            segment_size * 5,  # 5/8 through
            segment_size * 7,  # 7/8 through
            0,                 # Beginning
        ]

    def jump_to_unwatched_region(self):
        """Jump to the next standardized sample point of the video (L key)."""
        if not self.is_video(self.media_files[self.current_index]):
            return False

        current_file = str(self.media_files[self.current_index])

        # Lazily created per-file counter of how many sample jumps were taken.
        if not hasattr(self, 'jump_counters'):
            self.jump_counters = {}
        if current_file not in self.jump_counters:
            self.jump_counters[current_file] = 0

        sample_points = self.get_sample_points()
        if not sample_points:
            print("Video too short for sampling")
            return False

        current_jump = self.jump_counters[current_file]
        if current_jump >= len(sample_points):
            print("All sample points visited! Video fully sampled.")
            return False

        target_frame = min(sample_points[current_jump], self.total_frames - 1)

        # Remember where we jumped from, for bisection (J/K) and undo (H).
        self.last_jump_position[current_file] = self.current_frame
        if current_file not in self.jump_history:
            self.jump_history[current_file] = []
        self.jump_history[current_file].append(self.current_frame)

        if self.multi_segment_mode:
            # Reposition all segments relative to the jump target.
            self.reposition_segments_around_frame(target_frame)
        else:
            self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
            self.load_current_frame()

        self.jump_counters[current_file] += 1

        percentage = (target_frame / self.total_frames) * 100
        print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
        return True

    def bisect_backwards(self):
        """Bisect between the last jump reference and the current position (J)."""
        if not self.is_video(self.media_files[self.current_index]):
            return False

        current_file = str(self.media_files[self.current_index])
        if current_file not in self.last_jump_position:
            print("No previous position to bisect from. Use L first to establish a reference point.")
            return False

        last_pos = self.last_jump_position[current_file]
        current_pos = self.current_frame
        if last_pos == current_pos:
            print("Already at the same position as last jump.")
            return False

        # Midpoint is symmetric regardless of direction.
        midpoint = (last_pos + current_pos) // 2

        # Current position becomes the new reference for further bisection.
        self.last_jump_position[current_file] = current_pos

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
        self.load_current_frame()

        percentage = (midpoint / self.total_frames) * 100
        print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)")
        return True

    def bisect_forwards(self):
        """Bisect between the current position and the next sample point (K)."""
        if not self.is_video(self.media_files[self.current_index]):
            return False

        current_file = str(self.media_files[self.current_index])
        if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters:
            print("No sampling started yet. Use L first to establish sample points.")
            return False

        sample_points = self.get_sample_points()
        current_jump = self.jump_counters[current_file]
        if current_jump >= len(sample_points):
            print("All sample points visited. No forward reference point.")
            return False

        next_sample = min(sample_points[current_jump], self.total_frames - 1)
        current_pos = self.current_frame

        midpoint = (current_pos + next_sample) // 2
        if midpoint == current_pos:
            print("Already at or very close to next sample point.")
            return False

        self.last_jump_position[current_file] = current_pos

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
        self.load_current_frame()

        percentage = (midpoint / self.total_frames) * 100
        print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)")
        return True

    def undo_jump(self):
        """Undo the last L jump by returning to the previous position (H)."""
        if not self.is_video(self.media_files[self.current_index]):
            return False

        current_file = str(self.media_files[self.current_index])
        if current_file not in self.jump_history or not self.jump_history[current_file]:
            print("No jump history to undo. Use L first to establish jump points.")
            return False

        previous_position = self.jump_history[current_file].pop()

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, previous_position)
        self.load_current_frame()

        # Keep the bisection reference in sync with where we returned to.
        self.last_jump_position[current_file] = previous_position

        percentage = (previous_position / self.total_frames) * 100
        print(f"Undid jump: returned to frame {previous_position} ({percentage:.1f}% through video)")
        return True

    def toggle_multi_segment_mode(self):
        """Toggle between single and multi-segment video mode (G key)."""
        if not self.is_video(self.media_files[self.current_index]):
            print("Multi-segment mode only works with videos")
            return False

        self.multi_segment_mode = not self.multi_segment_mode
        if self.multi_segment_mode:
            print(f"Enabled multi-segment mode ({self.segment_count} segments)")
            self.setup_segment_captures()
        else:
            print("Disabled multi-segment mode")
            self.cleanup_segment_captures()
            # Reload single video
            self.load_media(self.media_files[self.current_index])
        return True

    def setup_segment_captures(self):
        """Open one VideoCapture per segment, evenly spaced through the video."""
        if not self.is_video(self.media_files[self.current_index]):
            return

        self.cleanup_segment_captures()
        current_file = self.media_files[self.current_index]

        # Segment i starts at fraction i/segment_count of the video.
        self.segment_positions = []
        for i in range(self.segment_count):
            position_ratio = i / self.segment_count
            start_frame = int(position_ratio * self.total_frames)
            self.segment_positions.append(start_frame)

        for i, start_frame in enumerate(self.segment_positions):
            cap = cv2.VideoCapture(str(current_file))
            if cap.isOpened():
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                self.segment_caps.append(cap)
                ret, frame = cap.read()
                if ret:
                    self.segment_frames.append(frame)
                else:
                    self.segment_frames.append(None)
            else:
                self.segment_caps.append(None)
                self.segment_frames.append(None)

    def cleanup_segment_captures(self):
        """Release all segment video captures and clear segment state."""
        for cap in self.segment_caps:
            if cap:
                cap.release()
        self.segment_caps = []
        self.segment_frames = []
        self.segment_positions = []

    def update_segment_frames(self):
        """Advance every segment one frame; loop a segment back at its end."""
        if not self.multi_segment_mode or not self.segment_caps:
            return

        for i, cap in enumerate(self.segment_caps):
            if cap and cap.isOpened():
                ret, frame = cap.read()
                if ret:
                    self.segment_frames[i] = frame
                else:
                    # Loop back to segment start when reaching end of video.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
                    ret, frame = cap.read()
                    if ret:
                        self.segment_frames[i] = frame

    def reposition_segments_around_frame(self, center_frame: int):
        """Re-center all segments around a frame, keeping even spacing."""
        if not self.multi_segment_mode or not self.segment_caps:
            return

        segment_spacing = self.total_frames // (self.segment_count + 1)
        new_positions = []
        for i in range(self.segment_count):
            # Spread segments symmetrically around center_frame.
            offset = (i - (self.segment_count - 1) / 2) * segment_spacing
            new_frame = int(center_frame + offset)
            new_frame = max(0, min(new_frame, self.total_frames - 1))
            new_positions.append(new_frame)

        self.segment_positions = new_positions
        for i, cap in enumerate(self.segment_caps):
            if cap and cap.isOpened():
                cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
                ret, frame = cap.read()
                if ret:
                    self.segment_frames[i] = frame
                    # read() advanced the position; rewind so playback
                    # resumes from the segment start.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])

    def seek_all_segments(self, frames_delta: int):
        """Seek every segment by frames_delta, clamped to its own sub-range."""
        if not self.multi_segment_mode or not self.segment_caps:
            return

        for i, cap in enumerate(self.segment_caps):
            if cap and cap.isOpened():
                current_frame = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
                segment_start = self.segment_positions[i]
                segment_duration = self.total_frames // self.segment_count
                segment_end = min(self.total_frames - 1, segment_start + segment_duration)

                target_frame = max(segment_start, min(current_frame + frames_delta, segment_end))
                cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
                ret, frame = cap.read()
                if ret:
                    self.segment_frames[i] = frame
                    # Rewind past the frame read() consumed.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)

    def display_current_frame(self):
        """Display the current cached frame with overlays."""
        if self.multi_segment_mode:
            self.display_multi_segment_frame()
        else:
            self.display_single_frame()

    def display_single_frame(self):
        """Render the single-frame view with info overlay and timeline."""
        if self.current_display_frame is None:
            return

        frame = self.current_display_frame.copy()

        if self.is_video(self.media_files[self.current_index]):
            info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
            # White text with thin black inline for readability on any frame.
            cv2.putText(
                frame,
                info_text,
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (255, 255, 255),
                2,
            )
            cv2.putText(
                frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
            )

        self.draw_timeline(frame)
        cv2.imshow("Media Grader", frame)

    def display_multi_segment_frame(self):
        """Render all segments as a grid in a single window."""
        if not self.segment_frames or not any(frame is not None for frame in self.segment_frames):
            return

        # Grid dimensions: as close to square as segment_count allows.
        grid_rows = int(self.segment_count ** 0.5)
        grid_cols = int(self.segment_count / grid_rows)

        ref_frame = next((f for f in self.segment_frames if f is not None), None)
        if ref_frame is None:
            return

        frame_height, frame_width = ref_frame.shape[:2]
        segment_width = frame_width // grid_cols
        segment_height = frame_height // grid_rows

        combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

        for i, segment_frame in enumerate(self.segment_frames):
            if segment_frame is None:
                continue

            row = i // grid_cols
            col = i % grid_cols

            resized_segment = cv2.resize(segment_frame, (segment_width, segment_height))

            y_start = row * segment_height
            y_end = y_start + segment_height
            x_start = col * segment_width
            x_end = x_start + segment_width

            combined_frame[y_start:y_end, x_start:x_end] = resized_segment

            # Label each cell with its segment number and position percentage.
            segment_position = int((self.segment_positions[i] / self.total_frames) * 100)
            label_text = f"Seg {i+1}: {segment_position}%"
            cv2.putText(
                combined_frame,
                label_text,
                (x_start + 5, y_start + 20),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (255, 255, 255),
                2,
            )
            cv2.putText(
                combined_frame,
                label_text,
                (x_start + 5, y_start + 20),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 0, 0),
                1,
            )

            cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1)

        info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
        cv2.putText(
            combined_frame,
            info_text,
            (10, frame_height - 20),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (255, 255, 255),
            2,
        )
        cv2.putText(
            combined_frame, info_text, (10, frame_height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1
        )

        self.draw_multi_segment_timeline(combined_frame)
        cv2.imshow("Media Grader", combined_frame)

    def draw_multi_segment_timeline(self, frame):
        """Draw a compact timeline with one marker per segment position."""
        if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps:
            return

        height, width = frame.shape[:2]

        # Smaller than the normal timeline; leave space for the info text.
        timeline_height = 30
        timeline_y = height - timeline_height - 25
        timeline_margin = 20
        timeline_bar_height = 8

        cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1)

        bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2
        bar_x_start = timeline_margin
        bar_x_end = width - timeline_margin
        bar_width = bar_x_end - bar_x_start

        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1)
        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1)

        if self.total_frames > 0:
            for i, segment_pos in enumerate(self.segment_positions):
                progress = segment_pos / max(1, self.total_frames - 1)
                marker_x = bar_x_start + int(bar_width * progress)

                # Green marker for live captures, grey for failed ones.
                color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100)
                cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1)
                cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1)

                cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)

    def draw_timeline(self, frame):
        """Draw the seekable timeline at the bottom of the frame.

        Only drawn for videos in single-frame mode; also records the bar's
        rectangle so mouse_callback can hit-test clicks.
        """
        if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode:
            return

        height, width = frame.shape[:2]
        self.window_height = height
        self.window_width = width

        timeline_y = height - self.TIMELINE_HEIGHT
        cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)

        bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
        bar_x_start = self.TIMELINE_MARGIN
        bar_x_end = width - self.TIMELINE_MARGIN
        bar_width = bar_x_end - bar_x_start

        self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)

        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1)
        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1)

        if self.total_frames > 0:
            progress = self.current_frame / max(1, self.total_frames - 1)
            progress_width = int(bar_width * progress)

            if progress_width > 0:
                cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1)

            handle_x = bar_x_start + progress_width
            handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
            cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1)
            cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2)

    def mouse_callback(self, event, x, y, flags, param):
        """cv2 mouse handler for click/drag seeking on the timeline."""
        if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode:
            return

        bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
        bar_x_end = bar_x_start + bar_width

        # Extra 10px of height below the bar for easier clicking.
        if bar_y <= y <= bar_y + bar_height + 10:
            if event == cv2.EVENT_LBUTTONDOWN:
                if bar_x_start <= x <= bar_x_end:
                    self.mouse_dragging = True
                    self.seek_to_position(x, bar_x_start, bar_width)
            elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
                if bar_x_start <= x <= bar_x_end:
                    self.seek_to_position(x, bar_x_start, bar_width)
            elif event == cv2.EVENT_LBUTTONUP:
                self.mouse_dragging = False

    def seek_to_position(self, mouse_x, bar_x_start, bar_width):
        """Seek to the frame corresponding to a mouse x-position on the bar."""
        if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
            return

        relative_x = mouse_x - bar_x_start
        position_ratio = max(0, min(1, relative_x / bar_width))

        target_frame = int(position_ratio * (self.total_frames - 1))
        target_frame = max(0, min(target_frame, self.total_frames - 1))

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    def advance_frame(self):
        """Advance playback by one tick; returns False at end of video."""
        if (
            not self.is_video(self.media_files[self.current_index])
            or not self.is_playing
        ):
            return

        if self.multi_segment_mode:
            self.update_segment_frames()
            return True
        else:
            # At high speeds, decode-and-discard extra frames each tick.
            frames_to_skip = self.calculate_frames_to_skip()
            for _ in range(frames_to_skip + 1):
                ret, frame = self.current_cap.read()
                if not ret:
                    return False
                self.current_display_frame = frame
            self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
            self.update_watch_tracking()
            return True

    def seek_video(self, frames_delta: int):
        """Seek the video by frames_delta, clamped to [0, total_frames-1]."""
        if not self.is_video(self.media_files[self.current_index]):
            return

        if self.multi_segment_mode:
            self.seek_all_segments(frames_delta)
        else:
            if not self.current_cap:
                return
            target_frame = max(
                0, min(self.current_frame + frames_delta, self.total_frames - 1)
            )
            self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
            self.load_current_frame()

    def process_seek_key(self, key: int) -> bool:
        """Process seeking keys with rate limiting; returns True if handled.

        A/D seek coarsely (SHIFT = uppercase = 5x, CTRL = codes 1/4 = 10x);
        holding a key past FAST_SEEK_ACTIVATION_TIME switches to fast seeks.
        Comma/period do immediate frame-by-frame seeks.
        """
        current_time = time.time()
        seek_direction = 0
        seek_amount = 0
        seek_multiplier = 1

        if key == ord("a") or key == ord("A"):
            seek_direction = -1
            if key == ord("A"):  # SHIFT+A arrives as uppercase A
                seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
        elif key == ord("d") or key == ord("D"):
            seek_direction = 1
            if key == ord("D"):  # SHIFT+D arrives as uppercase D
                seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
        elif key == 1:  # CTRL+A (control-key code)
            seek_direction = -1
            seek_multiplier = self.CTRL_SEEK_MULTIPLIER
        elif key == 4:  # CTRL+D (control-key code)
            seek_direction = 1
            seek_multiplier = self.CTRL_SEEK_MULTIPLIER
        elif key == ord(","):
            seek_amount = -self.fine_seek_frames
        elif key == ord("."):
            seek_amount = self.fine_seek_frames
        else:
            # Any other key ends an in-progress seek.
            if self.current_seek_key is not None:
                self.current_seek_key = None
                self.is_seeking = False
            return False

        # Fine seeking (comma/period) is always immediate.
        if seek_amount != 0:
            self.seek_video(seek_amount)
            return True

        if seek_direction != 0:
            if self.current_seek_key != key:
                # First press of this key: seek immediately and start timing.
                self.current_seek_key = key
                self.key_first_press_time = current_time
                self.last_seek_time = current_time
                self.is_seeking = True
                seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
                self.seek_video(seek_amount)
                return True
            elif self.is_seeking:
                # Held key: rate-limit repeats, escalate to fast seek.
                time_since_last_seek = current_time - self.last_seek_time
                time_held = current_time - self.key_first_press_time
                if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
                    self.last_seek_time = current_time
                    if time_held > self.FAST_SEEK_ACTIVATION_TIME:
                        seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier
                    else:
                        seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
                    self.seek_video(seek_amount)
                return True

        return False

    def grade_media(self, grade: int):
        """Move the current file into the numbered grade directory.

        Returns False only when no files remain afterwards.
        """
        if not self.media_files or grade < 1 or grade > 5:
            return

        current_file = self.media_files[self.current_index]
        grade_dir = self.directory / str(grade)
        grade_dir.mkdir(exist_ok=True)

        # Avoid clobbering an existing file of the same name.
        destination = grade_dir / current_file.name
        counter = 1
        while destination.exists():
            stem = current_file.stem
            suffix = current_file.suffix
            destination = grade_dir / f"{stem}_{counter}{suffix}"
            counter += 1

        # Track this move for undo BEFORE making changes.
        self.undo_history.append((str(destination), str(current_file), self.current_index))

        # Release captures to unlock the file before moving (Windows).
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None
        if self.multi_segment_mode:
            self.cleanup_segment_captures()

        try:
            shutil.move(str(current_file), str(destination))
            print(f"Moved {current_file.name} to grade {grade}")
            self.media_files.pop(self.current_index)
            if self.current_index >= len(self.media_files):
                self.current_index = 0
            if not self.media_files:
                print("No more media files to grade!")
                return False
        except Exception as e:
            print(f"Error moving file: {e}")
            # The move failed, so drop its undo entry.
            self.undo_history.pop()
        return True

    def undo_last_action(self):
        """Undo the last grading action: move the file back and re-list it."""
        if not self.undo_history:
            print("No actions to undo!")
            return False

        moved_file_path, original_file_path, original_index = self.undo_history.pop()

        # Release the current capture so nothing holds a file lock.
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        try:
            shutil.move(moved_file_path, original_file_path)

            original_file = Path(original_file_path)
            if original_index <= len(self.media_files):
                self.media_files.insert(original_index, original_file)
            else:
                self.media_files.append(original_file)

            print("Navigating to: ", original_index)
            self.current_index = original_index
            print(f"Undone: Moved {original_file.name} back from grade folder")
            return True
        except Exception as e:
            print(f"Error undoing action: {e}")
            # Undo failed: restore the history entry for a later retry.
            self.undo_history.append((moved_file_path, original_file_path, original_index))
            return False

    def run(self):
        """Main grading loop: load each file, dispatch hotkeys, advance video."""
        self.media_files = self.find_media_files()
        if not self.media_files:
            print("No media files found in directory!")
            return

        print(f"Found {len(self.media_files)} media files")
        print("Controls:")
        print("  Space: Pause/Play")
        print("  A/D: Seek backward/forward (hold for FAST seek)")
        print("  Shift+A/D: Seek backward/forward (5x multiplier)")
        print("  Ctrl+A/D: Seek backward/forward (10x multiplier)")
        print("  , / . : Frame-by-frame seek (fine control)")
        print("  W/S: Decrease/Increase playback speed")
        print("  G: Toggle multi-segment mode (videos only)")
        print("  1-5: Grade and move file")
        print("  N: Next file")
        print("  P: Previous file")
        print("  U: Undo last grading action")
        print("  L: Sample video at key points (videos only)")
        print("  H: Undo last L jump (videos only, disabled in multi-segment)")
        print("  J: Bisect backwards from current position (videos only, disabled in multi-segment)")
        print("  K: Bisect forwards toward next sample (videos only, disabled in multi-segment)")
        print("  Q/ESC: Quit")

        cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
        cv2.setMouseCallback("Media Grader", self.mouse_callback)

        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            # Re-create segment captures when entering a new video in
            # multi-segment mode.
            if self.multi_segment_mode and self.is_video(current_file):
                self.setup_segment_captures()

            window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
            cv2.setWindowTitle("Media Grader", window_title)

            while True:
                self.display_current_frame()

                # Shorter delay while seeking so frames render between seeks.
                if self.is_video(current_file):
                    if self.is_seeking:
                        delay = self.FRAME_RENDER_TIME_MS
                    else:
                        delay = self.calculate_frame_delay()
                else:
                    delay = self.IMAGE_DISPLAY_DELAY_MS

                key = cv2.waitKey(delay) & 0xFF

                if key == ord("q") or key == 27:
                    return
                elif key == ord(" "):
                    self.is_playing = not self.is_playing
                elif key == ord("s"):
                    self.playback_speed = max(
                        self.MIN_PLAYBACK_SPEED,
                        self.playback_speed - self.SPEED_INCREMENT,
                    )
                elif key == ord("w"):
                    self.playback_speed = min(
                        self.MAX_PLAYBACK_SPEED,
                        self.playback_speed + self.SPEED_INCREMENT,
                    )
                elif self.process_seek_key(key):
                    pass
                elif key == ord("n"):
                    # BUGFIX: previously N broke out without advancing the
                    # index, replaying the same file. Clamp at the last file,
                    # mirroring how P clamps at the first.
                    self.current_index = min(
                        len(self.media_files) - 1, self.current_index + 1
                    )
                    print("Navigating to: ", self.current_index)
                    break
                elif key == ord("p"):
                    self.current_index = max(0, self.current_index - 1)
                    print("Navigating to: ", self.current_index)
                    break
                elif key == ord("u"):
                    if self.undo_last_action():
                        # File was restored, reload it
                        break
                elif key == ord("l"):
                    # Jump to next sample point (works in both modes).
                    self.jump_to_unwatched_region()
                elif key == ord("j"):
                    if not self.multi_segment_mode:
                        self.bisect_backwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("k"):
                    if not self.multi_segment_mode:
                        self.bisect_forwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("h"):
                    if not self.multi_segment_mode:
                        self.undo_jump()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("g"):
                    self.toggle_multi_segment_mode()
                elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
                    grade = int(chr(key))
                    if not self.grade_media(grade):
                        return
                    break
                elif key == 255:
                    # No key this tick: keep a held seek key repeating.
                    if self.is_seeking and self.current_seek_key is not None:
                        self.process_seek_key(self.current_seek_key)

                if (
                    self.is_playing
                    and self.is_video(current_file)
                    and not self.is_seeking
                ):
                    if not self.advance_frame():
                        # Video reached the end: restart instead of navigating.
                        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
self.current_frame = 0 self.load_current_frame() if key not in [ord("p"), ord("u"), ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: print("Navigating to (pu12345): ", self.current_index) self.current_index += 1 if self.current_cap: self.current_cap.release() self.cleanup_segment_captures() cv2.destroyAllWindows() print("Grading session complete!") def main(): parser = argparse.ArgumentParser( description="Media Grader - Grade media files by moving them to numbered folders" ) parser.add_argument( "directory", nargs="?", default=".", help="Directory to scan for media files (default: current directory)", ) parser.add_argument( "--seek-frames", type=int, default=30, help="Number of frames to seek when using arrow keys (default: 30)", ) parser.add_argument( "--snap-to-iframe", action="store_true", help="Snap to I-frames when seeking backward for better performance", ) args = parser.parse_args() if not os.path.isdir(args.directory): print(f"Error: {args.directory} is not a valid directory") sys.exit(1) grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe) try: grader.run() except KeyboardInterrupt: print("\nGrading session interrupted") except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()