# Source: py-media-grader/main.py
# (repository viewer metadata: 497 lines, 17 KiB, Python)

import os
import sys
import glob
import cv2
import argparse
import shutil
import time
from pathlib import Path
from typing import List, Tuple, Optional
class MediaGrader:
    """Interactive OpenCV viewer that sorts media files into grade folders.

    Every image/video found under ``directory`` is shown in a single window.
    Pressing ``1``-``5`` moves the file being shown into the sub-directory of
    the same name inside ``directory``; other keys control playback, seeking
    and navigation (see :meth:`run`).
    """

    # Configuration constants
    DEFAULT_FPS = 30  # Not read by any method of this class
    BASE_FRAME_DELAY_MS = 33  # ~30 FPS
    KEY_REPEAT_THRESHOLD_SEC = 0.2  # Not read by any method of this class
    FAST_SEEK_ACTIVATION_TIME = 0.5  # How long to hold before fast seek
    WINDOW_MAX_WIDTH = 1200
    WINDOW_MAX_HEIGHT = 800
    WINDOW_MAX_SCALE_UP = 2.0
    SPEED_INCREMENT = 0.1
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 100.0
    FAST_SEEK_MULTIPLIER = 5
    IFRAME_SNAP_INTERVAL = 30
    IMAGE_DISPLAY_DELAY_MS = 100

    WINDOW_NAME = "Media Grader"
    GRADES = (1, 2, 3, 4, 5)
    # GIFs are opened through VideoCapture, so they count as video here.
    VIDEO_SUFFIXES = frozenset([".mp4", ".avi", ".mov", ".mkv", ".gif"])
    # Value of ``cv2.waitKey(...) & 0xFF`` when no key was pressed.
    NO_KEY = 255

    # Outcomes of reviewing one file (see _review_current_file).
    _ACTION_QUIT = "quit"
    _ACTION_NEXT = "next"
    _ACTION_PREVIOUS = "previous"
    _ACTION_STAY = "stay"

    def __init__(
        self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
    ):
        """Set up state and create the grade directories ``1`` .. ``5``.

        Args:
            directory: Root directory that is scanned and that receives the
                grade sub-directories.
            seek_frames: Frames moved by one coarse (A/D) seek.
            snap_to_iframe: When true, backward seeks are rounded down to a
                multiple of ``IFRAME_SNAP_INTERVAL``.
        """
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.snap_to_iframe = snap_to_iframe
        self.current_index = 0
        self.playback_speed = 1.0
        self.media_files = []
        self.current_cap = None
        self.is_playing = True
        # 0-based index of the frame held in current_display_frame.
        self.current_frame = 0
        self.total_frames = 0

        # Key repeat tracking
        self.last_key_time = 0
        self.last_key = None
        self.key_first_press_time = 0

        # Seeking modes
        self.fine_seek_frames = 1  # Frame-by-frame
        self.coarse_seek_frames = self.seek_frames  # User-configurable
        self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER

        # Current frame cache for display
        self.current_display_frame = None

        # Supported media extensions
        self.extensions = [
            "*.png",
            "*.jpg",
            "*.jpeg",
            "*.gif",
            "*.mp4",
            "*.avi",
            "*.mov",
            "*.mkv",
        ]

        # Create grade directories
        for grade in self.GRADES:
            (self.directory / str(grade)).mkdir(exist_ok=True)

    def find_media_files(self) -> List[Path]:
        """Return the sorted media files under ``directory`` that are not yet graded.

        A file counts as graded when it lives inside one of the grade
        directories directly below ``directory``. Only the path relative to
        ``directory`` is inspected, so a root such as ``/data/3/media`` does
        not hide every file.
        """
        # Escape the root so characters like "[" in its name are matched
        # literally instead of being treated as glob syntax.
        root_pattern = glob.escape(str(self.directory))
        candidates = []
        for ext in self.extensions:
            pattern = os.path.join(root_pattern, "**", ext)
            candidates.extend(Path(f) for f in glob.glob(pattern, recursive=True))

        grade_names = {str(grade) for grade in self.GRADES}
        ungraded = []
        for path in candidates:
            try:
                relative_parts = path.relative_to(self.directory).parts
            except ValueError:
                # Not expected for glob results; fall back to the full path.
                relative_parts = path.parts
            in_grade_dir = len(relative_parts) > 1 and relative_parts[0] in grade_names
            if not in_grade_dir:
                ungraded.append(path)
        return sorted(ungraded)

    def is_video(self, file_path: Path) -> bool:
        """Return True when the suffix of ``file_path`` is in ``VIDEO_SUFFIXES``."""
        return file_path.suffix.lower() in self.VIDEO_SUFFIXES

    def calculate_frame_delay(self) -> int:
        """Return the ``waitKey`` delay in milliseconds for the current speed (>= 1)."""
        delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
        return max(1, delay_ms)

    def calculate_frames_to_skip(self) -> int:
        """Return how many extra frames each playback step discards.

        Nothing is skipped up to 2x; up to 5x ``int(speed - 1)`` frames are
        skipped; above that ``int(speed * 2)``.
        """
        if self.playback_speed <= 2.0:
            return 0
        if self.playback_speed <= 5.0:
            return int(self.playback_speed - 1)
        return int(self.playback_speed * 2)

    def load_media(self, file_path: Path) -> bool:
        """Open ``file_path`` and cache its first frame.

        Returns:
            True when a frame is ready for display, False when the file could
            not be opened or decoded. The cached frame is cleared first so a
            failing file never shows the previous file's picture.
        """
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None
        self.current_display_frame = None
        self.current_frame = 0

        if self.is_video(file_path):
            cap = cv2.VideoCapture(str(file_path))
            if not cap.isOpened():
                cap.release()
                return False
            self.current_cap = cap
            self.total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        else:
            # Images have a single frame and need no capture object.
            self.total_frames = 1

        return bool(self.load_current_frame())

    def _sync_frame_index(self):
        """Set ``current_frame`` to the index of the frame that was just read."""
        # CAP_PROP_POS_FRAMES is the index of the frame to be decoded next,
        # i.e. one past the frame returned by the last read().
        next_index = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
        self.current_frame = max(0, next_index - 1)

    def load_current_frame(self):
        """Read the frame at the capture position (or the image) into the cache.

        Uses ``media_files[current_index]`` to decide between video and image.

        Returns:
            True when ``current_display_frame`` was updated, False otherwise.
        """
        current_file = self.media_files[self.current_index]
        if self.is_video(current_file):
            if not self.current_cap:
                return False
            ret, frame = self.current_cap.read()
            if not ret:
                return False
            self.current_display_frame = frame
            self._sync_frame_index()
            return True

        frame = cv2.imread(str(current_file))
        if frame is None:
            return False
        self.current_display_frame = frame
        return True

    def advance_frame(self):
        """Advance playback by one step, skipping frames at high speed.

        Returns:
            False when the video has no more frames (or no capture is open);
            True otherwise, including when there is nothing to advance because
            the current file is an image or playback is paused.
        """
        if not self.is_playing or not self.is_video(
            self.media_files[self.current_index]
        ):
            return True
        if not self.current_cap:
            return False

        frame = None
        # +1 so that at least one frame is consumed per step.
        for _ in range(self.calculate_frames_to_skip() + 1):
            ret, frame = self.current_cap.read()
            if not ret:
                return False
        self.current_display_frame = frame
        self._sync_frame_index()
        return True

    def _window_size_for(self, width: int, height: int) -> Tuple[int, int]:
        """Return the window size for a ``width`` x ``height`` frame.

        The frame is scaled uniformly to fit ``WINDOW_MAX_WIDTH`` x
        ``WINDOW_MAX_HEIGHT``; enlargement is capped at ``WINDOW_MAX_SCALE_UP``.
        """
        scale = min(
            self.WINDOW_MAX_WIDTH / width,
            self.WINDOW_MAX_HEIGHT / height,
            self.WINDOW_MAX_SCALE_UP,
        )
        return max(1, int(width * scale)), max(1, int(height * scale))

    def auto_resize_window(self, frame):
        """Resize the display window to suit ``frame``.

        Large frames are shrunk to the maximum window size; small frames are
        enlarged by at most ``WINDOW_MAX_SCALE_UP``.
        """
        height, width = frame.shape[:2]
        if width <= 0 or height <= 0:
            return
        new_width, new_height = self._window_size_for(width, height)
        cv2.resizeWindow(self.WINDOW_NAME, new_width, new_height)

    def seek_video(self, frames_delta: int):
        """Move the displayed frame by ``frames_delta`` and refresh the cache.

        The target is clamped to the start of the video and, when the frame
        count is known, to the last frame. Does nothing for images or when no
        capture is open.
        """
        if not self.current_cap or not self.is_video(
            self.media_files[self.current_index]
        ):
            return

        new_frame = max(0, self.current_frame + frames_delta)
        if self.total_frames > 0:
            new_frame = min(new_frame, self.total_frames - 1)

        if self.snap_to_iframe and frames_delta < 0:
            # Approximate the previous I-frame with a fixed interval.
            new_frame -= new_frame % self.IFRAME_SNAP_INTERVAL

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, new_frame)
        # Decode the target frame right away so it is shown even while paused.
        self.load_current_frame()
        print(f"Seeked by {frames_delta} frames to frame {new_frame}")

    def _coarse_seek_amount(self, key: int, direction: int, now: float) -> int:
        """Return the signed seek distance for an A/D press at time ``now``.

        The first press seeks by ``coarse_seek_frames``; once the same key has
        been repeating for longer than ``FAST_SEEK_ACTIVATION_TIME`` the
        distance becomes ``fast_seek_frames``.
        """
        if self.last_key != key:
            # New key press: start timing the hold.
            self.key_first_press_time = now
            self.last_key = key
            return direction * self.coarse_seek_frames

        time_held = now - self.key_first_press_time
        time_since_last = now - self.last_key_time
        print(f"Key held for {time_held:.2f}s, since last: {time_since_last:.2f}s")
        if time_held > self.FAST_SEEK_ACTIVATION_TIME:
            seek_amount = direction * self.fast_seek_frames
            print(f"FAST SEEK: {seek_amount} frames")
            return seek_amount
        return direction * self.coarse_seek_frames

    def handle_seeking_key(self, key: int) -> bool:
        """Handle the seek keys A/D (coarse, fast when held) and ,/. (fine).

        Returns:
            True when ``key`` was a seek key and a seek was issued, else False.
        """
        current_time = time.time()
        coarse_directions = {ord("a"): -1, ord("d"): 1}
        fine_steps = {
            ord(","): -self.fine_seek_frames,
            ord("."): self.fine_seek_frames,
        }

        if key in coarse_directions:
            seek_amount = self._coarse_seek_amount(
                key, coarse_directions[key], current_time
            )
        elif key in fine_steps:
            seek_amount = fine_steps[key]
        else:
            return False

        if seek_amount == 0:
            return False
        self.seek_video(seek_amount)
        self.last_key_time = current_time
        return True

    def grade_media(self, grade: int):
        """Move the current file into the directory for ``grade``.

        On success the file is removed from ``media_files`` so its successor
        sits at ``current_index`` (wrapping to 0 after the last entry);
        callers must not advance the index again. A name clash in the target
        directory is resolved by appending ``_<n>`` to the stem.

        Returns:
            False when no files remain to grade, True otherwise (including an
            invalid ``grade`` or a failed move, which leave the list unchanged).
        """
        if not self.media_files:
            return False
        if grade not in self.GRADES:
            return True

        current_file = self.media_files[self.current_index]
        grade_dir = self.directory / str(grade)
        destination = grade_dir / current_file.name

        # Handle name conflicts
        counter = 1
        while destination.exists():
            destination = (
                grade_dir / f"{current_file.stem}_{counter}{current_file.suffix}"
            )
            counter += 1

        # Close the capture first: an open handle may prevent the move on
        # some platforms. The caller reloads whichever file is current next.
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        try:
            grade_dir.mkdir(exist_ok=True)
            shutil.move(str(current_file), str(destination))
        except OSError as e:
            print(f"Error moving file: {e}")
            return True

        print(f"Moved {current_file.name} to grade {grade}")
        self.media_files.pop(self.current_index)
        if not self.media_files:
            print("No more media files to grade!")
            return False
        if self.current_index >= len(self.media_files):
            self.current_index = 0
        return True

    def _change_speed(self, delta: float):
        """Adjust ``playback_speed`` by ``delta`` within the allowed range."""
        # Rounding removes float drift (e.g. 2.0000000000000004) that would
        # otherwise cross the thresholds in calculate_frames_to_skip.
        speed = round(self.playback_speed + delta, 6)
        self.playback_speed = min(
            self.MAX_PLAYBACK_SPEED, max(self.MIN_PLAYBACK_SPEED, speed)
        )
        print(f"Speed: {self.playback_speed:.1f}x")

    def _print_controls(self):
        """Print the keyboard controls to the console."""
        print("Controls:")
        print(" Space: Pause/Play")
        print(" A/D: Seek backward/forward (hold for FAST seek)")
        print(" , / . : Frame-by-frame seek (fine control)")
        print(" W/S: Increase/Decrease playback speed")
        print(" 1-5: Grade and move file")
        print(" N: Next file")
        print(" P: Previous file")
        print(" Q/ESC: Quit")

    def _draw_overlay(self, frame):
        """Draw the status line and the key help onto ``frame`` in place."""
        state = "Playing" if self.is_playing else "PAUSED"
        info_text = (
            f"Speed: {self.playback_speed:.1f}x | "
            f"Frame: {self.current_frame + 1}/{self.total_frames} | "
            f"File: {self.current_index + 1}/{len(self.media_files)} | {state}"
        )
        help_text = "Seek: A/D (hold=FAST) ,. (fine) | W/S speed | 1-5 grade | Space pause | Q quit"

        lines = ((info_text, (10, 30), 0.7), (help_text, (10, 60), 0.5))
        for text, origin, font_scale in lines:
            # Thick white stroke underneath thin black text keeps the text
            # readable on both dark and bright frames.
            cv2.putText(
                frame,
                text,
                origin,
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (255, 255, 255),
                2,
            )
            cv2.putText(
                frame,
                text,
                origin,
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (0, 0, 0),
                1,
            )

    def _review_current_file(self, current_file: Path) -> str:
        """Show ``current_file`` and process keys until the file is left.

        Returns one of the ``_ACTION_*`` values: QUIT to end the session, NEXT
        to move on, PREVIOUS when ``current_index`` was already moved back, or
        STAY when ``current_index`` already points at the file to show next
        (after grading).
        """
        playing_video = self.is_video(current_file)
        grade_keys = {ord(str(grade)): grade for grade in self.GRADES}
        window_resized = False

        while True:
            # Always display the current cached frame
            if self.current_display_frame is not None:
                frame = self.current_display_frame.copy()
                if not window_resized:
                    self.auto_resize_window(frame)
                    window_resized = True
                self._draw_overlay(frame)
                cv2.imshow(self.WINDOW_NAME, frame)

            if playing_video:
                delay = self.calculate_frame_delay()
            else:
                delay = self.IMAGE_DISPLAY_DELAY_MS
            key = cv2.waitKey(delay) & 0xFF

            # Debug: print key codes to help with key detection
            if key != self.NO_KEY:
                print(f"Key pressed: {key}")

            if key == ord("q") or key == 27:  # Q or ESC
                return self._ACTION_QUIT
            elif key == ord(" "):  # Space - pause/play
                self.is_playing = not self.is_playing
                print(f"{'Playing' if self.is_playing else 'Paused'}")
            elif key == ord("s"):  # S - decrease speed
                self._change_speed(-self.SPEED_INCREMENT)
            elif key == ord("w"):  # W - increase speed
                self._change_speed(self.SPEED_INCREMENT)
            elif self.handle_seeking_key(key):
                # Seeking was handled and the cached frame was updated
                pass
            elif key == ord("n"):  # Next file
                return self._ACTION_NEXT
            elif key == ord("p"):  # Previous file
                self.current_index = max(0, self.current_index - 1)
                return self._ACTION_PREVIOUS
            elif key in grade_keys:
                if not self.grade_media(grade_keys[key]):
                    return self._ACTION_QUIT
                # grade_media already left current_index on the next file
                # (or on the same file if the move failed).
                return self._ACTION_STAY
            elif key == self.NO_KEY:
                # No key this tick: treat a tracked seek key as released.
                if self.last_key is not None:
                    self.last_key = None
                    print("Key released")

            # Advance frame only if playing (and it's a video)
            if self.is_playing and playing_video:
                if not self.advance_frame():
                    # End of video
                    return self._ACTION_NEXT

    def run(self):
        """Main grading loop: show each file and react to the keyboard.

        The capture and the OpenCV windows are released on every exit path,
        including quitting with Q/ESC.
        """
        self.media_files = self.find_media_files()
        if not self.media_files:
            print("No media files found in directory!")
            return

        print(f"Found {len(self.media_files)} media files")
        self._print_controls()

        cv2.namedWindow(self.WINDOW_NAME, cv2.WINDOW_NORMAL)
        try:
            while self.media_files and self.current_index < len(self.media_files):
                current_file = self.media_files[self.current_index]
                if not self.load_media(current_file):
                    print(f"Could not load {current_file}")
                    self.current_index += 1
                    continue

                window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
                cv2.setWindowTitle(self.WINDOW_NAME, window_title)

                action = self._review_current_file(current_file)
                if action == self._ACTION_QUIT:
                    return
                if action == self._ACTION_NEXT:
                    self.current_index += 1
                # PREVIOUS and STAY: current_index is already correct.

            print("Grading session complete!")
        finally:
            if self.current_cap:
                self.current_cap.release()
                self.current_cap = None
            cv2.destroyAllWindows()
def main():
    """Parse the command line and run an interactive grading session.

    Exits with status 1 when the directory is invalid or the session fails
    with an unexpected error, and with argparse's usage error (status 2)
    when ``--seek-frames`` is not a positive number.
    """
    parser = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to scan for media files (default: current directory)",
    )
    parser.add_argument(
        "--seek-frames",
        type=int,
        default=30,
        help="Number of frames to seek with the A/D keys (default: 30)",
    )
    parser.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )
    args = parser.parse_args()

    # Zero would make the seek keys do nothing and a negative value would
    # swap their directions, so reject both up front.
    if args.seek_frames < 1:
        parser.error("--seek-frames must be a positive integer")

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory", file=sys.stderr)
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()