"""Interactive media grader.

Shows images and videos from a directory one at a time in an OpenCV window
and moves each file into a numbered grade folder (1-5) on a key press.
"""

import argparse
import glob
import os
import shutil
import sys
import time
from pathlib import Path
from typing import List, Tuple, Optional

import cv2


class MediaGrader:
    """Review media files in a directory and sort them into grade folders.

    Files are discovered recursively under ``directory``.  Pressing 1-5 moves
    the file currently shown into ``directory/<grade>``.  Videos can be
    paused, sped up / slowed down and seeked with the keyboard.
    """

    # Configuration constants
    DEFAULT_FPS = 30
    BASE_FRAME_DELAY_MS = 33  # ~30 FPS
    KEY_REPEAT_THRESHOLD_SEC = 0.5  # Faster detection for repeat
    FAST_SEEK_ACTIVATION_TIME = 0.5  # How long to hold before fast seek
    WINDOW_MAX_WIDTH = 1200
    WINDOW_MAX_HEIGHT = 800
    WINDOW_MAX_SCALE_UP = 2.0
    SPEED_INCREMENT = 0.1
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 100.0
    FAST_SEEK_MULTIPLIER = 5
    IFRAME_SNAP_INTERVAL = 30
    IMAGE_DISPLAY_DELAY_MS = 100
    SEEK_DISPLAY_INTERVAL = 10  # Update display every N frames during seeking

    # Longest gap (in frames) that seek_to_iframe closes by reading
    # sequentially after snapping; larger gaps stay at the snap point.
    MAX_SEQUENTIAL_SEEK_FRAMES = 60

    WINDOW_NAME = "Media Grader"
    GRADE_DIR_NAMES = ("1", "2", "3", "4", "5")
    # Suffixes opened through cv2.VideoCapture instead of cv2.imread.
    VIDEO_SUFFIXES = (".mp4", ".avi", ".mov", ".mkv", ".gif")

    # What the main loop should do once the per-file loop ends.
    _ACTION_QUIT = "quit"
    _ACTION_NEXT = "next"  # move on to the following file
    _ACTION_STAY = "stay"  # current_index already points at the file to show

    def __init__(
        self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
    ):
        """Set up state and create the grade directories.

        Args:
            directory: Root directory that is scanned for media and that
                receives the ``1`` .. ``5`` grade sub-directories.
            seek_frames: Frames jumped by one A/D key press.
            snap_to_iframe: If true, every seek (even a small one) goes
                through :meth:`seek_to_iframe`.
        """
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.snap_to_iframe = snap_to_iframe
        self.current_index = 0
        self.playback_speed = 1.0
        self.media_files: List[Path] = []
        self.current_cap: Optional[cv2.VideoCapture] = None
        self.is_playing = True
        self.current_frame = 0
        self.total_frames = 0

        # Key repeat tracking
        self.last_key_time = 0
        self.last_key = None
        self.key_first_press_time = 0

        # Seeking modes
        self.fine_seek_frames = 1  # Frame-by-frame
        self.coarse_seek_frames = self.seek_frames  # User-configurable
        self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER

        # Current frame cache for display
        self.current_display_frame = None
        self.window_resized = False

        # Supported media extensions (glob patterns)
        self.extensions = [
            "*.png",
            "*.jpg",
            "*.jpeg",
            "*.gif",
            "*.mp4",
            "*.avi",
            "*.mov",
            "*.mkv",
        ]

        # Create grade directories
        for name in self.GRADE_DIR_NAMES:
            (self.directory / name).mkdir(exist_ok=True)

    def find_media_files(self) -> List[Path]:
        """Return all ungraded media files under the directory, sorted.

        Files that live inside a directory named ``1`` .. ``5`` below the
        scan root are treated as already graded and left out.
        """
        media_files: List[Path] = []
        for ext in self.extensions:
            pattern = str(self.directory / "**" / ext)
            media_files.extend(Path(f) for f in glob.glob(pattern, recursive=True))
        return sorted(f for f in media_files if not self._is_in_grade_dir(f))

    def _is_in_grade_dir(self, file_path: Path) -> bool:
        """Return True if *file_path* sits in a grade directory below the root."""
        # Only look at components below the scan root: a root such as
        # "/data/3/clips" must not cause every file to be filtered out.
        try:
            relative = file_path.relative_to(self.directory)
        except ValueError:
            relative = file_path
        # The last component is the file name itself, not a directory.
        return any(part in self.GRADE_DIR_NAMES for part in relative.parts[:-1])

    def is_video(self, file_path: Path) -> bool:
        """Return True if the file suffix is one handled as a video."""
        return file_path.suffix.lower() in self.VIDEO_SUFFIXES

    def calculate_frame_delay(self) -> int:
        """Return the per-frame wait in milliseconds for the current speed."""
        # Base delay for 30 FPS, adjusted by playback speed
        delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
        return max(1, delay_ms)  # Minimum 1ms delay

    def calculate_frames_to_skip(self) -> int:
        """Return how many extra frames to drop per step at high speed."""
        speed = self.playback_speed
        if speed <= 2.0:
            return 0  # No skipping up to moderate speeds
        if speed <= 5.0:
            return int(speed - 1)  # Skip some frames
        return int(speed * 2)  # Skip many frames for very high speeds

    def load_media(self, file_path: Path) -> bool:
        """Open *file_path* and cache its first frame.

        The frame itself is read by :meth:`load_current_frame`, which looks
        at ``self.media_files[self.current_index]``; callers are expected to
        pass that same file.

        Returns:
            True if the first frame was loaded, False if the file could not
            be opened or decoded.
        """
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        # Drop the previous file's frame so a failed load can never show it.
        self.current_display_frame = None
        self.current_frame = 0

        if self.is_video(file_path):
            self.current_cap = cv2.VideoCapture(str(file_path))
            if not self.current_cap.isOpened():
                self._release_capture()
                return False
            self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        else:
            # Images have no capture object; they are a single "frame".
            self.total_frames = 1

        self.window_resized = False
        if not self.load_current_frame():
            self._release_capture()
            return False
        return True

    def _release_capture(self) -> None:
        """Release the current video capture, if any."""
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

    def load_current_frame(self) -> bool:
        """Read the next frame (or the image) into the display cache.

        Returns:
            True if a frame was stored in ``current_display_frame``.
        """
        current_file = self.media_files[self.current_index]

        if not self.is_video(current_file):
            image = cv2.imread(str(current_file))
            if image is None:
                return False
            self.current_display_frame = image
            return True

        if not self.current_cap:
            return False
        # Reads at the capture's current position, then records where the
        # capture now stands.
        ret, frame = self.current_cap.read()
        if not ret:
            return False
        self.current_display_frame = frame
        self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
        return True

    @staticmethod
    def _draw_outlined_text(frame, text: str, origin: Tuple[int, int], scale: float):
        """Draw *text* twice: thick white first, thin black on top."""
        cv2.putText(
            frame, text, origin, cv2.FONT_HERSHEY_SIMPLEX, scale, (255, 255, 255), 2
        )
        cv2.putText(frame, text, origin, cv2.FONT_HERSHEY_SIMPLEX, scale, (0, 0, 0), 1)

    def display_current_frame(self):
        """Show the cached frame with the status and help overlays."""
        if self.current_display_frame is None:
            return

        # Draw on a copy so the cached frame stays free of overlay text.
        frame = self.current_display_frame.copy()

        # Auto-resize window on first frame
        if not self.window_resized:
            self.auto_resize_window(frame)
            self.window_resized = True

        info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
        help_text = "Seek: A/D (hold=FAST) ,. (fine) | W/S speed | 1-5 grade | Space pause | Q quit"

        self._draw_outlined_text(frame, info_text, (10, 30), 0.7)
        self._draw_outlined_text(frame, help_text, (10, 60), 0.5)

        cv2.imshow(self.WINDOW_NAME, frame)

    def advance_frame(self) -> bool:
        """Advance playback by one step, dropping frames at high speed.

        Returns:
            True if a new frame was cached.  False if nothing was advanced:
            the current file is not a video, playback is paused, there is no
            open capture, or the video has no more frames.
        """
        if (
            not self.is_video(self.media_files[self.current_index])
            or not self.is_playing
            or not self.current_cap
        ):
            return False

        frame = None
        # +1 so that at least one frame is read per step
        for _ in range(self.calculate_frames_to_skip() + 1):
            ret, frame = self.current_cap.read()
            if not ret:
                return False

        self.current_display_frame = frame
        self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
        return True

    def auto_resize_window(self, frame):
        """Resize the window to the frame size, shrunk to the max bounds."""
        height, width = frame.shape[:2]

        # Each factor is at most 1.0: frames are only ever scaled down here.
        scale_w = self.WINDOW_MAX_WIDTH / width if width > self.WINDOW_MAX_WIDTH else 1.0
        scale_h = (
            self.WINDOW_MAX_HEIGHT / height if height > self.WINDOW_MAX_HEIGHT else 1.0
        )
        # The scale-up cap cannot currently bind (both factors are <= 1.0);
        # it is kept as the upper limit should up-scaling be introduced.
        scale = min(scale_w, scale_h, self.WINDOW_MAX_SCALE_UP)

        cv2.resizeWindow(self.WINDOW_NAME, int(width * scale), int(height * scale))

    def seek_to_iframe(self, target_frame) -> bool:
        """Seek near *target_frame* via a snap point at or before it.

        The snap point is the largest multiple of ``IFRAME_SNAP_INTERVAL``
        not above the target; it is a fixed grid, not the file's real
        key-frame positions.  If the target is fewer than
        ``MAX_SEQUENTIAL_SEEK_FRAMES`` frames past the position reported
        after snapping, frames are read one by one up to it (refreshing the
        display every ``SEEK_DISPLAY_INTERVAL`` frames); otherwise the frame
        at the snap point is used.

        Returns:
            False if there is no open capture, True otherwise (even when no
            frame could be read).
        """
        if not self.current_cap:
            return False

        snap_frame = (
            target_frame // self.IFRAME_SNAP_INTERVAL
        ) * self.IFRAME_SNAP_INTERVAL
        snap_frame = max(0, min(snap_frame, self.total_frames - 1))
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, snap_frame)

        # Ask the capture where it actually landed before reading onwards.
        current_pos = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
        frames_to_read = target_frame - current_pos

        ret, frame = False, None
        if 0 < frames_to_read < self.MAX_SEQUENTIAL_SEEK_FRAMES:
            for i in range(frames_to_read):
                ret, frame = self.current_cap.read()
                if not ret:
                    break
                # Update display every few frames during seeking
                if i % self.SEEK_DISPLAY_INTERVAL == 0:
                    self.current_display_frame = frame
                    self.current_frame = int(
                        self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)
                    )
                    self.display_current_frame()
                    cv2.waitKey(1)  # Process display events
        else:
            # For large (or backward) gaps, just take the frame at the snap
            ret, frame = self.current_cap.read()

        if ret:
            self.current_display_frame = frame
            self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
        return True

    def seek_video(self, frames_delta: int):
        """Seek the current video by *frames_delta* frames (may be negative)."""
        if not self.current_cap or not self.is_video(
            self.media_files[self.current_index]
        ):
            return

        target_frame = max(
            0, min(self.current_frame + frames_delta, self.total_frames - 1)
        )
        print(
            f"Seeking from {self.current_frame} to {target_frame} (delta: {frames_delta})"
        )

        if abs(frames_delta) > 5 or self.snap_to_iframe:
            self.seek_to_iframe(target_frame)
        else:
            # For small seeks, position the capture directly on the target
            self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
            self.load_current_frame()

        print(f"Seeked to frame {self.current_frame}")

    def handle_seeking_key(self, key: int) -> bool:
        """Handle the seek keys (A, D, comma, period).

        A/D seek by ``coarse_seek_frames``; once the same key has kept
        arriving for longer than ``FAST_SEEK_ACTIVATION_TIME`` seconds they
        seek by ``fast_seek_frames`` instead.  Comma/period seek by
        ``fine_seek_frames``.

        Returns:
            True if a seek was performed, False if *key* is not a seek key
            or the resulting seek amount is zero.
        """
        current_time = time.time()

        if key == ord(","):  # Fine seek backward
            seek_amount = -self.fine_seek_frames
        elif key == ord("."):  # Fine seek forward
            seek_amount = self.fine_seek_frames
        elif key in (ord("a"), ord("d")):  # Coarse / fast seek
            direction = -1 if key == ord("a") else 1
            if self.last_key != key:
                # New key press: start timing how long it is held
                self.key_first_press_time = current_time
                self.last_key = key
                seek_amount = direction * self.coarse_seek_frames
            else:
                # Repeated key press (key is being held down)
                time_held = current_time - self.key_first_press_time
                time_since_last = current_time - self.last_key_time
                print(
                    f"Key held for {time_held:.2f}s, since last: {time_since_last:.2f}s"
                )
                if time_held > self.FAST_SEEK_ACTIVATION_TIME:
                    seek_amount = direction * self.fast_seek_frames
                    print(f"FAST SEEK: {seek_amount} frames")
                else:
                    seek_amount = direction * self.coarse_seek_frames
        else:
            return False

        if seek_amount == 0:
            return False
        self.seek_video(seek_amount)
        self.last_key_time = current_time
        return True

    def grade_media(self, grade: int) -> bool:
        """Move the current file into the directory for *grade*.

        On success the file is removed from ``media_files`` and
        ``current_index`` is left pointing at the file that followed it
        (wrapping to 0 after the last one).  If the move fails, an error is
        printed and the list is left untouched.

        Args:
            grade: Grade from 1 to 5; other values are ignored.

        Returns:
            False when no files are left to grade, True otherwise.
        """
        if not self.media_files:
            return False
        if not 1 <= grade <= 5:
            return True

        current_file = self.media_files[self.current_index]
        grade_dir = self.directory / str(grade)
        destination = grade_dir / current_file.name

        # Handle name conflicts by appending _1, _2, ... to the stem
        counter = 1
        while destination.exists():
            destination = (
                grade_dir / f"{current_file.stem}_{counter}{current_file.suffix}"
            )
            counter += 1

        # An open capture can keep the file locked on some platforms
        # (notably Windows), which would make the move fail.
        self._release_capture()

        try:
            shutil.move(str(current_file), str(destination))
        except OSError as e:
            print(f"Error moving file: {e}")
            return True

        print(f"Moved {current_file.name} to grade {grade}")

        # The next file slides into current_index; wrap around at the end.
        self.media_files.pop(self.current_index)
        if self.current_index >= len(self.media_files):
            self.current_index = 0

        if not self.media_files:
            print("No more media files to grade!")
            return False
        return True

    def _review_current_file(self, current_file: Path) -> str:
        """Display *current_file* and process keys until it is left.

        Returns one of the ``_ACTION_*`` values telling :meth:`run` whether
        to quit, advance to the next file, or keep ``current_index`` as is.
        """
        is_video = self.is_video(current_file)

        while True:
            # Always display the current cached frame
            self.display_current_frame()

            delay = (
                self.calculate_frame_delay() if is_video else self.IMAGE_DISPLAY_DELAY_MS
            )
            key = cv2.waitKey(delay) & 0xFF

            # Debug: print key codes
            if key != 255:  # 255 means no key pressed
                print(f"Key pressed: {key}")

            if key == ord("q") or key == 27:  # Q or ESC
                return self._ACTION_QUIT
            elif key == ord(" "):  # Space - pause/play
                self.is_playing = not self.is_playing
                print(f"{'Playing' if self.is_playing else 'Paused'}")
            elif key == ord("w"):  # Slow down
                self.playback_speed = max(
                    self.MIN_PLAYBACK_SPEED,
                    self.playback_speed - self.SPEED_INCREMENT,
                )
                print(f"Speed: {self.playback_speed:.1f}x")
            elif key == ord("s"):  # Speed up
                self.playback_speed = min(
                    self.MAX_PLAYBACK_SPEED,
                    self.playback_speed + self.SPEED_INCREMENT,
                )
                print(f"Speed: {self.playback_speed:.1f}x")
            elif self.handle_seeking_key(key):
                # Seeking was handled and frame was updated
                pass
            elif key == ord("n"):  # Next file
                return self._ACTION_NEXT
            elif key == ord("p"):  # Previous file
                self.current_index = max(0, self.current_index - 1)
                return self._ACTION_STAY
            elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
                count_before = len(self.media_files)
                self.grade_media(int(chr(key)))
                if len(self.media_files) < count_before:
                    # The graded file was removed, so current_index already
                    # refers to the next one; advancing would skip a file.
                    return self._ACTION_STAY
                return self._ACTION_NEXT  # Move failed: leave the file behind
            elif key == 255:  # No key pressed
                # Reset key tracking if no key is pressed
                if self.last_key is not None:
                    self.last_key = None
                    print("Key released")

            # Advance frame only if playing (and it's a video)
            if self.is_playing and is_video:
                if not self.advance_frame():  # End of video
                    return self._ACTION_NEXT

    def run(self):
        """Main grading loop: show each file and react to key presses."""
        self.media_files = self.find_media_files()

        if not self.media_files:
            print("No media files found in directory!")
            return

        print(f"Found {len(self.media_files)} media files")
        print("Controls:")
        print("  Space: Pause/Play")
        print("  A/D: Seek backward/forward (hold for FAST seek)")
        print("  , / . : Frame-by-frame seek (fine control)")
        print("  W/S: Decrease/Increase playback speed")
        print("  1-5: Grade and move file")
        print("  N: Next file")
        print("  P: Previous file")
        print("  Q/ESC: Quit")

        cv2.namedWindow(self.WINDOW_NAME, cv2.WINDOW_NORMAL)

        try:
            while self.media_files and self.current_index < len(self.media_files):
                current_file = self.media_files[self.current_index]
                if not self.load_media(current_file):
                    print(f"Could not load {current_file}")
                    self.current_index += 1
                    continue

                window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
                cv2.setWindowTitle(self.WINDOW_NAME, window_title)

                action = self._review_current_file(current_file)
                if action == self._ACTION_QUIT:
                    return
                if action == self._ACTION_NEXT:
                    self.current_index += 1
        finally:
            # Runs on quit and on errors too, so the capture and the window
            # are always released.
            self._release_capture()
            cv2.destroyAllWindows()

        print("Grading session complete!")


def main():
    """Parse the command line and run a grading session."""
    parser = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to scan for media files (default: current directory)",
    )
    parser.add_argument(
        "--seek-frames",
        type=int,
        default=30,
        help="Number of frames to seek when using arrow keys (default: 30)",
    )
    parser.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )

    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory")
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)

    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()