import os import sys import glob import cv2 import argparse import shutil from pathlib import Path from typing import List, Tuple, Optional class MediaGrader: def __init__(self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False): self.directory = Path(directory) self.seek_frames = seek_frames self.snap_to_iframe = snap_to_iframe self.current_index = 0 self.playback_speed = 1.0 self.media_files = [] self.current_cap = None self.is_playing = True self.current_frame = 0 self.total_frames = 0 # Supported media extensions self.extensions = ['*.png', '*.jpg', '*.jpeg', '*.gif', '*.mp4', '*.avi', '*.mov', '*.mkv'] # Create grade directories for i in range(1, 6): grade_dir = self.directory / str(i) grade_dir.mkdir(exist_ok=True) def find_media_files(self) -> List[Path]: """Find all media files recursively in the directory""" media_files = [] for ext in self.extensions: pattern = str(self.directory / "**" / ext) files = glob.glob(pattern, recursive=True) media_files.extend([Path(f) for f in files]) # Filter out files already in grade directories filtered_files = [] for file in media_files: # Check if file is not in a grade directory (1-5) if not any(part in ['1', '2', '3', '4', '5'] for part in file.parts): filtered_files.append(file) return sorted(filtered_files) def is_video(self, file_path: Path) -> bool: """Check if file is a video""" return file_path.suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv', '.gif'] def load_media(self, file_path: Path) -> bool: """Load media file for display""" if self.current_cap: self.current_cap.release() if self.is_video(file_path): self.current_cap = cv2.VideoCapture(str(file_path)) if not self.current_cap.isOpened(): return False self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.current_frame = 0 else: # For images, we'll just display them self.current_cap = None self.total_frames = 1 self.current_frame = 0 return True def display_media(self, file_path: Path) -> Optional[Tuple[bool, any]]: """Display current media file""" if self.is_video(file_path): if not self.current_cap: return None ret, frame = self.current_cap.read() if not ret: return False, None self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) return True, frame else: # Display image frame = cv2.imread(str(file_path)) if frame is None: return False, None return True, frame def seek_video(self, frames_delta: int): """Seek video by specified number of frames""" if not self.current_cap or not self.is_video(self.media_files[self.current_index]): return new_frame = max(0, min(self.current_frame + frames_delta, self.total_frames - 1)) if self.snap_to_iframe and frames_delta < 0: # Find previous I-frame (approximation) new_frame = max(0, new_frame - (new_frame % 30)) self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, new_frame) self.current_frame = new_frame def grade_media(self, grade: int): """Move current media file to grade directory""" if not self.media_files or grade < 1 or grade > 5: return current_file = self.media_files[self.current_index] grade_dir = self.directory / str(grade) destination = grade_dir / current_file.name # Handle name conflicts counter = 1 while destination.exists(): stem = current_file.stem suffix = current_file.suffix destination = grade_dir / f"{stem}_{counter}{suffix}" counter += 1 try: shutil.move(str(current_file), str(destination)) print(f"Moved {current_file.name} to grade {grade}") # Remove from current list self.media_files.pop(self.current_index) # Adjust current index if self.current_index >= len(self.media_files): self.current_index = 0 if not self.media_files: print("No more media files to grade!") return False except Exception as e: print(f"Error moving file: {e}") return True def run(self): """Main grading loop""" self.media_files = self.find_media_files() if not self.media_files: print("No media files found in directory!") return print(f"Found {len(self.media_files)} media files") print("Controls:") print(" Space: Pause/Play") print(" Left/Right: Seek backward/forward") print(" A/D: Decrease/Increase playback speed") print(" 1-5: Grade and move file") print(" N: Next file") print(" P: Previous file") print(" Q/ESC: Quit") cv2.namedWindow('Media Grader', cv2.WINDOW_NORMAL) while self.media_files and self.current_index < len(self.media_files): current_file = self.media_files[self.current_index] if not self.load_media(current_file): print(f"Could not load {current_file}") self.current_index += 1 continue window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})" cv2.setWindowTitle('Media Grader', window_title) delay = int(33 / self.playback_speed) if self.is_video(current_file) else 0 while True: result = self.display_media(current_file) if result is None or not result[0]: break ret, frame = result # Add info overlay info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)}" cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1) cv2.imshow('Media Grader', frame) key = cv2.waitKey(delay) & 0xFF if key == ord('q') or key == 27: # Q or ESC return elif key == ord(' '): # Space - pause/play self.is_playing = not self.is_playing delay = int(33 / self.playback_speed) if self.is_playing and self.is_video(current_file) else 0 elif key == 81 or key == ord('a'): # Left arrow or A - decrease speed if key == 81: # Left arrow - seek backward self.seek_video(-self.seek_frames) else: # A - decrease speed self.playback_speed = max(0.1, self.playback_speed - 0.1) delay = int(33 / self.playback_speed) if self.is_video(current_file) else 0 elif key == 83 or key == ord('d'): # Right arrow or D if key == 83: # Right arrow - seek forward self.seek_video(self.seek_frames) else: # D - increase speed self.playback_speed = min(5.0, self.playback_speed + 0.1) delay = int(33 / self.playback_speed) if self.is_video(current_file) else 0 elif key == ord('n'): # Next file break elif key == ord('p'): # Previous file self.current_index = max(0, self.current_index - 1) break elif key in [ord('1'), ord('2'), ord('3'), ord('4'), ord('5')]: # Grade grade = int(chr(key)) if not self.grade_media(grade): return break if not self.is_playing and not self.is_video(current_file): # For images, wait indefinitely when paused continue if key not in [ord('p')]: # Don't increment for previous self.current_index += 1 if self.current_cap: self.current_cap.release() cv2.destroyAllWindows() print("Grading session complete!") def main(): parser = argparse.ArgumentParser(description='Media Grader - Grade media files by moving them to numbered folders') parser.add_argument('directory', nargs='?', default='.', help='Directory to scan for media files (default: current directory)') parser.add_argument('--seek-frames', type=int, default=30, help='Number of frames to seek when using arrow keys (default: 30)') parser.add_argument('--snap-to-iframe', action='store_true', help='Snap to I-frames when seeking backward for better performance') args = parser.parse_args() if not os.path.isdir(args.directory): print(f"Error: {args.directory} is not a valid directory") sys.exit(1) grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe) try: grader.run() except KeyboardInterrupt: print("\nGrading session interrupted") except Exception as e: print(f"Error: {e}") sys.exit(1) if __name__ == "__main__": main()