# Source: py-media-grader/main.py
# (listing metadata: 896 lines, 33 KiB, Python)

import os
import sys
import glob
import cv2
import argparse
import shutil
import time
from pathlib import Path
from typing import List
class MediaGrader:
    """Interactive OpenCV viewer that sorts media files into grade folders.

    Media files found under ``directory`` are shown one at a time in a single
    window.  Pressing 1-5 moves the current file into ``directory/<grade>``;
    other keys control playback, seeking, sampling and undo (see ``run``).

    Frame bookkeeping: ``current_frame`` stores ``CAP_PROP_POS_FRAMES`` as read
    right after a frame was decoded, i.e. the index of the *next* frame to be
    decoded.  The frame currently on screen is therefore ``current_frame - 1``.
    """

    # Configuration constants
    BASE_FRAME_DELAY_MS = 16  # ~60 FPS at 1.0x (1000 ms / 16 ms per frame)
    KEY_REPEAT_RATE_SEC = 0.5  # How often to process key repeats
    FAST_SEEK_ACTIVATION_TIME = 2.0  # How long to hold before fast seek
    FRAME_RENDER_TIME_MS = 50  # Time to let frames render between seeks
    SPEED_INCREMENT = 0.2
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 100.0
    FAST_SEEK_MULTIPLIER = 60
    IMAGE_DISPLAY_DELAY_MS = 100

    # Timeline configuration
    TIMELINE_HEIGHT = 60
    TIMELINE_MARGIN = 20
    TIMELINE_BAR_HEIGHT = 12
    TIMELINE_HANDLE_SIZE = 12
    TIMELINE_COLOR_BG = (80, 80, 80)
    TIMELINE_COLOR_PROGRESS = (0, 120, 255)
    TIMELINE_COLOR_HANDLE = (255, 255, 255)
    TIMELINE_COLOR_BORDER = (200, 200, 200)

    # Media classification
    VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv", ".gif")
    GRADE_DIR_NAMES = ("1", "2", "3", "4", "5")

    # Sampling strategy for the L key: the video is cut into SAMPLE_SEGMENTS
    # equal segments and visited in SAMPLE_ORDER (1/4, 1/2, 3/4, then the
    # remaining eighths, then the beginning).
    SAMPLE_SEGMENTS = 8
    SAMPLE_ORDER = (2, 4, 6, 1, 3, 5, 7, 0)

    def __init__(
        self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
    ):
        """Set up grader state; no files are scanned or opened here.

        Args:
            directory: Root directory to scan; grade folders are created in it.
            seek_frames: Frames moved per coarse (A/D) seek step.
            snap_to_iframe: Stored on the instance; no seek path in this class
                consults it yet.
        """
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.snap_to_iframe = snap_to_iframe
        self.current_index = 0
        self.playback_speed = 1.0
        self.media_files = []
        self.current_cap = None
        self.is_playing = True
        self.current_frame = 0
        self.total_frames = 0

        # Key repeat tracking with rate limiting
        self.last_seek_time = 0
        self.current_seek_key = None
        self.key_first_press_time = 0
        self.is_seeking = False

        # Seeking modes
        self.fine_seek_frames = 1  # Frame-by-frame
        self.coarse_seek_frames = self.seek_frames  # User-configurable
        self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER

        # Current frame cache for display
        self.current_display_frame = None

        # Supported media extensions
        self.extensions = [
            ".png",
            ".jpg",
            ".jpeg",
            ".gif",
            ".mp4",
            ".avi",
            ".mov",
            ".mkv",
        ]

        # Mouse interaction for timeline
        self.mouse_dragging = False
        self.timeline_rect = None
        self.window_width = 800
        self.window_height = 600

        # Undo functionality:
        # list of (moved_path, original_path, original_index) tuples
        self.undo_history = []

        # Watch tracking: Dict[file_path, List[Tuple[start_frame, end_frame]]]
        self.watched_regions = {}
        self.current_watch_start = None  # Frame where the open region started
        self.last_frame_position = 0  # Last frame seen by the tracker

        # Bisection reference: Dict[file_path, frame]
        self.last_jump_position = {}
        # Positions to return to with the H key: Dict[file_path, List[frame]]
        self.jump_history = {}
        # Number of L-key sample points already visited: Dict[file_path, int]
        self.jump_counters = {}

    def find_media_files(self) -> List[Path]:
        """Return all supported media files under ``self.directory``, sorted.

        Extensions are matched case-insensitively.  Files located inside a
        grade folder ("1".."5") *below the scan root* are skipped; components
        of the root path itself are ignored so that scanning e.g.
        ``/data/5/clips`` does not exclude everything.
        """
        wanted = {ext.lower() for ext in self.extensions}
        # Escape the root so characters such as "[" in the directory name are
        # not interpreted as glob syntax.
        pattern = os.path.join(glob.escape(str(self.directory)), "**", "*")

        found = []
        for name in glob.glob(pattern, recursive=True):
            candidate = Path(name)
            if candidate.suffix.lower() not in wanted or not candidate.is_file():
                continue
            try:
                sub_dirs = candidate.relative_to(self.directory).parts[:-1]
            except ValueError:
                # Should not happen for glob results; fall back to all parents.
                sub_dirs = candidate.parts[:-1]
            if any(part in self.GRADE_DIR_NAMES for part in sub_dirs):
                continue
            print("Adding file: ", candidate)
            found.append(candidate)
        return sorted(found)

    def is_video(self, file_path: Path) -> bool:
        """Return True if the file extension is treated as video (incl. GIF)."""
        return file_path.suffix.lower() in self.VIDEO_EXTENSIONS

    def calculate_frame_delay(self) -> int:
        """Return the ``waitKey`` delay in ms for the current playback speed."""
        delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
        return max(1, delay_ms)

    def calculate_frames_to_skip(self) -> int:
        """Return how many extra frames to decode per tick at high speed."""
        speed = self.playback_speed
        if speed <= 2.0:
            return 0
        if speed <= 5.0:
            return int(speed - 1)
        return int(speed * 2)

    def load_media(self, file_path: Path) -> bool:
        """Open ``file_path`` and cache its first frame.

        The first frame is loaded via ``load_current_frame``, which reads
        ``self.media_files[self.current_index]``; callers are expected to pass
        that same file.

        Returns:
            True when a frame is ready for display, False when the file could
            not be opened or decoded.
        """
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None
        # Drop the previous file's frame so a failed load can never show it.
        self.current_display_frame = None
        self.current_frame = 0

        if self.is_video(file_path):
            self.current_cap = cv2.VideoCapture(str(file_path))
            if not self.current_cap.isOpened():
                print(f"Warning: Could not open video file {file_path.name} (unsupported codec)")
                self.current_cap.release()
                self.current_cap = None
                return False
            self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        else:
            self.total_frames = 1

        if not self.load_current_frame():
            return False
        # Start watch tracking session for videos
        self.start_watch_session()
        return True

    def load_current_frame(self):
        """Decode the frame at the current position into the display cache.

        Returns:
            True on success, False if nothing could be read.
        """
        current_path = self.media_files[self.current_index]
        if self.is_video(current_path):
            if not self.current_cap:
                return False
            ret, frame = self.current_cap.read()
            if not ret:
                return False
            self.current_display_frame = frame
            self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
            return True

        frame = cv2.imread(str(current_path))
        if frame is None:
            return False
        self.current_display_frame = frame
        return True

    def start_watch_session(self):
        """Open a new watched region at the current frame (videos only)."""
        if self.is_video(self.media_files[self.current_index]):
            self.current_watch_start = self.current_frame
            self.last_frame_position = self.current_frame

    def update_watch_tracking(self):
        """Extend or close the open watched region for the current video.

        A move of more than 5 frames since the previous call is treated as a
        jump: the region ``[watch_start, last_position]`` is recorded and a new
        one starts at the current frame.  During contiguous playback the open
        region is flushed once it spans more than 30 frames.
        """
        if (
            not self.is_video(self.media_files[self.current_index])
            or self.current_watch_start is None
        ):
            return

        current_file = str(self.media_files[self.current_index])
        start = self.current_watch_start

        if abs(self.current_frame - self.last_frame_position) > 5:
            # Discontinuity: only the span up to the last contiguous position
            # was actually watched.
            end = self.last_frame_position
            self.add_watched_region(current_file, min(start, end), max(start, end))
            self.current_watch_start = self.current_frame
        elif abs(self.current_frame - start) > 30:
            end = self.current_frame
            self.add_watched_region(current_file, min(start, end), max(start, end))
            self.current_watch_start = self.current_frame

        # Always remember where we are so the next call can detect jumps.
        self.last_frame_position = self.current_frame

    def add_watched_region(self, file_path, start_frame, end_frame):
        """Record ``[start_frame, end_frame]``, merging overlapping regions."""
        regions = self.watched_regions.setdefault(file_path, [])
        new_region = [start_frame, end_frame]

        merged = []
        for region in regions:
            if new_region[1] < region[0] or new_region[0] > region[1]:
                # No overlap
                merged.append(region)
            else:
                # Overlap (or touching): absorb into the new region
                new_region[0] = min(new_region[0], region[0])
                new_region[1] = max(new_region[1], region[1])
        merged.append(tuple(new_region))
        self.watched_regions[file_path] = merged

    def find_largest_unwatched_region(self):
        """Return the largest unwatched ``(start, end)`` gap of the current video.

        Returns None for non-video files or when no gap remains.  When nothing
        has been recorded yet, the first quarter of the video is returned.
        """
        if not self.is_video(self.media_files[self.current_index]):
            return None

        current_file = str(self.media_files[self.current_index])
        watched = sorted(self.watched_regions.get(current_file, []), key=lambda r: r[0])
        if not watched:
            # No regions watched yet, return the beginning
            return (0, self.total_frames // 4)

        gaps = []
        # Gap before first watched region
        if watched[0][0] > 0:
            gaps.append((0, watched[0][0]))
        # Gaps between watched regions
        for previous, following in zip(watched, watched[1:]):
            if following[0] > previous[1]:
                gaps.append((previous[1], following[0]))
        # Gap after last watched region
        if watched[-1][1] < self.total_frames:
            gaps.append((watched[-1][1], self.total_frames))

        if not gaps:
            return None
        return max(gaps, key=lambda gap: gap[1] - gap[0])

    def _sample_points(self) -> List[int]:
        """Return the L-key sample frames for the current video, in visit order."""
        segment_size = self.total_frames // self.SAMPLE_SEGMENTS
        return [segment_size * step for step in self.SAMPLE_ORDER]

    def _percent_through(self, frame: int) -> float:
        """Return ``frame`` as a percentage of the video, safe for 0 frames."""
        return (frame / max(1, self.total_frames)) * 100

    def jump_to_unwatched_region(self):
        """Jump to the next not-yet-visited sample point of the current video.

        Despite the name, this does not consult ``watched_regions``; it walks a
        fixed list of sample points (see ``SAMPLE_ORDER``), one per call.

        Returns:
            True if a jump was made, False otherwise.
        """
        if not self.is_video(self.media_files[self.current_index]) or not self.current_cap:
            return False

        current_file = str(self.media_files[self.current_index])
        current_jump = self.jump_counters.setdefault(current_file, 0)

        if self.total_frames // self.SAMPLE_SEGMENTS == 0:
            print("Video too short for sampling")
            return False

        sample_points = self._sample_points()
        if current_jump >= len(sample_points):
            print("All sample points visited! Video fully sampled.")
            return False

        target_frame = min(sample_points[current_jump], self.total_frames - 1)

        # Remember where we came from, for bisection (J) and jump undo (H)
        self.last_jump_position[current_file] = self.current_frame
        self.jump_history.setdefault(current_file, []).append(self.current_frame)

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()
        self.jump_counters[current_file] += 1

        percentage = self._percent_through(target_frame)
        print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
        return True

    def bisect_backwards(self):
        """Jump halfway between the last jump reference and the current frame.

        Returns:
            True if a jump was made, False otherwise.
        """
        if not self.is_video(self.media_files[self.current_index]) or not self.current_cap:
            return False

        current_file = str(self.media_files[self.current_index])
        if current_file not in self.last_jump_position:
            print("No previous position to bisect from. Use L first to establish a reference point.")
            return False

        last_pos = self.last_jump_position[current_file]
        current_pos = self.current_frame
        if last_pos == current_pos:
            print("Already at the same position as last jump.")
            return False

        midpoint = (last_pos + current_pos) // 2
        # The position we leave becomes the reference for further bisection
        self.last_jump_position[current_file] = current_pos

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
        self.load_current_frame()

        percentage = self._percent_through(midpoint)
        print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)")
        return True

    def bisect_forwards(self):
        """Jump halfway between the current frame and the next sample point.

        Returns:
            True if a jump was made, False otherwise.
        """
        if not self.is_video(self.media_files[self.current_index]) or not self.current_cap:
            return False

        current_file = str(self.media_files[self.current_index])
        if current_file not in self.jump_counters:
            print("No sampling started yet. Use L first to establish sample points.")
            return False

        sample_points = self._sample_points()
        current_jump = self.jump_counters[current_file]
        if current_jump >= len(sample_points):
            print("All sample points visited. No forward reference point.")
            return False

        next_sample = max(0, min(sample_points[current_jump], self.total_frames - 1))
        current_pos = self.current_frame

        midpoint = (current_pos + next_sample) // 2
        if midpoint == current_pos:
            print("Already at or very close to next sample point.")
            return False

        # The position we leave becomes the reference for further bisection
        self.last_jump_position[current_file] = current_pos

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
        self.load_current_frame()

        percentage = self._percent_through(midpoint)
        print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)")
        return True

    def undo_jump(self):
        """Return to the position recorded before the most recent L jump.

        Returns:
            True if the position was restored, False otherwise.
        """
        if not self.is_video(self.media_files[self.current_index]) or not self.current_cap:
            return False

        current_file = str(self.media_files[self.current_index])
        history = self.jump_history.get(current_file)
        if not history:
            print("No jump history to undo. Use L first to establish jump points.")
            return False

        previous_position = history.pop()
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, previous_position)
        self.load_current_frame()

        # Keep the bisection reference in step with where we are now
        self.last_jump_position[current_file] = previous_position

        percentage = self._percent_through(previous_position)
        print(f"Undid jump: returned to frame {previous_position} ({percentage:.1f}% through video)")
        return True

    def display_current_frame(self):
        """Show the cached frame, with status text and timeline for videos."""
        if self.current_display_frame is None:
            return

        # Draw on a copy so overlays never accumulate on the cached frame
        frame = self.current_display_frame.copy()

        if self.is_video(self.media_files[self.current_index]):
            info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
            # Thick white pass then thin black pass: black text, white outline
            cv2.putText(
                frame,
                info_text,
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (255, 255, 255),
                2,
            )
            cv2.putText(
                frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
            )
            self.draw_timeline(frame)

        cv2.imshow("Media Grader", frame)

    def draw_timeline(self, frame):
        """Draw the seek bar onto the bottom of ``frame`` (videos only).

        Also stores the bar geometry in ``self.timeline_rect`` for hit-testing
        in ``mouse_callback``.
        """
        if not self.is_video(self.media_files[self.current_index]):
            return

        height, width = frame.shape[:2]
        self.window_height = height
        self.window_width = width

        # Timeline background area
        timeline_y = height - self.TIMELINE_HEIGHT
        cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)

        # Timeline bar geometry
        bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
        bar_x_start = self.TIMELINE_MARGIN
        bar_x_end = width - self.TIMELINE_MARGIN
        bar_width = bar_x_end - bar_x_start
        bar_bottom = bar_y + self.TIMELINE_BAR_HEIGHT
        self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)

        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BG, -1)
        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BORDER, 1)

        if self.total_frames > 0:
            # current_frame is one past the frame on screen; clamp so the
            # handle can never leave the bar.
            displayed = max(0, self.current_frame - 1)
            progress = min(1.0, displayed / max(1, self.total_frames - 1))
            progress_width = int(bar_width * progress)
            if progress_width > 0:
                cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_bottom), self.TIMELINE_COLOR_PROGRESS, -1)

            handle_x = bar_x_start + progress_width
            handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
            cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1)
            cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2)

    def mouse_callback(self, event, x, y, flags, param):
        """Handle mouse events: click or drag on the timeline seeks the video."""
        # A release always ends the drag, wherever the pointer is; otherwise a
        # release outside the bar would leave dragging stuck on.
        if event == cv2.EVENT_LBUTTONUP:
            self.mouse_dragging = False
            return

        if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]):
            return

        bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
        bar_x_end = bar_x_start + bar_width

        # Extra 10 px below the bar makes it easier to hit
        over_bar = (
            bar_y <= y <= bar_y + bar_height + 10
            and bar_x_start <= x <= bar_x_end
        )
        if not over_bar:
            return

        if event == cv2.EVENT_LBUTTONDOWN:
            self.mouse_dragging = True
            self.seek_to_position(x, bar_x_start, bar_width)
        elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
            self.seek_to_position(x, bar_x_start, bar_width)

    def seek_to_position(self, mouse_x, bar_x_start, bar_width):
        """Seek to the frame corresponding to ``mouse_x`` on the timeline bar."""
        if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
            return
        if bar_width <= 0:
            # Frame narrower than the margins: there is no bar to map onto
            return

        position_ratio = max(0, min(1, (mouse_x - bar_x_start) / bar_width))
        target_frame = int(position_ratio * (self.total_frames - 1))
        target_frame = max(0, min(target_frame, self.total_frames - 1))

        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    def advance_frame(self):
        """Decode the next frame(s) for playback.

        Returns:
            True when a new frame was cached, False when the video ended,
            None when there is nothing to advance (not a video, paused, or no
            open capture).
        """
        if (
            not self.is_video(self.media_files[self.current_index])
            or not self.is_playing
            or not self.current_cap
        ):
            return None

        # At high speed, decode and discard intermediate frames
        for _ in range(self.calculate_frames_to_skip() + 1):
            ret, frame = self.current_cap.read()
            if not ret:
                return False
            self.current_display_frame = frame
            self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))

        self.update_watch_tracking()
        return True

    def seek_video(self, frames_delta: int):
        """Move ``frames_delta`` frames relative to the frame on screen."""
        if not self.current_cap or not self.is_video(
            self.media_files[self.current_index]
        ):
            return

        # current_frame is the decoder position after the last read, i.e. one
        # past the displayed frame.  Seeking relative to it would make -1 show
        # the same frame again and +1 skip a frame.
        displayed = max(0, self.current_frame - 1)
        target_frame = max(0, min(displayed + frames_delta, self.total_frames - 1))
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    def process_seek_key(self, key: int) -> bool:
        """Handle a seek key, rate-limiting repeats of A/D.

        ``,`` and ``.`` step one fine-seek unit immediately.  ``a``/``d`` seek
        a coarse step on first press; while the same key keeps arriving, a
        further step is made at most every ``KEY_REPEAT_RATE_SEC`` and switches
        to the fast step after ``FAST_SEEK_ACTIVATION_TIME``.  Any other key
        (including the "no key" value) clears the held-key state.

        Returns:
            True if the key was a seek key, False otherwise.
        """
        current_time = time.time()
        seek_direction = 0
        seek_amount = 0

        if key == ord("a"):
            seek_direction = -1
        elif key == ord("d"):
            seek_direction = 1
        elif key == ord(","):
            seek_amount = -self.fine_seek_frames
        elif key == ord("."):
            seek_amount = self.fine_seek_frames
        else:
            if self.current_seek_key is not None:
                self.current_seek_key = None
                self.is_seeking = False
            return False

        # Fine seeking (comma/period) is always immediate
        if seek_amount != 0:
            self.seek_video(seek_amount)
            return True

        if seek_direction != 0:
            if self.current_seek_key != key:
                # First press of this key: seek once and start the hold timers
                self.current_seek_key = key
                self.key_first_press_time = current_time
                self.last_seek_time = current_time
                self.is_seeking = True
                self.seek_video(seek_direction * self.coarse_seek_frames)
                return True
            if self.is_seeking:
                time_since_last_seek = current_time - self.last_seek_time
                time_held = current_time - self.key_first_press_time
                if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
                    self.last_seek_time = current_time
                    if time_held > self.FAST_SEEK_ACTIVATION_TIME:
                        step = self.fast_seek_frames
                    else:
                        step = self.coarse_seek_frames
                    self.seek_video(seek_direction * step)
                return True
        return False

    def grade_media(self, grade: int):
        """Move the current file into ``directory/<grade>`` and drop it from the list.

        After a successful move ``current_index`` already points at the next
        file to show (wrapping to 0 at the end of the list).

        Returns:
            False when nothing can be graded (empty list, invalid grade) or the
            last file was just graded; True otherwise, including when the move
            failed and the file stays in the list.
        """
        if not self.media_files or grade < 1 or grade > 5:
            return False

        current_file = self.media_files[self.current_index]
        grade_dir = self.directory / str(grade)

        # Pick a destination name that does not clobber an existing file
        destination = grade_dir / current_file.name
        counter = 1
        while destination.exists():
            destination = grade_dir / f"{current_file.stem}_{counter}{current_file.suffix}"
            counter += 1

        # Release video capture to unlock the file before moving
        if self.current_cap:
            self.current_cap.release()
            self.current_cap = None

        try:
            grade_dir.mkdir(exist_ok=True)
            shutil.move(str(current_file), str(destination))
        except OSError as e:
            print(f"Error moving file: {e}")
            return True

        # Only record the move once it has actually happened
        self.undo_history.append((str(destination), str(current_file), self.current_index))
        print(f"Moved {current_file.name} to grade {grade}")

        self.media_files.pop(self.current_index)
        if self.current_index >= len(self.media_files):
            self.current_index = 0
        if not self.media_files:
            print("No more media files to grade!")
            return False
        return True

    def undo_last_action(self):
        """Move the most recently graded file back and make it the current file.

        Returns:
            True if the file was restored, False if there was nothing to undo
            or the move failed (the history entry is kept in that case).
        """
        if not self.undo_history:
            print("No actions to undo!")
            return False

        moved_file_path, original_file_path, original_index = self.undo_history.pop()

        # The open capture belongs to a different file than the one being
        # moved, so it is left alone; a failed undo must not kill playback.
        try:
            shutil.move(moved_file_path, original_file_path)
        except OSError as e:
            print(f"Error undoing action: {e}")
            self.undo_history.append((moved_file_path, original_file_path, original_index))
            return False

        original_file = Path(original_file_path)
        insert_at = min(original_index, len(self.media_files))
        self.media_files.insert(insert_at, original_file)
        self.current_index = insert_at
        print(f"Undone: Moved {original_file.name} back from grade folder")
        return True

    def _restart_video(self):
        """Rewind the current video to its first frame."""
        if not self.current_cap:
            return
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        self.current_frame = 0
        self.load_current_frame()

    def _review_current_file(self, current_file: Path) -> bool:
        """Run the display/key loop for one loaded file.

        On return ``current_index`` points at the file to show next.

        Returns:
            True to continue the session, False to end it (quit key, or the
            last file was graded).
        """
        while True:
            self.display_current_frame()

            if not self.is_video(current_file):
                delay = self.IMAGE_DISPLAY_DELAY_MS
            elif self.is_seeking:
                delay = self.FRAME_RENDER_TIME_MS
            else:
                delay = self.calculate_frame_delay()

            # waitKey returns -1 when no key arrived, which becomes 255 here
            key = cv2.waitKey(delay) & 0xFF

            if key == ord("q") or key == 27:
                return False
            elif key == ord(" "):
                self.is_playing = not self.is_playing
            elif key == ord("s"):
                self.playback_speed = max(
                    self.MIN_PLAYBACK_SPEED,
                    self.playback_speed - self.SPEED_INCREMENT,
                )
            elif key == ord("w"):
                self.playback_speed = min(
                    self.MAX_PLAYBACK_SPEED,
                    self.playback_speed + self.SPEED_INCREMENT,
                )
            elif self.process_seek_key(key):
                # Seek handled.  Every non-seek key reaching this point,
                # including "no key" (255), has cleared the held-key state.
                pass
            elif key == ord("n"):
                self.current_index += 1
                return True
            elif key == ord("p"):
                self.current_index = max(0, self.current_index - 1)
                return True
            elif key == ord("u"):
                if self.undo_last_action():
                    # current_index now points at the restored file
                    return True
            elif key == ord("l"):
                # Jump to the next sample point
                self.jump_to_unwatched_region()
            elif key == ord("j"):
                self.bisect_backwards()
            elif key == ord("k"):
                self.bisect_forwards()
            elif key == ord("h"):
                self.undo_jump()
            elif key in (ord("1"), ord("2"), ord("3"), ord("4"), ord("5")):
                # grade_media leaves current_index on the next file already
                return bool(self.grade_media(int(chr(key))))

            if (
                self.is_playing
                and self.is_video(current_file)
                and not self.is_seeking
            ):
                if not self.advance_frame():
                    # Video reached the end, restart it instead of navigating
                    self._restart_video()

    def _grading_loop(self) -> bool:
        """Show files until the list is exhausted or the session is ended.

        Returns:
            True when navigation ran past the last file, False when the
            session was ended from within a file.
        """
        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
            cv2.setWindowTitle("Media Grader", window_title)

            if not self._review_current_file(current_file):
                return False
        return True

    def run(self):
        """Main grading loop: scan the directory, then review files interactively."""
        self.media_files = self.find_media_files()
        if not self.media_files:
            print("No media files found in directory!")
            return

        print(f"Found {len(self.media_files)} media files")
        print("Controls:")
        print(" Space: Pause/Play")
        print(" A/D: Seek backward/forward (hold for FAST seek)")
        print(" , / . : Frame-by-frame seek (fine control)")
        print(" W/S: Decrease/Increase playback speed")
        print(" 1-5: Grade and move file")
        print(" N: Next file")
        print(" P: Previous file")
        print(" U: Undo last grading action")
        print(" L: Sample video at key points (videos only)")
        print(" H: Undo last L jump (videos only)")
        print(" J: Bisect backwards from current position (videos only)")
        print(" K: Bisect forwards toward next sample (videos only)")
        print(" Q/ESC: Quit")

        cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
        cv2.setMouseCallback("Media Grader", self.mouse_callback)

        try:
            if self._grading_loop():
                print("Grading session complete!")
        finally:
            # Clean up on every exit path, including quit and exceptions
            if self.current_cap:
                self.current_cap.release()
                self.current_cap = None
            cv2.destroyAllWindows()
def main():
    """Parse command-line arguments and run an interactive grading session.

    Exits with status 1 when the directory is invalid or the session fails
    with an unexpected error; a keyboard interrupt ends the session quietly.
    """
    parser = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to scan for media files (default: current directory)",
    )
    parser.add_argument(
        "--seek-frames",
        type=int,
        default=30,
        # Coarse seeking is bound to the A/D keys, not the arrow keys
        help="Number of frames to seek when using the A/D keys (default: 30)",
    )
    parser.add_argument(
        "--snap-to-iframe",
        action="store_true",
        help="Snap to I-frames when seeking backward for better performance",
    )
    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory", file=sys.stderr)
        sys.exit(1)

    grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
    try:
        grader.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()