Files
py-media-grader/main.py

1622 lines
65 KiB
Python

import os
import sys
import glob
import cv2
import numpy as np
import argparse
import shutil
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List
class MediaGrader:
# Configuration constants
BASE_FRAME_DELAY_MS = 16 # Base per-frame delay at 1.0x speed; 16 ms is roughly 60 FPS (1000 / 16), not 30
KEY_REPEAT_RATE_SEC = 0.5 # How often to process key repeats
FAST_SEEK_ACTIVATION_TIME = 2.0 # How long to hold before fast seek
FRAME_RENDER_TIME_MS = 50 # Time to let frames render between seeks
SPEED_INCREMENT = 0.2 # presumably the step applied per speed-change key press — TODO confirm against the key handler
MIN_PLAYBACK_SPEED = 0.1
MAX_PLAYBACK_SPEED = 100.0
FAST_SEEK_MULTIPLIER = 60 # __init__ sets fast_seek_frames = seek_frames * FAST_SEEK_MULTIPLIER
IMAGE_DISPLAY_DELAY_MS = 100
# Monitor dimensions for full-screen sizing
# (display_with_aspect_ratio scales and centres the window against these values)
MONITOR_WIDTH = 2560
MONITOR_HEIGHT = 1440
# Timeline configuration
# NOTE(review): the colour tuples are presumably BGR, as OpenCV drawing calls expect — confirm in draw_timeline
TIMELINE_HEIGHT = 60
TIMELINE_MARGIN = 20
TIMELINE_BAR_HEIGHT = 12
TIMELINE_HANDLE_SIZE = 12
TIMELINE_COLOR_BG = (80, 80, 80)
TIMELINE_COLOR_PROGRESS = (0, 120, 255)
TIMELINE_COLOR_HANDLE = (255, 255, 255)
TIMELINE_COLOR_BORDER = (200, 200, 200)
# Seek modifiers for A/D keys
SHIFT_SEEK_MULTIPLIER = 5 # SHIFT + A/D multiplier
CTRL_SEEK_MULTIPLIER = 10 # CTRL + A/D multiplier
# Multi-segment mode configuration
SEGMENT_COUNT = 16 # Number of video segments; the grid is int(sqrt(n)) rows by n / rows columns, i.e. 4x4 for 16
SEGMENT_OVERLAP_PERCENT = 10 # Percentage overlap between segments
def __init__(
self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
):
self.directory = Path(directory)
self.seek_frames = seek_frames
self.current_index = 0
self.playback_speed = 1.0
self.media_files = []
self.current_cap = None
self.is_playing = True
self.current_frame = 0
self.total_frames = 0
# Multi-segment mode state
self.multi_segment_mode = False
self.segment_count = self.SEGMENT_COUNT # Use the class constant
self.segment_overlap_percent = self.SEGMENT_OVERLAP_PERCENT # Use the class constant
self.segment_caps = [] # List of VideoCapture objects for each segment
self.segment_frames = [] # List of current frames for each segment
self.segment_positions = [] # List of frame positions for each segment
# Timeline visibility state
self.timeline_visible = True
# Improved frame cache for performance
self.frame_cache = {} # Dict[frame_number: frame_data]
self.cache_size_limit = 200 # Increased cache size
self.cache_lock = threading.Lock() # Thread safety for cache
# Key repeat tracking with rate limiting
self.last_seek_time = 0
self.current_seek_key = None
self.key_first_press_time = 0
self.is_seeking = False
# Seeking modes
self.fine_seek_frames = 1 # Frame-by-frame
self.coarse_seek_frames = self.seek_frames # User-configurable
self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER
# Current frame cache for display
self.current_display_frame = None
# Supported media extensions
self.extensions = [
".png",
".jpg",
".jpeg",
".gif",
".mp4",
".avi",
".mov",
".mkv",
]
# Mouse interaction for timeline
self.mouse_dragging = False
self.timeline_rect = None
self.window_width = 800
self.window_height = 600
# Undo functionality
self.undo_history = [] # List of (source_path, destination_path, original_index) tuples
# Watch tracking for "good look" feature
self.watched_regions = {} # Dict[file_path: List[Tuple[start_frame, end_frame]]]
self.current_watch_start = None # Frame where current viewing session started
self.last_frame_position = 0 # Track last known frame position
# Bisection navigation tracking
self.last_jump_position = {} # Dict[file_path: last_frame] for bisection reference
# Jump history for H key (undo jump)
self.jump_history = {} # Dict[file_path: List[frame_positions]] for jump undo
# Performance optimization: Thread pool for parallel operations
self.thread_pool = ThreadPoolExecutor(max_workers=4)
def display_with_aspect_ratio(self, frame):
    """Show *frame* in the "Media Grader" window, scaled to fill the monitor.

    The window is resized to the largest size that keeps the frame's aspect
    ratio inside MONITOR_WIDTH x MONITOR_HEIGHT, centred on that area, and
    the frame is then drawn. A ``None`` frame is ignored.
    """
    if frame is None:
        return

    src_height, src_width = frame.shape[:2]
    aspect = src_width / src_height
    screen_w, screen_h = self.MONITOR_WIDTH, self.MONITOR_HEIGHT

    if aspect < screen_w / screen_h:
        # Narrower than the monitor's shape: height is the limiting side.
        win_h = screen_h
        win_w = int(win_h * aspect)
    else:
        # As wide as, or wider than, the monitor's shape: width limits.
        win_w = screen_w
        win_h = int(win_w / aspect)

    window = "Media Grader"
    cv2.resizeWindow(window, win_w, win_h)
    cv2.moveWindow(window, (screen_w - win_w) // 2, (screen_h - win_h) // 2)
    cv2.imshow(window, frame)
def find_media_files(self) -> List[Path]:
    """Return every supported media file under ``self.directory``, sorted.

    The tree is walked once. A file is kept when its suffix matches one of
    ``self.extensions`` (compared case-insensitively, so ``clip.MP4`` is
    found on case-sensitive filesystems too) and none of the folders
    between ``self.directory`` and the file is a grade folder named
    "1" to "5".

    Only folders *below* the scan root are checked: a root such as
    ``/data/1/media`` must not cause every file to be skipped.

    Returns:
        Sorted list of paths; each accepted path is also printed.
    """
    grade_folders = {"1", "2", "3", "4", "5"}
    wanted_suffixes = {ext.lower() for ext in self.extensions}

    pattern = str(self.directory / "**" / "*")
    accepted = []
    for name in glob.glob(pattern, recursive=True):
        candidate = Path(name)
        if candidate.suffix.lower() not in wanted_suffixes:
            continue
        try:
            parent_folders = candidate.relative_to(self.directory).parts[:-1]
        except ValueError:
            # Should not happen for glob results; fall back to the full path.
            parent_folders = candidate.parts[:-1]
        if grade_folders.isdisjoint(parent_folders):
            print("Adding file: ", candidate)
            accepted.append(candidate)
    return sorted(accepted)
def is_video(self, file_path: Path) -> bool:
    """Return True when *file_path* ends in a video extension.

    The suffix is compared case-insensitively; ``.gif`` counts as video.
    """
    suffix = file_path.suffix.lower()
    return suffix in (".mp4", ".avi", ".mov", ".mkv", ".gif")
def calculate_frame_delay(self) -> int:
    """Return the per-frame wait in milliseconds for the current speed.

    The base delay is divided by ``playback_speed`` and truncated; the
    result is never less than 1 ms.
    """
    scaled = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
    return scaled if scaled > 1 else 1
def calculate_frames_to_skip(self) -> int:
    """Return how many frames to drop per displayed frame at high speed.

    Up to 2.0x nothing is skipped; up to 5.0x the count is ``speed - 1``;
    above that it is ``speed * 2`` (both truncated to int).
    """
    speed = self.playback_speed
    if speed <= 2.0:
        return 0
    if speed <= 5.0:
        return int(speed - 1)
    return int(speed * 2)
def load_media(self, file_path: Path) -> bool:
    """Make *file_path* the active media and load its first frame.

    Any previously open capture is released first. For a video, a new
    capture is opened and ``total_frames`` is read from it; for an image,
    no capture is kept and ``total_frames`` is 1.

    Note that the first frame is loaded through ``load_current_frame``,
    which reads ``self.media_files[self.current_index]`` rather than
    *file_path*, so callers should keep the two in step.

    Returns:
        False when a video cannot be opened (the failed capture is
        released and ``current_cap`` is left as None), True otherwise.
    """
    if self.current_cap:
        self.current_cap.release()

    if self.is_video(file_path):
        capture = cv2.VideoCapture(str(file_path))
        if not capture.isOpened():
            # Do not keep a dead capture around: later code treats a
            # truthy current_cap as usable.
            capture.release()
            self.current_cap = None
            print(f"Warning: Could not open video file {file_path.name} (unsupported codec)")
            return False
        self.current_cap = capture
        self.total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    else:
        self.current_cap = None
        self.total_frames = 1
    self.current_frame = 0

    # Load initial frame, then begin watch tracking for this file
    self.load_current_frame()
    self.start_watch_session()
    return True
def load_current_frame(self):
    """Read the next frame of the active media into ``current_display_frame``.

    For a video the frame comes from ``current_cap`` and ``current_frame``
    is set to the capture's position after the read. For an image the file
    is read from disk.

    Returns:
        True when a frame was stored, False when there is no capture or
        the read failed.
    """
    media_path = self.media_files[self.current_index]

    if not self.is_video(media_path):
        image = cv2.imread(str(media_path))
        if image is None:
            return False
        self.current_display_frame = image
        return True

    if not self.current_cap:
        return False
    ok, image = self.current_cap.read()
    if not ok:
        return False
    self.current_display_frame = image
    self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
    return True
def start_watch_session(self):
    """Begin a new viewing session at the current frame (videos only)."""
    active = self.media_files[self.current_index]
    if not self.is_video(active):
        return
    position = self.current_frame
    self.current_watch_start = position
    self.last_frame_position = position
def update_watch_tracking(self):
    """Record the span just watched when playback jumped or ran long enough.

    A region from the session start to the previously seen frame is stored
    when the position moved more than 5 frames since the last call, or is
    more than 30 frames away from the session start; a new session then
    starts at the current frame. The last seen position is refreshed on
    every call. Does nothing for images or before a session has started.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path) or self.current_watch_start is None:
        return

    key = str(media_path)
    here = self.current_frame
    jumped = abs(here - self.last_frame_position) > 5
    ran_long = abs(here - self.current_watch_start) > 30

    if jumped or ran_long:
        first, last = sorted((self.current_watch_start, self.last_frame_position))
        self.watched_regions.setdefault(key, [])
        # Overlapping regions are merged by add_watched_region
        self.add_watched_region(key, first, last)
        self.current_watch_start = here

    self.last_frame_position = here
def add_watched_region(self, file_path, start_frame, end_frame):
    """Store ``(start_frame, end_frame)`` for *file_path*, merging overlaps.

    Existing regions that overlap or touch the new one are absorbed into
    it; the others are kept in their current order and the merged region
    is appended last.
    """
    existing = self.watched_regions.setdefault(file_path, [])

    low, high = start_frame, end_frame
    untouched = []
    for region in existing:
        # Compare against the growing span, exactly as the merge proceeds.
        if high < region[0] or low > region[1]:
            untouched.append(region)
            continue
        low = min(low, region[0])
        high = max(high, region[1])

    untouched.append((low, high))
    self.watched_regions[file_path] = untouched
def find_largest_unwatched_region(self):
    """Return the widest ``(start, end)`` frame gap not yet watched.

    Returns None for images or when no gap remains. When nothing has been
    watched yet, the first quarter of the video is returned. The stored
    region list for the file is sorted in place as a side effect.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return None

    watched = self.watched_regions.get(str(media_path), [])
    if not watched:
        return (0, self.total_frames // 4)

    watched.sort(key=lambda region: region[0])

    gaps = []
    # Leading gap, then gaps between neighbours, then the trailing gap
    first_start = watched[0][0]
    if first_start > 0:
        gaps.append((0, first_start))
    for earlier, later in zip(watched, watched[1:]):
        if later[0] > earlier[1]:
            gaps.append((earlier[1], later[0]))
    last_end = watched[-1][1]
    if last_end < self.total_frames:
        gaps.append((last_end, self.total_frames))

    if not gaps:
        return None
    return max(gaps, key=lambda gap: gap[1] - gap[0])
def get_sample_points(self):
    """Return the fixed visiting order of sample frames for this video.

    The video is split into eighths. Quarter points come first, then the
    odd eighths, and frame 0 last. Returns an empty list when the video
    has fewer than 8 frames.
    """
    eighth = self.total_frames // 8
    if eighth == 0:
        return []
    # Multiples of one eighth: 1/4, 1/2, 3/4, 1/8, 3/8, 5/8, 7/8, start
    visiting_order = (2, 4, 6, 1, 3, 5, 7, 0)
    return [eighth * step for step in visiting_order]
def jump_to_unwatched_region(self):
"""Jump to the next sample point of the current video.

Despite the name, this does not consult ``watched_regions``: each call
moves to the next entry of ``get_sample_points()`` for the current file,
using a per-file counter in ``self.jump_counters``.

Before jumping, the current frame is stored as the bisection reference
(``last_jump_position``) and pushed onto ``jump_history`` for undo.

Returns True after a jump; False for images, when the video is too short
to sample, or when every sample point has already been visited.
"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
# Get or initialize jump counter for this file
# (jump_counters is created lazily here; no assignment to it is visible in __init__)
if not hasattr(self, 'jump_counters'):
self.jump_counters = {}
if current_file not in self.jump_counters:
self.jump_counters[current_file] = 0
# Get standardized sample points
sample_points = self.get_sample_points()
if not sample_points:
print("Video too short for sampling")
return False
current_jump = self.jump_counters[current_file]
if current_jump >= len(sample_points):
print("All sample points visited! Video fully sampled.")
return False
target_frame = sample_points[current_jump]
# Clamp to the last valid frame index
target_frame = min(target_frame, self.total_frames - 1)
# Track last position for bisection
self.last_jump_position[current_file] = self.current_frame
# Track jump history for H key undo
if current_file not in self.jump_history:
self.jump_history[current_file] = []
self.jump_history[current_file].append(self.current_frame)
# Jump to the target frame
if self.multi_segment_mode:
# In multi-segment mode, reposition all segments relative to the jump target
self.reposition_segments_around_frame(target_frame)
else:
# In single mode, just jump the main capture
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
# NOTE(review): indentation is missing in this copy of the file, so it cannot be
# told from here whether the next call belongs to the else-branch only — confirm.
self.load_current_frame()
# Increment jump counter
self.jump_counters[current_file] += 1
# Calculate percentage through video
# (total_frames is at least 8 here, otherwise sample_points would have been empty)
percentage = (target_frame / self.total_frames) * 100
print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
return True
def bisect_backwards(self):
    """Jump halfway between the stored reference frame and the current frame.

    The reference is the entry in ``last_jump_position`` for this file; it
    is replaced by the current frame before the jump so repeated calls
    keep halving the interval.

    Returns True after a jump; False for images, when no reference exists,
    or when the reference equals the current frame.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False

    key = str(media_path)
    if key not in self.last_jump_position:
        print("No previous position to bisect from. Use L first to establish a reference point.")
        return False

    reference = self.last_jump_position[key]
    here = self.current_frame
    if reference == here:
        print("Already at the same position as last jump.")
        return False

    # The midpoint is symmetric, so the order of the two frames is irrelevant
    halfway = (reference + here) // 2
    self.last_jump_position[key] = here

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, halfway)
    self.load_current_frame()

    share = (halfway / self.total_frames) * 100
    print(f"Bisected backwards to frame {halfway} ({share:.1f}% through video)")
    return True
def bisect_forwards(self):
    """Jump halfway between the current frame and the next sample point.

    The next sample point is the entry of ``get_sample_points()`` that the
    per-file jump counter currently points at. The current frame becomes
    the new bisection reference before the jump.

    Returns True after a jump; False for images, when sampling has not
    started, when no sample point is left, or when the midpoint equals the
    current frame.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False

    key = str(media_path)
    if not hasattr(self, 'jump_counters') or key not in self.jump_counters:
        print("No sampling started yet. Use L first to establish sample points.")
        return False

    points = self.get_sample_points()
    visited = self.jump_counters[key]
    if visited >= len(points):
        print("All sample points visited. No forward reference point.")
        return False

    upcoming = min(points[visited], self.total_frames - 1)
    here = self.current_frame
    halfway = (here + upcoming) // 2
    if halfway == here:
        print("Already at or very close to next sample point.")
        return False

    self.last_jump_position[key] = here

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, halfway)
    self.load_current_frame()

    share = (halfway / self.total_frames) * 100
    print(f"Bisected forwards to frame {halfway} ({share:.1f}% through video)")
    return True
def undo_jump(self):
    """Return to the frame that was current before the most recent jump.

    Pops the newest entry of this file's ``jump_history`` (the frame saved
    just before a sample-point jump), seeks the main capture there and
    makes it the bisection reference. The jump counter is not changed.

    Returns True after seeking; False for images or when there is no
    history for this file.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False

    key = str(media_path)
    history = self.jump_history.get(key)
    if not history:
        # Covers both "never jumped" and "history already emptied"; the
        # former second length check could never trigger after this one.
        print("No jump history to undo. Use L first to establish jump points.")
        return False

    # The top of the stack is the position held before the last jump
    previous_position = history.pop()

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, previous_position)
    self.load_current_frame()

    # Update last jump position for bisection reference
    self.last_jump_position[key] = previous_position
    percentage = (previous_position / self.total_frames) * 100
    print(f"Undid jump: returned to frame {previous_position} ({percentage:.1f}% through video)")
    return True
def toggle_multi_segment_mode(self):
    """Switch between the single view and the multi-segment grid.

    Turning the mode on prepares the segment frames; turning it off drops
    them and reloads the current file as a single video.

    Returns True after switching, False when the current file is an image.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        print("Multi-segment mode only works with videos")
        return False

    enabled = not self.multi_segment_mode
    self.multi_segment_mode = enabled

    if enabled:
        print(f"Enabled multi-segment mode ({self.segment_count} segments)")
        self.setup_segment_captures()
        return True

    print("Disabled multi-segment mode")
    self.cleanup_segment_captures()
    # Back to single view: reopen the current file from its start
    self.load_media(self.media_files[self.current_index])
    return True
def toggle_timeline(self):
    """Flip ``timeline_visible``, report the new state, and return True."""
    now_visible = not self.timeline_visible
    self.timeline_visible = now_visible
    state = 'visible' if now_visible else 'hidden'
    print(f"Timeline {state}")
    return True
def load_segment_frame_fast(self, segment_index, start_frame, shared_cap):
    """Seek *shared_cap* to *start_frame*, read one frame, and time both steps.

    Returns:
        ``(segment_index, frame_copy_or_None, start_frame)``. The frame is
        copied because the capture is reused by the caller. Any exception
        is reported on stdout and yields a None frame.
    """
    began = time.time()
    try:
        seek_began = time.time()
        shared_cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        seek_ms = (time.time() - seek_began) * 1000

        read_began = time.time()
        ok, image = shared_cap.read()
        read_ms = (time.time() - read_began) * 1000

        total_ms = (time.time() - began) * 1000
        print(f"Segment {segment_index}: Total={total_ms:.1f}ms (Seek={seek_ms:.1f}ms, Read={read_ms:.1f}ms)")

        snapshot = image.copy() if ok else None
        return segment_index, snapshot, start_frame
    except Exception as e:
        failed_after_ms = (time.time() - began) * 1000
        print(f"Segment {segment_index}: ERROR in {failed_after_ms:.1f}ms: {e}")
        return segment_index, None, start_frame
def setup_segment_captures_blazing_fast(self):
    """Fill the segment frames by reading the video sequentially, no seeks.

    One capture is opened and read from the start in steps of
    ``total_frames // (segment_count * 2)`` frames (at least 1). Whenever
    the current position is within one step of the next segment's target
    frame, that frame is stored for the segment together with its actual
    position. Timing information is printed along the way. Does nothing
    for images.
    """
    video_path = self.media_files[self.current_index]
    if not self.is_video(video_path):
        return

    setup_began = time.time()
    print(f"Setting up {self.segment_count} segments with BLAZING FAST method...")

    # Drop whatever a previous setup left behind
    self.cleanup_segment_captures()

    self.segment_caps = [None] * self.segment_count
    self.segment_frames = [None] * self.segment_count
    self.segment_positions = [0] * self.segment_count  # filled in while sampling

    print("Sampling frames at regular intervals (NO SEEKING)...")
    open_began = time.time()
    reader = cv2.VideoCapture(str(video_path))
    open_ms = (time.time() - open_began) * 1000
    print(f"Capture creation: {open_ms:.1f}ms")

    if reader.isOpened():
        frames_began = time.time()

        # Step at half the segment spacing so no target is stepped over
        stride = max(1, self.total_frames // (self.segment_count * 2))
        print(f"Sampling every {stride} frames from {self.total_frames} total frames")

        position = 0
        next_segment = 0
        filled = 0
        sampling_began = time.time()

        while filled < self.segment_count:
            ok, image = reader.read()
            if not ok:
                break

            if next_segment < self.segment_count:
                wanted = int((next_segment / max(1, self.segment_count - 1)) * (self.total_frames - 1))
                if abs(position - wanted) <= stride:
                    self.segment_frames[next_segment] = image.copy()
                    self.segment_positions[next_segment] = position
                    print(f"Segment {next_segment}: Frame {position} (target was {wanted})")
                    next_segment += 1
                    filled += 1

            position += 1

            # Discard the frames between two sampling points
            for _ in range(stride - 1):
                ok, _ = reader.read()
                if not ok:
                    break
                position += 1
            if not ok:
                break

        sampling_ms = (time.time() - sampling_began) * 1000
        frames_ms = (time.time() - frames_began) * 1000
        print(f"Frame sampling: {sampling_ms:.1f}ms for {filled} segments")
        print(f"Total frame loading: {frames_ms:.1f}ms")
        reader.release()
    else:
        print("Failed to create shared capture!")

    elapsed = time.time() - setup_began
    print(f"BLAZING FAST Total setup time: {elapsed * 1000:.1f}ms")

    loaded = sum(1 for image in self.segment_frames if image is not None)
    print(f"Successfully sampled {loaded}/{self.segment_count} segments")
def setup_segment_captures_lightning_fast(self):
    """Approximate every segment's first frame from at most six key frames.

    Target positions are spread evenly from the first to the last frame
    and stored in ``segment_positions``. Only ``min(6, segment_count)``
    key frames are actually decoded; each one is shared by a run of
    ``segment_count // key_frames`` consecutive segments.

    Segments not covered by a key frame (the tail when the division is not
    exact, or a run whose read failed) are filled with the nearest
    *preceding* valid frame. Only segments before the first valid frame
    fall back to that first frame. Does nothing for images.
    """
    video_path = self.media_files[self.current_index]
    if not self.is_video(video_path):
        return

    start_time = time.time()
    print(f"Setting up {self.segment_count} segments with LIGHTNING FAST method...")

    # Clean up existing segment captures
    self.cleanup_segment_captures()

    count = self.segment_count
    self.segment_caps = [None] * count
    self.segment_frames = [None] * count
    self.segment_positions = [
        int((i / max(1, count - 1)) * (self.total_frames - 1))
        for i in range(count)
    ]

    print("Using SMART SKIPPING strategy...")
    shared_cap_start = time.time()
    shared_cap = cv2.VideoCapture(str(video_path))
    shared_cap_create_time = (time.time() - shared_cap_start) * 1000
    print(f"Capture creation: {shared_cap_create_time:.1f}ms")

    if shared_cap.isOpened():
        try:
            frames_start = time.time()
            key_frames_to_read = min(6, count)
            segments_per_key = count // key_frames_to_read if key_frames_to_read else 0
            frames_read = 0

            for i in range(key_frames_to_read):
                start_seg = i * segments_per_key
                end_seg = min(start_seg + segments_per_key, count)
                target_frame = self.segment_positions[start_seg]

                seek_start = time.time()
                shared_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
                seek_time = (time.time() - seek_start) * 1000

                read_start = time.time()
                ret, frame = shared_cap.read()
                read_time = (time.time() - read_start) * 1000

                if ret:
                    # One decoded frame stands in for the whole run
                    for seg_idx in range(start_seg, end_seg):
                        self.segment_frames[seg_idx] = frame.copy()
                    frames_read += 1
                    print(f"Key frame {i}: Frame {target_frame} -> Segments {start_seg}-{end_seg-1} ({seek_time:.1f}ms + {read_time:.1f}ms)")
                else:
                    print(f"Failed to read key frame {i} at position {target_frame}")

            # Forward-fill the gaps so late segments do not end up showing
            # the very first frame of the video.
            nearest = next((f for f in self.segment_frames if f is not None), None)
            for idx, current in enumerate(self.segment_frames):
                if current is not None:
                    nearest = current
                elif nearest is not None:
                    self.segment_frames[idx] = nearest.copy()

            frames_time = (time.time() - frames_start) * 1000
            print(f"Smart frame reading: {frames_time:.1f}ms ({frames_read} key frames for {self.segment_count} segments)")
        finally:
            shared_cap.release()
    else:
        print("Failed to create shared capture!")

    total_time = time.time() - start_time
    print(f"LIGHTNING FAST Total setup time: {total_time * 1000:.1f}ms")

    # Report success
    successful_segments = sum(1 for frame in self.segment_frames if frame is not None)
    print(f"Successfully approximated {successful_segments}/{self.segment_count} segments")
def setup_segment_captures(self):
    """Prepare the segment frames for multi-segment mode.

    Single switch point for the setup strategy: currently delegates to the
    key-frame approximation in ``setup_segment_captures_lightning_fast``.
    """
    self.setup_segment_captures_lightning_fast()
def cleanup_segment_captures(self):
    """Release every segment capture and reset all segment state.

    The three segment lists are replaced by new empty lists; the frame
    cache is emptied in place.
    """
    live_caps = [cap for cap in self.segment_caps if cap]
    for cap in live_caps:
        cap.release()

    self.segment_caps, self.segment_frames, self.segment_positions = [], [], []

    # Cached frames belong to the segments that were just dropped
    self.frame_cache.clear()
def get_cached_frame(self, frame_number: int):
    """Return frame *frame_number* of the current file, using the cache.

    A cache hit returns a copy so callers cannot alter the cached data.
    On a miss the frame is decoded through a temporary capture (so the
    main playback capture is not disturbed), stored in the cache and
    returned.

    When the cache is full, the oldest quarter of its entries (by
    insertion order, at least one) is evicted before the new frame is
    stored.

    Returns:
        The frame, or None when there is no active capture, the file
        cannot be opened, or the read fails.
    """
    # Check cache first (thread-safe)
    with self.cache_lock:
        if frame_number in self.frame_cache:
            return self.frame_cache[frame_number].copy()

    if not self.current_cap:
        return None

    # Decode outside the lock so other threads are not blocked on I/O
    current_file = self.media_files[self.current_index]
    temp_cap = cv2.VideoCapture(str(current_file))
    try:
        if not temp_cap.isOpened():
            return None
        temp_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = temp_cap.read()
    finally:
        temp_cap.release()

    if not ret or frame is None:
        return None

    with self.cache_lock:
        if len(self.frame_cache) >= self.cache_size_limit:
            # dicts iterate in insertion order, so the leading keys are the
            # entries that have been cached the longest.
            evict_count = max(1, len(self.frame_cache) // 4)
            for key in list(self.frame_cache)[:evict_count]:
                del self.frame_cache[key]
        self.frame_cache[frame_number] = frame.copy()
    return frame
def get_segment_capture(self, segment_index):
    """Return the capture for *segment_index*, opening it on first use.

    A newly opened capture is positioned at the segment's start frame and
    remembered in ``segment_caps``.

    Returns:
        The capture, or None when the index is past the end of
        ``segment_caps`` or the file cannot be opened.
    """
    if segment_index >= len(self.segment_caps):
        return None

    existing = self.segment_caps[segment_index]
    if existing is not None:
        return existing

    # Lazy creation: nothing has been opened for this slot yet
    video_path = self.media_files[self.current_index]
    fresh = cv2.VideoCapture(str(video_path))
    if not fresh.isOpened():
        return None
    fresh.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[segment_index])
    self.segment_caps[segment_index] = fresh
    return fresh
def update_segment_frame_parallel(self, segment_index):
    """Read the next frame of one segment; meant to run on a worker thread.

    When the read fails (typically at the end of the video), the capture
    is sent back to the segment's start frame and the read is retried
    once.

    Returns:
        ``(segment_index, frame)``, with ``frame`` None when no capture is
        available, both reads fail, or an exception occurs (it is printed).
    """
    try:
        cap = self.get_segment_capture(segment_index)
        if not (cap and cap.isOpened()):
            return segment_index, None

        ok, image = cap.read()
        if not ok:
            # Loop back to segment start when reaching end
            cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[segment_index])
            ok, image = cap.read()
        return segment_index, (image if ok else None)
    except Exception as e:
        print(f"Error updating segment {segment_index}: {e}")
        return segment_index, None
def update_segment_frames(self):
    """Advance every loaded segment by one frame using the thread pool.

    Only segments that already hold a frame are updated. Work is submitted
    in batches of four and each batch is collected before the next one
    starts; a segment keeps its old frame when its update yields None.
    """
    if not self.multi_segment_mode or not self.segment_frames:
        return

    active = [idx for idx, image in enumerate(self.segment_frames) if image is not None]
    if not active:
        return

    batch_size = 4  # a single batch when four or fewer segments are active
    for offset in range(0, len(active), batch_size):
        pending = [
            self.thread_pool.submit(self.update_segment_frame_parallel, idx)
            for idx in active[offset:offset + batch_size]
        ]
        for job in pending:
            idx, image = job.result()
            if image is not None:
                self.segment_frames[idx] = image
def reposition_segments_around_frame(self, center_frame: int):
"""Spread the segment start positions evenly around *center_frame*.

Positions are spaced ``total_frames // (segment_count + 1)`` frames apart,
centred on *center_frame* and clamped to ``[0, total_frames - 1]``.
Every capture in ``segment_caps`` that exists and is open is moved to its
new position and its frame is refreshed; entries that are None are
skipped, so for them only ``segment_positions`` changes.

Does nothing outside multi-segment mode or when ``segment_caps`` is empty.
"""
# A list of None placeholders is still truthy, so this guard only rejects an empty list
if not self.multi_segment_mode or not self.segment_caps:
return
# Calculate new segment positions around the center frame
# Keep the same relative spacing but center around the new frame
segment_spacing = self.total_frames // (self.segment_count + 1)
new_positions = []
for i in range(self.segment_count):
# Spread segments around center_frame
# (offset is a float; the middle of the index range maps to an offset of 0)
offset = (i - (self.segment_count - 1) / 2) * segment_spacing
new_frame = int(center_frame + offset)
new_frame = max(0, min(new_frame, self.total_frames - 1))
new_positions.append(new_frame)
# Update segment positions and seek all captures
self.segment_positions = new_positions
for i, cap in enumerate(self.segment_caps):
if cap and cap.isOpened():
cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
# Load new frame
ret, frame = cap.read()
if ret:
self.segment_frames[i] = frame
# Reset position for next read
# NOTE(review): indentation is missing in this copy of the file, so it cannot be told
# from here whether this seek runs only after a successful read — confirm.
cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
def seek_segment_parallel(self, segment_index, frames_delta):
    """Move one segment by *frames_delta* frames; meant for a worker thread.

    The target is clamped to the segment's own window: from its start
    position to ``total_frames // segment_count`` frames later, and never
    past the last frame of the video. The frame cache is tried before
    decoding from the capture.

    Returns:
        ``(segment_index, frame)``, with ``frame`` None when the index is
        unknown, no capture is available, the read fails, or an exception
        occurs (it is printed).
    """
    try:
        if segment_index >= len(self.segment_positions):
            return segment_index, None

        cap = self.get_segment_capture(segment_index)
        if not (cap and cap.isOpened()):
            return segment_index, None

        position = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        window_start = self.segment_positions[segment_index]
        window_length = self.total_frames // self.segment_count
        window_end = min(self.total_frames - 1, window_start + window_length)
        target = max(window_start, min(position + frames_delta, window_end))

        # Cache lookup first; the capture is repositioned either way
        image = self.get_cached_frame(target)
        cap.set(cv2.CAP_PROP_POS_FRAMES, target)
        if image is not None:
            return segment_index, image

        ok, image = cap.read()
        return segment_index, (image if ok else None)
    except Exception as e:
        print(f"Error seeking segment {segment_index}: {e}")
        return segment_index, None
def seek_all_segments(self, frames_delta: int):
    """Move every loaded segment by *frames_delta* frames in parallel.

    Only segments that already hold a frame take part. All seeks are
    submitted to the thread pool at once; a segment keeps its old frame
    when its seek yields None.
    """
    if not self.multi_segment_mode or not self.segment_frames:
        return

    active = [idx for idx, image in enumerate(self.segment_frames) if image is not None]
    if not active:
        return

    pending = [
        self.thread_pool.submit(self.seek_segment_parallel, idx, frames_delta)
        for idx in active
    ]
    for job in pending:
        idx, image = job.result()
        if image is not None:
            self.segment_frames[idx] = image
def display_current_frame(self):
    """Draw the current view: the segment grid in multi-segment mode,
    otherwise the single frame."""
    if not self.multi_segment_mode:
        self.display_single_frame()
        return
    self.display_multi_segment_frame()
def display_single_frame(self):
"""Display single frame view.

Works on a copy of ``current_display_frame`` so the overlays are never
drawn into the stored frame. For videos a status line (speed, frame
counter, file counter, play state) is drawn at the top left; the frame is
then handed to ``draw_timeline`` and ``display_with_aspect_ratio``.
Returns without drawing when no frame is loaded.
"""
if self.current_display_frame is None:
return
frame = self.current_display_frame.copy()
# Add info overlay
# NOTE(review): current_file is assigned but not used anywhere in this method
current_file = self.media_files[self.current_index]
if self.is_video(self.media_files[self.current_index]):
info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
# The text is drawn twice at the same spot: white with thickness 2, then black with thickness 1 on top
cv2.putText(
frame,
info_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(255, 255, 255),
2,
)
cv2.putText(
frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
)
# Draw timeline
# NOTE(review): indentation is missing in this copy of the file, so it cannot be told from
# here whether the timeline is drawn for videos only or for images as well — confirm.
self.draw_timeline(frame)
# Maintain aspect ratio when displaying
self.display_with_aspect_ratio(frame)
def display_multi_segment_frame(self):
    """Compose all loaded segment frames into one grid image and display it.

    The first loaded frame fixes the canvas size. Every loaded segment is
    scaled into its grid cell with its aspect ratio preserved, centred,
    labelled with its position as a percentage of ``total_frames`` and
    outlined. A status line and the multi-segment timeline are then drawn and
    the canvas is passed to ``display_with_aspect_ratio``.

    Returns without drawing when no segment currently holds a frame.
    """
    if not self.segment_frames:
        return

    # The first loaded frame sets the size of the combined canvas.
    ref_frame = next((f for f in self.segment_frames if f is not None), None)
    if ref_frame is None:
        return
    canvas_height, canvas_width = ref_frame.shape[:2]

    # Near-square grid: floor(sqrt(n)) rows and enough columns to hold all n
    # segments, also when n is not a multiple of the row count.
    grid_rows = max(1, int(self.segment_count ** 0.5))
    grid_cols = max(1, -(-self.segment_count // grid_rows))

    # Size of one grid cell
    segment_width = canvas_width // grid_cols
    segment_height = canvas_height // grid_rows

    combined_frame = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

    for i, segment_frame in enumerate(self.segment_frames):
        if segment_frame is None:
            continue

        row, col = divmod(i, grid_cols)

        # Scale the segment to fit its cell while keeping its aspect ratio.
        # Separate names are used so the canvas size above is not overwritten.
        src_height, src_width = segment_frame.shape[:2]
        seg_scale = min(segment_width / src_width, segment_height / src_height)
        new_seg_width = int(src_width * seg_scale)
        new_seg_height = int(src_height * seg_scale)
        resized_segment = cv2.resize(
            segment_frame, (new_seg_width, new_seg_height), interpolation=cv2.INTER_AREA
        )

        # Cell rectangle inside the canvas
        y_start = row * segment_height
        y_end = y_start + segment_height
        x_start = col * segment_width
        x_end = x_start + segment_width

        # Centre the resized image inside the cell and never leave the cell.
        y_place_start = y_start + (segment_height - new_seg_height) // 2
        x_place_start = x_start + (segment_width - new_seg_width) // 2
        y_place_end = min(y_place_start + new_seg_height, y_end)
        x_place_end = min(x_place_start + new_seg_width, x_end)

        # Crop the source to the clamped target so both shapes always match.
        combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment[
            : y_place_end - y_place_start, : x_place_end - x_place_start
        ]

        # Segment label; an unknown length (0 frames) is shown as 0%.
        if self.total_frames > 0:
            segment_position = int((self.segment_positions[i] / self.total_frames) * 100)
        else:
            segment_position = 0
        label_text = f"Seg {i+1}: {segment_position}%"
        label_origin = (x_place_start + 5, y_place_start + 20)
        cv2.putText(
            combined_frame,
            label_text,
            label_origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (255, 255, 255),
            2,
        )
        cv2.putText(
            combined_frame,
            label_text,
            label_origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 0, 0),
            1,
        )

        # Cell border
        cv2.rectangle(combined_frame, (x_start, y_start), (x_end - 1, y_end - 1), (128, 128, 128), 1)

    # Overall status line, positioned from the canvas height.
    info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
    info_origin = (10, canvas_height - 20)
    cv2.putText(
        combined_frame,
        info_text,
        info_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,
        (255, 255, 255),
        2,
    )
    cv2.putText(
        combined_frame,
        info_text,
        info_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6,
        (0, 0, 0),
        1,
    )

    # Timeline with one marker per segment
    self.draw_multi_segment_timeline(combined_frame)

    # Final presentation keeps the aspect ratio.
    self.display_with_aspect_ratio(combined_frame)
def draw_multi_segment_timeline(self, frame):
    """Overlay a compact timeline with one numbered marker per segment position.

    Draws in place on ``frame``. Skipped when the current file is not a
    video, when there are no segment captures, or when the timeline is hidden.
    """
    if not self.is_video(self.media_files[self.current_index]):
        return
    if not self.segment_caps or not self.timeline_visible:
        return

    height, width = frame.shape[:2]

    # Strip geometry: smaller than the single-view timeline and raised by
    # 25 px to leave room for the info text underneath.
    strip_height = 30
    bar_thickness = 8
    side_margin = 20
    strip_top = height - strip_height - 25

    # Strip background
    cv2.rectangle(frame, (0, strip_top), (width, strip_top + strip_height), (40, 40, 40), -1)

    # Bar rectangle, vertically centred in the strip
    bar_top = strip_top + (strip_height - bar_thickness) // 2
    bar_left = side_margin
    bar_right = width - side_margin
    bar_span = bar_right - bar_left
    top_left = (bar_left, bar_top)
    bottom_right = (bar_right, bar_top + bar_thickness)

    # Filled track plus a one-pixel outline
    cv2.rectangle(frame, top_left, bottom_right, (80, 80, 80), -1)
    cv2.rectangle(frame, top_left, bottom_right, (200, 200, 200), 1)

    if self.total_frames <= 0:
        return

    last_index = max(1, self.total_frames - 1)
    marker_y = bar_top + bar_thickness // 2
    for idx, position in enumerate(self.segment_positions):
        marker_x = bar_left + int(bar_span * (position / last_index))

        # Green when a capture exists for this slot, grey otherwise.
        has_capture = idx < len(self.segment_caps) and self.segment_caps[idx]
        fill = (0, 255, 100) if has_capture else (100, 100, 100)
        cv2.circle(frame, (marker_x, marker_y), 4, fill, -1)
        cv2.circle(frame, (marker_x, marker_y), 4, (255, 255, 255), 1)

        # 1-based segment number above the marker
        cv2.putText(frame, str(idx + 1), (marker_x - 3, bar_top - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)
def draw_timeline(self, frame):
    """Draw the single-view timeline along the bottom edge of ``frame``.

    Skipped for non-video files, in multi-segment mode, and while the
    timeline is hidden. Otherwise the frame size is stored in
    ``window_height`` / ``window_width`` and the bar rectangle in
    ``timeline_rect`` as ``(x, y, width, height)``, the same tuple that
    ``mouse_callback`` unpacks.
    """
    if (
        not self.is_video(self.media_files[self.current_index])
        or self.multi_segment_mode
        or not self.timeline_visible
    ):
        return

    height, width = frame.shape[:2]
    self.window_height = height
    self.window_width = width

    # Background strip covering the bottom TIMELINE_HEIGHT pixels
    timeline_y = height - self.TIMELINE_HEIGHT
    cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)

    # Bar rectangle, vertically centred inside the strip
    bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
    bar_x_start = self.TIMELINE_MARGIN
    bar_x_end = width - self.TIMELINE_MARGIN
    bar_width = bar_x_end - bar_x_start
    bar_bottom = bar_y + self.TIMELINE_BAR_HEIGHT

    self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)

    # Track and its border
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BG, -1)
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BORDER, 1)

    if self.total_frames <= 0:
        return

    # current_frame can equal total_frames once the last frame has been read
    # (advance_frame stores CAP_PROP_POS_FRAMES after reading), so clamp the
    # ratio to keep the fill and the handle on the bar.
    progress = self.current_frame / max(1, self.total_frames - 1)
    progress = max(0.0, min(1.0, progress))
    progress_width = int(bar_width * progress)

    if progress_width > 0:
        cv2.rectangle(
            frame,
            (bar_x_start, bar_y),
            (bar_x_start + progress_width, bar_bottom),
            self.TIMELINE_COLOR_PROGRESS,
            -1,
        )

    # Handle at the end of the filled part
    handle_x = bar_x_start + progress_width
    handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
    handle_radius = self.TIMELINE_HANDLE_SIZE // 2
    cv2.circle(frame, (handle_x, handle_y), handle_radius, self.TIMELINE_COLOR_HANDLE, -1)
    cv2.circle(frame, (handle_x, handle_y), handle_radius, self.TIMELINE_COLOR_BORDER, 2)
def mouse_callback(self, event, x, y, flags, param):
    """Handle mouse events for clicking and dragging on the timeline.

    Args:
        event: OpenCV mouse event code.
        x, y: Pointer position reported with the event.
        flags, param: Unused here; part of the callback signature registered
            through ``cv2.setMouseCallback``.

    A left-button press on the bar starts a drag and seeks; moving over the
    bar while dragging keeps seeking. The drag always ends on button release,
    and it is also dropped whenever the timeline is not interactive (no
    timeline rectangle yet, non-video file, multi-segment mode), so a release
    that happens off the bar cannot leave the drag state stuck.
    """
    if (
        not self.timeline_rect
        or not self.is_video(self.media_files[self.current_index])
        or self.multi_segment_mode
    ):
        # Nothing to interact with; make sure no stale drag survives.
        self.mouse_dragging = False
        return

    # A release ends the drag wherever the pointer happens to be.
    if event == cv2.EVENT_LBUTTONUP:
        self.mouse_dragging = False
        return

    bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
    bar_x_end = bar_x_start + bar_width

    # 10 extra pixels below the bar make it easier to hit.
    over_bar = (bar_y <= y <= bar_y + bar_height + 10) and (bar_x_start <= x <= bar_x_end)
    if not over_bar:
        return

    if event == cv2.EVENT_LBUTTONDOWN:
        self.mouse_dragging = True
        self.seek_to_position(x, bar_x_start, bar_width)
    elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
        self.seek_to_position(x, bar_x_start, bar_width)
def seek_to_position(self, mouse_x, bar_x_start, bar_width):
    """Seek to the frame that corresponds to a horizontal timeline position.

    Args:
        mouse_x: Pointer x coordinate.
        bar_x_start: x coordinate of the left end of the timeline bar.
        bar_width: Width of the timeline bar in pixels.

    Does nothing when no capture is open, the current file is not a video,
    or the bar has no width (which would otherwise divide by zero).
    """
    if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
        return
    if bar_width <= 0:
        return

    # Fraction of the bar left of the pointer, clamped to [0, 1]
    position_ratio = max(0.0, min(1.0, (mouse_x - bar_x_start) / bar_width))

    # Map the fraction onto the valid frame indices 0 .. total_frames - 1
    last_frame = max(0, self.total_frames - 1)
    target_frame = max(0, min(int(position_ratio * last_frame), last_frame))

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
    self.load_current_frame()
def advance_frame(self):
    """Step playback forward according to the current playback speed.

    Returns:
        None when the current file is not a video or playback is paused;
        True after the segment frames (multi-segment mode) or the single
        capture advanced; False when a frame could not be read.
    """
    is_active_video = self.is_video(self.media_files[self.current_index]) and self.is_playing
    if not is_active_video:
        return

    if self.multi_segment_mode:
        # All segments advance together.
        self.update_segment_frames()
        return True

    # Read (skip + 1) frames; only the last one read is kept for display.
    remaining = self.calculate_frames_to_skip() + 1
    while remaining > 0:
        ok, frame = self.current_cap.read()
        if not ok:
            return False
        remaining -= 1

    self.current_display_frame = frame
    self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))

    # Record the newly shown position as watched.
    self.update_watch_tracking()
    return True
def seek_video(self, frames_delta: int):
    """Move the playback position by ``frames_delta`` frames.

    In multi-segment mode the request is forwarded to ``seek_all_segments``.
    Otherwise the target is clamped to ``0 .. total_frames - 1`` and the
    frame at that position is loaded. Ignored for non-video files and when
    no capture is open.
    """
    if not self.is_video(self.media_files[self.current_index]):
        return

    if self.multi_segment_mode:
        self.seek_all_segments(frames_delta)
        return

    if not self.current_cap:
        return

    # Clamp to the last frame first, then to the first frame.
    wanted = self.current_frame + frames_delta
    last_frame = self.total_frames - 1
    target_frame = max(0, min(wanted, last_frame))

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
    self.load_current_frame()
def process_seek_key(self, key: int) -> bool:
    """Translate a key code into a seek and report whether one was issued.

    ``,`` / ``.`` step by ``fine_seek_frames`` immediately. ``a`` / ``d``
    seek backward / forward by ``coarse_seek_frames``; the upper-case (Shift)
    and Ctrl variants scale that by ``SHIFT_SEEK_MULTIPLIER`` and
    ``CTRL_SEEK_MULTIPLIER``. Repeats of the same key are limited to one seek
    per ``KEY_REPEAT_RATE_SEC`` and switch to ``fast_seek_frames`` once the
    key has been held longer than ``FAST_SEEK_ACTIVATION_TIME``. Any other
    key clears the tracked seek key.

    Returns:
        True when a seek was performed, False otherwise.
    """
    now = time.time()

    # Fine stepping is never rate limited.
    comma, period = ord(","), ord(".")
    if key in (comma, period):
        fine_step = self.fine_seek_frames if key == period else -self.fine_seek_frames
        if fine_step != 0:
            self.seek_video(fine_step)
            return True
        return False

    shift = self.SHIFT_SEEK_MULTIPLIER
    ctrl = self.CTRL_SEEK_MULTIPLIER
    # key code -> (direction, multiplier); codes 1 and 4 are Ctrl+A and Ctrl+D
    coarse_bindings = {
        ord("a"): (-1, 1),
        ord("A"): (-1, shift),
        ord("d"): (1, 1),
        ord("D"): (1, shift),
        1: (-1, ctrl),
        4: (1, ctrl),
    }

    binding = coarse_bindings.get(key)
    if binding is None:
        # Not a seek key: forget any key that was being tracked.
        if self.current_seek_key is not None:
            self.current_seek_key = None
            self.is_seeking = False
        return False

    direction, multiplier = binding

    if self.current_seek_key != key:
        # First press of this key: start tracking and seek right away.
        self.current_seek_key = key
        self.key_first_press_time = now
        self.last_seek_time = now
        self.is_seeking = True
        self.seek_video(direction * self.coarse_seek_frames * multiplier)
        return True

    if not self.is_seeking:
        return False

    since_last_seek = now - self.last_seek_time
    held_for = now - self.key_first_press_time
    if since_last_seek < self.KEY_REPEAT_RATE_SEC:
        # Too soon after the previous repeat.
        return False

    self.last_seek_time = now
    if held_for > self.FAST_SEEK_ACTIVATION_TIME:
        step = self.fast_seek_frames
    else:
        step = self.coarse_seek_frames
    self.seek_video(direction * step * multiplier)
    return True
def grade_media(self, grade: int):
    """Move the current media file into the sub-directory named after ``grade``.

    The move is pushed onto ``undo_history`` as
    ``(destination, original path, index)`` before it is attempted and taken
    off again if the move fails. Open captures are released first so the
    file is not locked.

    Returns:
        None when there is nothing to grade or ``grade`` is outside 1..5;
        False when the move emptied the media list; True otherwise,
        including when the move failed.
    """
    if not self.media_files:
        return
    if grade < 1 or grade > 5:
        return

    current_file = self.media_files[self.current_index]

    # Target directory "<directory>/<grade>", created on demand
    grade_dir = self.directory / str(grade)
    grade_dir.mkdir(exist_ok=True)

    # On a name clash append _1, _2, ... to the stem until the name is free.
    stem, suffix = current_file.stem, current_file.suffix
    destination = grade_dir / current_file.name
    counter = 0
    while destination.exists():
        counter += 1
        destination = grade_dir / f"{stem}_{counter}{suffix}"

    # Recorded before the move so the entry exists if anything goes wrong.
    undo_entry = (str(destination), str(current_file), self.current_index)
    self.undo_history.append(undo_entry)

    # Release every handle on the file before moving it.
    if self.current_cap:
        self.current_cap.release()
        self.current_cap = None
    if self.multi_segment_mode:
        self.cleanup_segment_captures()

    try:
        shutil.move(str(current_file), str(destination))
        print(f"Moved {current_file.name} to grade {grade}")

        self.media_files.pop(self.current_index)
        # Wrap around when the last entry of the list was graded.
        if self.current_index >= len(self.media_files):
            self.current_index = 0

        if not self.media_files:
            print("No more media files to grade!")
            return False
    except Exception as e:
        print(f"Error moving file: {e}")
        # The move did not happen, so there is nothing to undo.
        self.undo_history.pop()

    return True
def undo_last_action(self):
    """Undo the most recent grading move and select the restored file.

    The file is moved back to its original path and re-inserted into
    ``media_files`` at its original index. If the list is now shorter than
    that index, the file goes to the end and ``current_index`` points at
    that real position, so the index always refers to the restored file.

    Returns:
        True when the file was restored; False when there is nothing to undo
        or the move failed (the history entry is kept in that case).
    """
    if not self.undo_history:
        print("No actions to undo!")
        return False

    moved_file_path, original_file_path, original_index = self.undo_history.pop()

    # Release the current capture before touching files on disk.
    if self.current_cap:
        self.current_cap.release()
        self.current_cap = None

    try:
        shutil.move(moved_file_path, original_file_path)
    except Exception as e:
        print(f"Error undoing action: {e}")
        # Keep the action available for another attempt.
        self.undo_history.append((moved_file_path, original_file_path, original_index))
        return False

    original_file = Path(original_file_path)

    # Clamp to the list bounds so the index used for navigation is valid.
    restored_index = max(0, min(original_index, len(self.media_files)))
    self.media_files.insert(restored_index, original_file)

    print("Navigating to: ", restored_index)
    self.current_index = restored_index
    print(f"Undone: Moved {original_file.name} back from grade folder")
    return True
def run(self):
    """Run the interactive grading session.

    Collects the media files, prints the key bindings, opens the
    "Media Grader" window and then handles one file at a time: show a frame,
    wait for a key, dispatch it, advance playback. The session ends when the
    user quits (Q/ESC), when grading leaves no files, or when the index moves
    past the last file.

    The open capture, the segment captures, the thread pool and the OpenCV
    windows are released in a ``finally`` block, so the early-exit paths
    clean up as well.
    """
    self.media_files = self.find_media_files()
    if not self.media_files:
        print("No media files found in directory!")
        return

    print(f"Found {len(self.media_files)} media files")
    print("Controls:")
    print(" Space: Pause/Play")
    print(" A/D: Seek backward/forward (hold for FAST seek)")
    print(" Shift+A/D: Seek backward/forward (5x multiplier)")
    print(" Ctrl+A/D: Seek backward/forward (10x multiplier)")
    print(" , / . : Frame-by-frame seek (fine control)")
    print(" W/S: Decrease/Increase playback speed")
    print(" G: Toggle multi-segment mode (videos only)")
    print(" 1-5: Grade and move file")
    print(" N: Next file")
    print(" P: Previous file")
    print(" U: Undo last grading action")
    print(" L: Sample video at key points (videos only)")
    print(" H: Toggle timeline visibility")
    print(" J: Bisect backwards from current position (videos only, disabled in multi-segment)")
    print(" K: Bisect forwards toward next sample (videos only, disabled in multi-segment)")
    print(" Q/ESC: Quit")

    cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
    cv2.setMouseCallback("Media Grader", self.mouse_callback)
    # Initial window size; a reasonable default until the first frame is shown.
    cv2.resizeWindow("Media Grader", 1280, 720)

    grade_keys = tuple(ord(digit) for digit in "12345")
    # Keys whose handlers already decided which index to show next.
    keeps_index = (ord("p"), ord("u")) + grade_keys

    try:
        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            # Multi-segment mode carries over from the previous file.
            if self.multi_segment_mode and self.is_video(current_file):
                self.setup_segment_captures()

            window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
            cv2.setWindowTitle("Media Grader", window_title)

            while True:
                self.display_current_frame()

                # While seeking, wait a fixed time so frames can render.
                if self.is_video(current_file):
                    if self.is_seeking:
                        delay = self.FRAME_RENDER_TIME_MS
                    else:
                        delay = self.calculate_frame_delay()
                else:
                    delay = self.IMAGE_DISPLAY_DELAY_MS

                # 255 after masking means no key was pressed.
                key = cv2.waitKey(delay) & 0xFF

                if key == ord("q") or key == 27:
                    return
                elif key == ord(" "):
                    self.is_playing = not self.is_playing
                elif key == ord("s"):
                    self.playback_speed = max(
                        self.MIN_PLAYBACK_SPEED,
                        self.playback_speed - self.SPEED_INCREMENT,
                    )
                elif key == ord("w"):
                    self.playback_speed = min(
                        self.MAX_PLAYBACK_SPEED,
                        self.playback_speed + self.SPEED_INCREMENT,
                    )
                elif self.process_seek_key(key):
                    pass
                elif key == ord("n"):
                    break
                elif key == ord("p"):
                    self.current_index = max(0, self.current_index - 1)
                    print("Navigating to: ", self.current_index)
                    break
                elif key == ord("u"):
                    if self.undo_last_action():
                        # File was restored; reload it.
                        break
                elif key == ord("l"):
                    # Jump to largest unwatched region (works in both modes)
                    self.jump_to_unwatched_region()
                elif key == ord("j"):
                    if not self.multi_segment_mode:
                        self.bisect_backwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("k"):
                    if not self.multi_segment_mode:
                        self.bisect_forwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("h"):
                    self.toggle_timeline()
                elif key == ord("g"):
                    self.toggle_multi_segment_mode()
                elif key in grade_keys:
                    grade = int(chr(key))
                    if not self.grade_media(grade):
                        return
                    break
                elif key == 255:
                    # NOTE(review): process_seek_key(255) above has already
                    # cleared the tracked seek key, so this branch looks
                    # unreachable - confirm before removing it.
                    if self.is_seeking and self.current_seek_key is not None:
                        self.process_seek_key(self.current_seek_key)

                if (
                    self.is_playing
                    and self.is_video(current_file)
                    and not self.is_seeking
                ):
                    if not self.advance_frame():
                        # End of video: loop back to the start instead of
                        # moving to the next file.
                        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        self.current_frame = 0
                        self.load_current_frame()

            if key not in keeps_index:
                print("Navigating to (pu12345): ", self.current_index)
                self.current_index += 1
    finally:
        if self.current_cap:
            self.current_cap.release()
        self.cleanup_segment_captures()
        # Cleanup thread pool
        self.thread_pool.shutdown(wait=True)
        cv2.destroyAllWindows()

    print("Grading session complete!")
def main():
    """Parse the command line, validate the directory and run a grading session.

    Exits with status 1 when the directory is not valid or the session raises
    an unexpected error. A keyboard interrupt only prints a message.
    """
    cli = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    cli.add_argument(
        "directory",
        default=".",
        nargs="?",
        help="Directory to scan for media files (default: current directory)",
    )
    cli.add_argument(
        "--seek-frames",
        default=30,
        type=int,
        help="Number of frames to seek when using arrow keys (default: 30)",
    )
    cli.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )
    args = cli.parse_args()

    # Refuse to start on anything that is not an existing directory.
    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory")
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)

    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
# Start the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()