Files
py-media-grader/main.py

347 lines
14 KiB
Python

import os
import sys
import glob
import cv2
import argparse
import shutil
import time
from pathlib import Path
from typing import List, Tuple, Optional
class MediaGrader:
# Configuration constants
DEFAULT_FPS = 30
BASE_FRAME_DELAY_MS = 33 # ~30 FPS
KEY_REPEAT_THRESHOLD_SEC = 0.5
WINDOW_MAX_WIDTH = 1200
WINDOW_MAX_HEIGHT = 800
WINDOW_MAX_SCALE_UP = 2.0
SPEED_INCREMENT = 0.1
MIN_PLAYBACK_SPEED = 0.1
MAX_PLAYBACK_SPEED = 100.0
FAST_SEEK_MULTIPLIER = 500
IFRAME_SNAP_INTERVAL = 30
IMAGE_DISPLAY_DELAY_MS = 100
def __init__(self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False):
self.directory = Path(directory)
self.seek_frames = seek_frames
self.snap_to_iframe = snap_to_iframe
self.current_index = 0
self.playback_speed = 1.0
self.media_files = []
self.current_cap = None
self.is_playing = True
self.current_frame = 0
self.total_frames = 0
# Key repeat tracking
self.last_key_time = 0
self.last_key = None
# Seeking modes
self.fine_seek_frames = 1 # Frame-by-frame
self.coarse_seek_frames = self.seek_frames # User-configurable
self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER
# Supported media extensions
self.extensions = ['*.png', '*.jpg', '*.jpeg', '*.gif', '*.mp4', '*.avi', '*.mov', '*.mkv']
# Create grade directories
for i in range(1, 6):
grade_dir = self.directory / str(i)
grade_dir.mkdir(exist_ok=True)
def find_media_files(self) -> List[Path]:
"""Find all media files recursively in the directory"""
media_files = []
for ext in self.extensions:
pattern = str(self.directory / "**" / ext)
files = glob.glob(pattern, recursive=True)
media_files.extend([Path(f) for f in files])
# Filter out files already in grade directories
filtered_files = []
for file in media_files:
# Check if file is not in a grade directory (1-5)
if not any(part in ['1', '2', '3', '4', '5'] for part in file.parts):
filtered_files.append(file)
return sorted(filtered_files)
def is_video(self, file_path: Path) -> bool:
"""Check if file is a video"""
return file_path.suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv', '.gif']
def calculate_frame_delay(self) -> int:
"""Calculate frame delay in milliseconds based on playback speed"""
if not self.is_playing:
return 0 # No delay when paused
# Base delay for 30 FPS, adjusted by playback speed
delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
return max(1, delay_ms) # Minimum 1ms delay
def load_media(self, file_path: Path) -> bool:
"""Load media file for display"""
if self.current_cap:
self.current_cap.release()
if self.is_video(file_path):
self.current_cap = cv2.VideoCapture(str(file_path))
if not self.current_cap.isOpened():
return False
self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.current_frame = 0
else:
# For images, we'll just display them
self.current_cap = None
self.total_frames = 1
self.current_frame = 0
return True
def display_media(self, file_path: Path) -> Optional[Tuple[bool, any]]:
"""Display current media file"""
if self.is_video(file_path):
if not self.current_cap:
return None
ret, frame = self.current_cap.read()
if not ret:
return False, None
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
return True, frame
else:
# Display image
frame = cv2.imread(str(file_path))
if frame is None:
return False, None
return True, frame
def auto_resize_window(self, frame):
"""Auto-resize window to fit media while respecting screen limits"""
height, width = frame.shape[:2]
# Calculate scaling factor to fit within max dimensions
scale_w = self.WINDOW_MAX_WIDTH / width if width > self.WINDOW_MAX_WIDTH else 1.0
scale_h = self.WINDOW_MAX_HEIGHT / height if height > self.WINDOW_MAX_HEIGHT else 1.0
scale = min(scale_w, scale_h)
# Don't scale up small images too much
if scale > self.WINDOW_MAX_SCALE_UP:
scale = self.WINDOW_MAX_SCALE_UP
new_width = int(width * scale)
new_height = int(height * scale)
cv2.resizeWindow('Media Grader', new_width, new_height)
def seek_video(self, frames_delta: int):
"""Seek video by specified number of frames"""
if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
return
new_frame = max(0, min(self.current_frame + frames_delta, self.total_frames - 1))
if self.snap_to_iframe and frames_delta < 0:
# Find previous I-frame (approximation)
new_frame = max(0, new_frame - (new_frame % self.IFRAME_SNAP_INTERVAL))
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, new_frame)
self.current_frame = new_frame
def handle_seeking_key(self, key: int) -> bool:
"""Handle seeking keys with different granularities. Returns True if key was handled."""
current_time = time.time()
# Determine seek amount based on key and timing
seek_amount = 0
# Try different arrow key detection methods
if key == 2424832 or key == 81: # Left arrow (different systems)
if self.last_key == key and (current_time - self.last_key_time) < self.KEY_REPEAT_THRESHOLD_SEC:
seek_amount = -self.fast_seek_frames
else:
seek_amount = -self.coarse_seek_frames
elif key == 2555904 or key == 83: # Right arrow (different systems)
if self.last_key == key and (current_time - self.last_key_time) < self.KEY_REPEAT_THRESHOLD_SEC:
seek_amount = self.fast_seek_frames
else:
seek_amount = self.coarse_seek_frames
elif key == ord(','): # Comma - fine seek backward
seek_amount = -self.fine_seek_frames
elif key == ord('.'): # Period - fine seek forward
seek_amount = self.fine_seek_frames
else:
return False
self.seek_video(seek_amount)
self.last_key = key
self.last_key_time = current_time
return True
def grade_media(self, grade: int):
"""Move current media file to grade directory"""
if not self.media_files or grade < 1 or grade > 5:
return
current_file = self.media_files[self.current_index]
grade_dir = self.directory / str(grade)
destination = grade_dir / current_file.name
# Handle name conflicts
counter = 1
while destination.exists():
stem = current_file.stem
suffix = current_file.suffix
destination = grade_dir / f"{stem}_{counter}{suffix}"
counter += 1
try:
shutil.move(str(current_file), str(destination))
print(f"Moved {current_file.name} to grade {grade}")
# Remove from current list
self.media_files.pop(self.current_index)
# Adjust current index
if self.current_index >= len(self.media_files):
self.current_index = 0
if not self.media_files:
print("No more media files to grade!")
return False
except Exception as e:
print(f"Error moving file: {e}")
return True
def run(self):
"""Main grading loop"""
self.media_files = self.find_media_files()
if not self.media_files:
print("No media files found in directory!")
return
print(f"Found {len(self.media_files)} media files")
print("Controls:")
print(" Space: Pause/Play")
print(" Left/Right: Seek backward/forward (accelerates on repeat)")
print(" , / . : Frame-by-frame seek (fine control)")
print(" A/D: Decrease/Increase playback speed")
print(" 1-5: Grade and move file")
print(" N: Next file")
print(" P: Previous file")
print(" Q/ESC: Quit")
cv2.namedWindow('Media Grader', cv2.WINDOW_NORMAL)
while self.media_files and self.current_index < len(self.media_files):
current_file = self.media_files[self.current_index]
if not self.load_media(current_file):
print(f"Could not load {current_file}")
self.current_index += 1
continue
window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
cv2.setWindowTitle('Media Grader', window_title)
window_resized = False
while True:
# Only advance frame if playing (for videos)
if self.is_playing or not self.is_video(current_file):
result = self.display_media(current_file)
if result is None or not result[0]:
break
ret, frame = result
# Auto-resize window on first frame
if not window_resized:
self.auto_resize_window(frame)
window_resized = True
# Add info overlay
info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)}"
help_text = "Seek: ←→ (accel) ,. (fine) | A/D speed | 1-5 grade | Space pause | Q quit"
# White background for text visibility
cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1)
cv2.putText(frame, help_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
cv2.putText(frame, help_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
cv2.imshow('Media Grader', frame)
# Calculate appropriate delay
delay = self.calculate_frame_delay() if self.is_video(current_file) else self.IMAGE_DISPLAY_DELAY_MS
key = cv2.waitKey(delay) & 0xFF
# Debug: print key codes to help with arrow key detection
if key != 255: # 255 means no key pressed
print(f"Key pressed: {key}")
if key == ord('q') or key == 27: # Q or ESC
return
elif key == ord(' '): # Space - pause/play
self.is_playing = not self.is_playing
elif key == ord('a'): # A - decrease speed
self.playback_speed = max(self.MIN_PLAYBACK_SPEED, self.playback_speed - self.SPEED_INCREMENT)
elif key == ord('d'): # D - increase speed
self.playback_speed = min(self.MAX_PLAYBACK_SPEED, self.playback_speed + self.SPEED_INCREMENT)
elif self.handle_seeking_key(key):
# Seeking was handled
pass
elif key == ord('n'): # Next file
break
elif key == ord('p'): # Previous file
self.current_index = max(0, self.current_index - 1)
break
elif key in [ord('1'), ord('2'), ord('3'), ord('4'), ord('5')]: # Grade
grade = int(chr(key))
if not self.grade_media(grade):
return
break
if key not in [ord('p')]: # Don't increment for previous
self.current_index += 1
if self.current_cap:
self.current_cap.release()
cv2.destroyAllWindows()
print("Grading session complete!")
def main():
parser = argparse.ArgumentParser(description='Media Grader - Grade media files by moving them to numbered folders')
parser.add_argument('directory', nargs='?', default='.', help='Directory to scan for media files (default: current directory)')
parser.add_argument('--seek-frames', type=int, default=30, help='Number of frames to seek when using arrow keys (default: 30)')
parser.add_argument('--snap-to-iframe', action='store_true', help='Snap to I-frames when seeking backward for better performance')
args = parser.parse_args()
if not os.path.isdir(args.directory):
print(f"Error: {args.directory} is not a valid directory")
sys.exit(1)
grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
try:
grader.run()
except KeyboardInterrupt:
print("\nGrading session interrupted")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()