import argparse
import glob
import os
import shutil
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2


class MediaGrader:
    """Interactive OpenCV viewer that sorts media files into grade folders.

    Media files found under ``directory`` are shown one at a time in a
    window named "Media Grader". Pressing 1-5 moves the current file into
    ``directory/<grade>``; other keys control playback, seeking, navigation
    and undo (see :meth:`run` for the full key list).
    """

    # Configuration constants
    BASE_FRAME_DELAY_MS = 16  # waitKey delay at 1.0x speed (~60 polls/sec)
    KEY_REPEAT_RATE_SEC = 0.5  # How often to process key repeats
    FAST_SEEK_ACTIVATION_TIME = 2.0  # How long to hold before fast seek
    FRAME_RENDER_TIME_MS = 50  # Time to let frames render between seeks
    SPEED_INCREMENT = 0.2
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 100.0
    FAST_SEEK_MULTIPLIER = 60
    IMAGE_DISPLAY_DELAY_MS = 100

    # Timeline configuration
    TIMELINE_HEIGHT = 60
    TIMELINE_MARGIN = 20
    TIMELINE_BAR_HEIGHT = 12
    TIMELINE_HANDLE_SIZE = 12
    TIMELINE_COLOR_BG = (80, 80, 80)
    TIMELINE_COLOR_PROGRESS = (0, 120, 255)
    TIMELINE_COLOR_HANDLE = (255, 255, 255)
    TIMELINE_COLOR_BORDER = (200, 200, 200)

    # Extensions that are opened with cv2.VideoCapture instead of cv2.imread
    VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv", ".gif"})

    # Names of the sub-directories that graded files are moved into
    GRADE_DIR_NAMES = frozenset({"1", "2", "3", "4", "5"})

    # Watch tracking thresholds (in frames)
    WATCH_JUMP_THRESHOLD_FRAMES = 5  # Larger step than this counts as a seek
    WATCH_FLUSH_INTERVAL_FRAMES = 30  # Record a region after this many frames

    # Value of ``cv2.waitKey(...) & 0xFF`` when no key was pressed (-1 & 0xFF)
    NO_KEY = 255

    # Outcomes of reviewing one file (see _review_current_file)
    _ACTION_QUIT = "quit"
    _ACTION_NEXT = "next"
    _ACTION_RELOAD = "reload"

    def __init__(
        self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
    ):
        """Set up grader state; no files are scanned until :meth:`run`.

        Args:
            directory: Root directory that is scanned for media and that
                receives the ``1``..``5`` grade sub-directories.
            seek_frames: Number of frames moved by a coarse (A/D) seek.
            snap_to_iframe: Stored on the instance; no code in this class
                currently acts on it.
        """
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.snap_to_iframe = snap_to_iframe
        self.current_index = 0
        self.playback_speed = 1.0
        self.media_files = []
        self.current_cap = None
        self.is_playing = True
        self.current_frame = 0
        self.total_frames = 0

        # Key repeat tracking with rate limiting
        self.last_seek_time = 0
        self.current_seek_key = None
        self.key_first_press_time = 0
        self.is_seeking = False

        # Seeking modes
        self.fine_seek_frames = 1  # Frame-by-frame
        self.coarse_seek_frames = self.seek_frames  # User-configurable
        self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER

        # Current frame cache for display
        self.current_display_frame = None

        # Supported media extensions
        self.extensions = [
            ".png",
            ".jpg",
            ".jpeg",
            ".gif",
            ".mp4",
            ".avi",
            ".mov",
            ".mkv",
        ]

        # Mouse interaction for timeline
        self.mouse_dragging = False
        self.timeline_rect = None
        self.window_width = 800
        self.window_height = 600

        # Undo functionality: list of
        # (moved_to_path, original_path, original_index) tuples
        self.undo_history = []

        # Watch tracking for "good look" feature:
        # file path -> list of (start_frame, end_frame) regions
        self.watched_regions: Dict[str, List[Tuple[int, int]]] = {}
        # Frame where current viewing session started
        self.current_watch_start: Optional[int] = None
        # Last frame position seen by update_watch_tracking
        self.last_frame_position = 0

        # Number of sample-point jumps already made, per file path
        self.jump_counters: Dict[str, int] = {}

    def _is_in_grade_dir(self, file: Path) -> bool:
        """Return True if ``file`` lies in a grade folder below the root.

        Only path components below ``self.directory`` are inspected, so a
        root directory that itself lives under a folder named ``1``..``5``
        does not cause every file to be filtered out.
        """
        try:
            relative = file.relative_to(self.directory)
        except ValueError:
            # Not expressible relative to the root; check the whole path
            relative = file
        return any(part in self.GRADE_DIR_NAMES for part in relative.parent.parts)

    def find_media_files(self) -> List[Path]:
        """Find all media files recursively in the directory.

        Returns:
            Sorted list of paths with a supported extension, excluding
            files that already sit in a grade directory (1-5).
        """
        media_files = []
        for ext in self.extensions:
            pattern = str(self.directory / "**" / f"*{ext}")
            media_files.extend(Path(f) for f in glob.glob(pattern, recursive=True))

        filtered_files = []
        for file in media_files:
            if self._is_in_grade_dir(file):
                continue
            print("Adding file: ", file)
            filtered_files.append(file)

        return sorted(filtered_files)

    def is_video(self, file_path: Path) -> bool:
        """Check if file is a video (by extension, case-insensitive)."""
        return file_path.suffix.lower() in self.VIDEO_EXTENSIONS

    def calculate_frame_delay(self) -> int:
        """Calculate frame delay in milliseconds based on playback speed.

        Returns:
            ``BASE_FRAME_DELAY_MS / playback_speed`` truncated to an int,
            but never less than 1.
        """
        delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
        return max(1, delay_ms)

    def calculate_frames_to_skip(self) -> int:
        """Calculate how many frames to skip for high-speed playback.

        Returns:
            0 up to 2.0x, ``int(speed - 1)`` up to 5.0x, and
            ``int(speed * 2)`` above that.
        """
        if self.playback_speed <= 2.0:
            return 0
        if self.playback_speed <= 5.0:
            return int(self.playback_speed - 1)
        return int(self.playback_speed * 2)

    def load_media(self, file_path: Path) -> bool:
        """Load media file for display.

        Releases any previously open capture, opens ``file_path`` (as a
        video capture or as a still image), loads the first frame and
        starts a watch-tracking session.

        Returns:
            True on success; False if the video cannot be opened or the
            initial frame cannot be read.
        """
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        if self.is_video(file_path):
            self.current_cap = cv2.VideoCapture(str(file_path))
            if not self.current_cap.isOpened():
                print(
                    f"Warning: Could not open video file {file_path.name} (unsupported codec)"
                )
                self.current_cap.release()
                self.current_cap = None
                return False
            self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        else:
            self.total_frames = 1
        self.current_frame = 0

        # Load initial frame; without one there is nothing to show, and the
        # previous file's frame must not stay on screen under the new name.
        if not self.load_current_frame():
            self.current_display_frame = None
            return False

        # Start watch tracking session for videos
        self.start_watch_session()
        return True

    def load_current_frame(self):
        """Load the current frame into display cache.

        For videos this reads the next frame from the capture and updates
        ``current_frame`` from the capture position; for images it reads
        the file from disk.

        Returns:
            True if a frame was stored in ``current_display_frame``.
        """
        media_path = self.media_files[self.current_index]

        if self.is_video(media_path):
            if not self.current_cap:
                return False
            ret, frame = self.current_cap.read()
            if not ret:
                return False
            self.current_display_frame = frame
            self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
            return True

        frame = cv2.imread(str(media_path))
        if frame is None:
            return False
        self.current_display_frame = frame
        return True

    def start_watch_session(self):
        """Start tracking a new viewing session (videos only)."""
        if self.is_video(self.media_files[self.current_index]):
            self.current_watch_start = self.current_frame
            self.last_frame_position = self.current_frame

    def update_watch_tracking(self):
        """Update watch tracking based on current frame position.

        A step larger than ``WATCH_JUMP_THRESHOLD_FRAMES`` since the last
        call is treated as a seek: the region watched before the jump is
        recorded and a new session starts at the current frame. Otherwise
        the region is recorded every ``WATCH_FLUSH_INTERVAL_FRAMES`` frames.
        """
        media_path = self.media_files[self.current_index]
        if not self.is_video(media_path) or self.current_watch_start is None:
            return

        current_file = str(media_path)
        session_start = self.current_watch_start
        step = abs(self.current_frame - self.last_frame_position)

        if step > self.WATCH_JUMP_THRESHOLD_FRAMES:
            # Jumped away: what was watched ends at the last known position
            self.add_watched_region(
                current_file,
                min(session_start, self.last_frame_position),
                max(session_start, self.last_frame_position),
            )
            self.current_watch_start = self.current_frame
        elif abs(self.current_frame - session_start) > self.WATCH_FLUSH_INTERVAL_FRAMES:
            # Continuous viewing: flush the region up to the current frame
            self.add_watched_region(
                current_file,
                min(session_start, self.current_frame),
                max(session_start, self.current_frame),
            )
            self.current_watch_start = self.current_frame

        # Always follow the playhead so the next call measures a real step
        self.last_frame_position = self.current_frame

    def add_watched_region(self, file_path, start_frame, end_frame):
        """Add a watched region, merging with existing overlapping regions.

        Regions that overlap or touch ``[start_frame, end_frame]`` are
        folded into one region; all others are kept unchanged.
        """
        low, high = start_frame, end_frame
        merged = []
        for region in self.watched_regions.get(file_path, []):
            if high < region[0] or low > region[1]:
                # No overlap
                merged.append(region)
            else:
                # Overlap, grow the new region to cover both
                low = min(low, region[0])
                high = max(high, region[1])

        merged.append((low, high))
        self.watched_regions[file_path] = merged

    def find_largest_unwatched_region(self):
        """Find the largest unwatched region in the current video.

        Returns:
            ``(start_frame, end_frame)`` of the largest gap between watched
            regions, the first quarter of the video if nothing has been
            watched, or None for images and fully watched videos.
        """
        media_path = self.media_files[self.current_index]
        if not self.is_video(media_path):
            return None

        watched = sorted(
            self.watched_regions.get(str(media_path), []), key=lambda r: r[0]
        )
        if not watched:
            # No regions watched yet, return the beginning
            return (0, self.total_frames // 4)

        gaps = []

        # Gap before first watched region
        if watched[0][0] > 0:
            gaps.append((0, watched[0][0]))

        # Gaps between watched regions
        for previous, following in zip(watched, watched[1:]):
            if following[0] > previous[1]:
                gaps.append((previous[1], following[0]))

        # Gap after last watched region
        if watched[-1][1] < self.total_frames:
            gaps.append((watched[-1][1], self.total_frames))

        if not gaps:
            # Everything watched
            return None

        return max(gaps, key=lambda gap: gap[1] - gap[0])

    def jump_to_unwatched_region(self):
        """Jump to the next sample point of the current video.

        The video is divided into 8 segments; successive calls visit 1/4,
        1/2, 3/4, then 1/8, 3/8, 5/8, 7/8 and finally the beginning.

        Returns:
            True if a jump was made; False for images, videos shorter than
            8 frames, or once every sample point has been visited.
        """
        media_path = self.media_files[self.current_index]
        if not self.is_video(media_path) or not self.current_cap:
            return False

        current_file = str(media_path)

        # Define sampling strategy: divide video into segments and sample them
        segments = 8
        segment_size = self.total_frames // segments

        if segment_size == 0:
            print("Video too short for sampling")
            return False

        # Start with 1/4, 1/2, 3/4, then fill in the gaps
        sample_points = [
            segment_size * 2,  # 1/4 through
            segment_size * 4,  # 1/2 through
            segment_size * 6,  # 3/4 through
            segment_size * 1,  # 1/8 through
            segment_size * 3,  # 3/8 through
            segment_size * 5,  # 5/8 through
            segment_size * 7,  # 7/8 through
            0,  # Beginning
        ]

        current_jump = self.jump_counters.get(current_file, 0)

        if current_jump >= len(sample_points):
            print("All sample points visited! Video fully sampled.")
            return False

        target_frame = min(sample_points[current_jump], self.total_frames - 1)

        # Jump to the target frame
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

        self.jump_counters[current_file] = current_jump + 1

        percentage = (target_frame / self.total_frames) * 100
        print(
            f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)"
        )
        return True

    def display_current_frame(self):
        """Display the current cached frame with overlays.

        Videos get a status line (speed, frame, file position, play state)
        and a timeline; images are shown as-is.
        """
        if self.current_display_frame is None:
            return

        # Draw on a copy so the cached frame stays free of overlays
        frame = self.current_display_frame.copy()

        if self.is_video(self.media_files[self.current_index]):
            state = "Playing" if self.is_playing else "PAUSED"
            info_text = (
                f"Speed: {self.playback_speed:.1f}x"
                f" | Frame: {self.current_frame}/{self.total_frames}"
                f" | File: {self.current_index + 1}/{len(self.media_files)}"
                f" | {state}"
            )

            # Thick white pass followed by a thin black pass on top
            cv2.putText(
                frame,
                info_text,
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (255, 255, 255),
                2,
            )
            cv2.putText(
                frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
            )

            self.draw_timeline(frame)

        cv2.imshow("Media Grader", frame)

    def draw_timeline(self, frame):
        """Draw timeline at the bottom of the frame (videos only).

        Also records the frame size and stores the bar geometry in
        ``timeline_rect`` as ``(x, y, width, height)`` for mouse handling.
        """
        if not self.is_video(self.media_files[self.current_index]):
            return

        height, width = frame.shape[:2]
        self.window_height = height
        self.window_width = width

        # Timeline background area
        timeline_y = height - self.TIMELINE_HEIGHT
        cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)

        # Calculate timeline bar position (vertically centred in the area)
        bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
        bar_x_start = self.TIMELINE_MARGIN
        bar_x_end = width - self.TIMELINE_MARGIN
        bar_width = bar_x_end - bar_x_start
        bar_bottom = bar_y + self.TIMELINE_BAR_HEIGHT

        self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)

        # Draw timeline background and border
        cv2.rectangle(
            frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BG, -1
        )
        cv2.rectangle(
            frame,
            (bar_x_start, bar_y),
            (bar_x_end, bar_bottom),
            self.TIMELINE_COLOR_BORDER,
            1,
        )

        if self.total_frames > 0:
            # Draw progress
            progress = self.current_frame / max(1, self.total_frames - 1)
            progress_width = int(bar_width * progress)
            if progress_width > 0:
                cv2.rectangle(
                    frame,
                    (bar_x_start, bar_y),
                    (bar_x_start + progress_width, bar_bottom),
                    self.TIMELINE_COLOR_PROGRESS,
                    -1,
                )

            # Draw handle
            handle_x = bar_x_start + progress_width
            handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
            radius = self.TIMELINE_HANDLE_SIZE // 2
            cv2.circle(frame, (handle_x, handle_y), radius, self.TIMELINE_COLOR_HANDLE, -1)
            cv2.circle(frame, (handle_x, handle_y), radius, self.TIMELINE_COLOR_BORDER, 2)

    def mouse_callback(self, event, x, y, flags, param):
        """Handle mouse events for timeline interaction.

        A left click on the timeline bar seeks and starts a drag; moving
        over the bar while dragging keeps seeking; releasing the button
        anywhere ends the drag.
        """
        # Handle release first and unconditionally: if it were only honoured
        # over the timeline, letting go elsewhere would leave the drag stuck.
        if event == cv2.EVENT_LBUTTONUP:
            self.mouse_dragging = False
            return

        if not self.timeline_rect or not self.is_video(
            self.media_files[self.current_index]
        ):
            return

        bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
        bar_x_end = bar_x_start + bar_width

        # Add some extra height below the bar for easier clicking
        over_bar = (
            bar_y <= y <= bar_y + bar_height + 10 and bar_x_start <= x <= bar_x_end
        )
        if not over_bar:
            return

        if event == cv2.EVENT_LBUTTONDOWN:
            self.mouse_dragging = True
            self.seek_to_position(x, bar_x_start, bar_width)
        elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
            self.seek_to_position(x, bar_x_start, bar_width)

    def seek_to_position(self, mouse_x, bar_x_start, bar_width):
        """Seek to position based on mouse click/drag on timeline.

        Args:
            mouse_x: X coordinate of the mouse event.
            bar_x_start: Left edge of the timeline bar.
            bar_width: Width of the timeline bar; nothing happens if it is
                not positive.
        """
        if not self.current_cap or not self.is_video(
            self.media_files[self.current_index]
        ):
            return
        if bar_width <= 0:
            # Frame too narrow for a bar; avoid dividing by zero
            return

        # Calculate position ratio, clamped to [0, 1]
        position_ratio = max(0, min(1, (mouse_x - bar_x_start) / bar_width))

        # Calculate target frame, clamped to the valid frame range
        last_frame = self.total_frames - 1
        target_frame = max(0, min(int(position_ratio * last_frame), last_frame))

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    def advance_frame(self):
        """Advance to next frame(s) based on playback speed.

        Returns:
            True if a new frame was read, False if a read failed (for
            example at the end of the video), or None when the current
            file is not a video or playback is paused.
        """
        if (
            not self.is_video(self.media_files[self.current_index])
            or not self.is_playing
        ):
            return

        # Read and discard skipped frames; the last one read is displayed
        for _ in range(self.calculate_frames_to_skip() + 1):
            ret, frame = self.current_cap.read()
            if not ret:
                return False

        self.current_display_frame = frame
        self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))

        self.update_watch_tracking()
        return True

    def seek_video(self, frames_delta: int):
        """Seek video by specified number of frames (clamped to the video)."""
        if not self.current_cap or not self.is_video(
            self.media_files[self.current_index]
        ):
            return

        target_frame = max(
            0, min(self.current_frame + frames_delta, self.total_frames - 1)
        )

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    def process_seek_key(self, key: int) -> bool:
        """Process seeking keys with proper rate limiting.

        ``,`` and ``.`` seek one frame immediately. ``a`` and ``d`` seek by
        ``coarse_seek_frames`` on first press; repeated presses are limited
        to one seek per ``KEY_REPEAT_RATE_SEC`` and switch to
        ``fast_seek_frames`` once held longer than
        ``FAST_SEEK_ACTIVATION_TIME``. Any other key clears the seek state.

        Returns:
            True if a seek was performed for this key.
        """
        current_time = time.time()

        seek_direction = 0
        seek_amount = 0

        if key == ord("a"):
            seek_direction = -1
        elif key == ord("d"):
            seek_direction = 1
        elif key == ord(","):
            seek_amount = -self.fine_seek_frames
        elif key == ord("."):
            seek_amount = self.fine_seek_frames
        else:
            # Not a seek key: treat it as the end of any held seek
            if self.current_seek_key is not None:
                self.current_seek_key = None
                self.is_seeking = False
            return False

        # Handle fine seeking (comma/period) - always immediate
        if seek_amount != 0:
            self.seek_video(seek_amount)
            return True

        # Handle A/D seeking with rate limiting
        if seek_direction != 0:
            if self.current_seek_key != key:
                # First press of this key: seek right away
                self.current_seek_key = key
                self.key_first_press_time = current_time
                self.last_seek_time = current_time
                self.is_seeking = True

                self.seek_video(seek_direction * self.coarse_seek_frames)
                return True

            elif self.is_seeking:
                time_since_last_seek = current_time - self.last_seek_time
                time_held = current_time - self.key_first_press_time

                if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
                    self.last_seek_time = current_time

                    if time_held > self.FAST_SEEK_ACTIVATION_TIME:
                        seek_amount = seek_direction * self.fast_seek_frames
                    else:
                        seek_amount = seek_direction * self.coarse_seek_frames

                    self.seek_video(seek_amount)
                    return True

        return False

    def grade_media(self, grade: int):
        """Move current media file to grade directory.

        On success the file is removed from ``media_files`` (so
        ``current_index`` then refers to the following file, wrapping to 0
        past the end) and the move is recorded for undo.

        Returns:
            False if there is nothing to grade, the grade is outside 1-5,
            or the last remaining file was just graded; True otherwise,
            including when the move failed and the file was left in place.
        """
        if not self.media_files or grade < 1 or grade > 5:
            return False

        current_file = self.media_files[self.current_index]
        grade_dir = self.directory / str(grade)

        # Create grade directory if it doesn't exist
        grade_dir.mkdir(exist_ok=True)

        # Pick a destination name that does not clobber an existing file
        destination = grade_dir / current_file.name
        counter = 1
        while destination.exists():
            destination = grade_dir / f"{current_file.stem}_{counter}{current_file.suffix}"
            counter += 1

        # Release video capture to unlock the file before moving
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        try:
            shutil.move(str(current_file), str(destination))
        except OSError as e:
            print(f"Error moving file: {e}")
            return True

        print(f"Moved {current_file.name} to grade {grade}")

        # Record the move only once it has actually happened
        self.undo_history.append(
            (str(destination), str(current_file), self.current_index)
        )

        self.media_files.pop(self.current_index)

        if self.current_index >= len(self.media_files):
            self.current_index = 0

        if not self.media_files:
            print("No more media files to grade!")
            return False

        return True

    def undo_last_action(self):
        """Undo the last grading action.

        Moves the file back to its original path, re-inserts it into
        ``media_files`` at its original index (or at the end if the list is
        now shorter) and makes it the current file.

        Returns:
            True if a grading action was undone, False if there was nothing
            to undo or the move back failed (the action then stays in the
            undo history).
        """
        if not self.undo_history:
            print("No actions to undo!")
            return False

        moved_file_path, original_file_path, original_index = self.undo_history.pop()

        # The open capture belongs to the file on screen, not to the graded
        # file being moved here, so it is left alone; a failed undo then
        # leaves playback of the current file intact.
        try:
            shutil.move(moved_file_path, original_file_path)
        except OSError as e:
            print(f"Error undoing action: {e}")
            # If undo failed, put the action back in history
            self.undo_history.append(
                (moved_file_path, original_file_path, original_index)
            )
            return False

        original_file = Path(original_file_path)

        # Clamp so current_index always points at the restored file
        insert_at = min(original_index, len(self.media_files))
        self.media_files.insert(insert_at, original_file)
        self.current_index = insert_at

        print(f"Undone: Moved {original_file.name} back from grade folder")
        return True

    def _review_current_file(self, current_file: Path) -> str:
        """Show ``current_file`` and handle keys until the file changes.

        Returns:
            ``_ACTION_QUIT`` to end the session, ``_ACTION_NEXT`` to move on
            to the following file, or ``_ACTION_RELOAD`` when
            ``current_index`` already points at the file to show next.
        """
        grade_keys = [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]

        while True:
            self.display_current_frame()

            if self.is_video(current_file):
                if self.is_seeking:
                    delay = self.FRAME_RENDER_TIME_MS
                else:
                    delay = self.calculate_frame_delay()
            else:
                delay = self.IMAGE_DISPLAY_DELAY_MS

            key = cv2.waitKey(delay) & 0xFF

            if key == ord("q") or key == 27:
                return self._ACTION_QUIT
            elif key == ord(" "):
                self.is_playing = not self.is_playing
            elif key == ord("s"):
                self.playback_speed = max(
                    self.MIN_PLAYBACK_SPEED,
                    self.playback_speed - self.SPEED_INCREMENT,
                )
            elif key == ord("w"):
                self.playback_speed = min(
                    self.MAX_PLAYBACK_SPEED,
                    self.playback_speed + self.SPEED_INCREMENT,
                )
            elif self.process_seek_key(key):
                pass
            elif key == ord("n"):
                return self._ACTION_NEXT
            elif key == ord("p"):
                self.current_index = max(0, self.current_index - 1)
                return self._ACTION_RELOAD
            elif key == ord("u"):
                if self.undo_last_action():
                    # current_index now points at the restored file
                    return self._ACTION_RELOAD
            elif key == ord("l"):
                self.jump_to_unwatched_region()
            elif key in grade_keys:
                if not self.grade_media(int(chr(key))):
                    return self._ACTION_QUIT
                # Grading removed the file from the list, so current_index
                # already refers to the next file to show.
                return self._ACTION_RELOAD
            elif key == self.NO_KEY:
                # NOTE: process_seek_key() above has already cleared the
                # seek state for a no-key poll, so this is never taken.
                if self.is_seeking and self.current_seek_key is not None:
                    self.process_seek_key(self.current_seek_key)

            if (
                self.is_playing
                and self.is_video(current_file)
                and not self.is_seeking
                and self.current_cap is not None
            ):
                if not self.advance_frame():
                    # Video reached the end, restart it instead of navigating
                    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    self.current_frame = 0
                    self.load_current_frame()

    def _grading_loop(self) -> bool:
        """Review files one after another until quit or the list runs out.

        Returns:
            True if the loop ran past the last file, False if the session
            was ended early (quit key or no files left to grade).
        """
        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
            cv2.setWindowTitle("Media Grader", window_title)

            action = self._review_current_file(current_file)
            if action == self._ACTION_QUIT:
                return False
            if action == self._ACTION_NEXT:
                # Only an explicit "next" advances; after grading, undo or
                # "previous" current_index is already where it should be.
                self.current_index += 1

        return True

    def run(self):
        """Main grading loop.

        Scans the directory, prints the key bindings, opens the window and
        reviews files until the user quits or no files remain. The capture
        and the window are released however the loop ends.
        """
        self.media_files = self.find_media_files()

        if not self.media_files:
            print("No media files found in directory!")
            return

        print(f"Found {len(self.media_files)} media files")
        print("Controls:")
        print(" Space: Pause/Play")
        print(" A/D: Seek backward/forward (hold for FAST seek)")
        print(" , / . : Frame-by-frame seek (fine control)")
        print(" W/S: Decrease/Increase playback speed")
        print(" 1-5: Grade and move file")
        print(" N: Next file")
        print(" P: Previous file")
        print(" U: Undo last grading action")
        print(" L: Sample video at key points (videos only)")
        print(" Q/ESC: Quit")

        cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
        cv2.setMouseCallback("Media Grader", self.mouse_callback)

        try:
            completed = self._grading_loop()
        finally:
            if self.current_cap:
                self.current_cap.release()
                self.current_cap = None
            cv2.destroyAllWindows()

        if completed:
            print("Grading session complete!")


def main():
    """Parse command-line arguments and run a grading session.

    Exits with status 1 if the directory is invalid or the session fails
    with an unexpected error.
    """
    parser = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to scan for media files (default: current directory)",
    )
    parser.add_argument(
        "--seek-frames",
        type=int,
        default=30,
        help="Number of frames to seek when using arrow keys (default: 30)",
    )
    parser.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )

    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory")
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        # Top-level boundary: report and exit non-zero
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()