# py-media-grader/main.py
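"""Interactive media grader.

Recursively scans a directory for images and videos, shows each file in an
OpenCV window, and moves it into a numbered subfolder (1-5) according to the
grade key pressed. Videos support playback-speed control, frame seeking, a
clickable timeline, sample-point jumping, and an optional multi-segment
preview grid.

Usage:
    python main.py [directory] [--seek-frames N] [--snap-to-iframe]
"""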
import os
import sys
import glob
import cv2
import numpy as np
import argparse
import shutil
import time
from pathlib import Path
from typing import List
class MediaGrader:
# Configuration constants
    BASE_FRAME_DELAY_MS = 16  # ~60 FPS at 1.0x speed
KEY_REPEAT_RATE_SEC = 0.5 # How often to process key repeats
FAST_SEEK_ACTIVATION_TIME = 2.0 # How long to hold before fast seek
FRAME_RENDER_TIME_MS = 50 # Time to let frames render between seeks
SPEED_INCREMENT = 0.2
MIN_PLAYBACK_SPEED = 0.1
MAX_PLAYBACK_SPEED = 100.0
FAST_SEEK_MULTIPLIER = 60
IMAGE_DISPLAY_DELAY_MS = 100
# Timeline configuration
TIMELINE_HEIGHT = 60
TIMELINE_MARGIN = 20
TIMELINE_BAR_HEIGHT = 12
TIMELINE_HANDLE_SIZE = 12
TIMELINE_COLOR_BG = (80, 80, 80)
TIMELINE_COLOR_PROGRESS = (0, 120, 255)
TIMELINE_COLOR_HANDLE = (255, 255, 255)
TIMELINE_COLOR_BORDER = (200, 200, 200)
# Seek modifiers for A/D keys
SHIFT_SEEK_MULTIPLIER = 5 # SHIFT + A/D multiplier
CTRL_SEEK_MULTIPLIER = 10 # CTRL + A/D multiplier
def __init__(
self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
):
        self.directory = Path(directory)
        self.seek_frames = seek_frames
        self.snap_to_iframe = snap_to_iframe  # stored for I-frame snapping; not used by the current seek logic
self.current_index = 0
self.playback_speed = 1.0
self.media_files = []
self.current_cap = None
self.is_playing = True
self.current_frame = 0
self.total_frames = 0
# Multi-segment mode state
self.multi_segment_mode = False
self.segment_count = self.SEGMENT_COUNT # Use the class constant
self.segment_overlap_percent = self.SEGMENT_OVERLAP_PERCENT # Use the class constant
self.segment_caps = [] # List of VideoCapture objects for each segment
self.segment_frames = [] # List of current frames for each segment
self.segment_positions = [] # List of frame positions for each segment
# Timeline visibility state
self.timeline_visible = True
# Key repeat tracking with rate limiting
self.last_seek_time = 0
self.current_seek_key = None
self.key_first_press_time = 0
self.is_seeking = False
# Seeking modes
self.fine_seek_frames = 1 # Frame-by-frame
self.coarse_seek_frames = self.seek_frames # User-configurable
self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER
# Current frame cache for display
self.current_display_frame = None
# Supported media extensions
self.extensions = [
".png",
".jpg",
".jpeg",
".gif",
".mp4",
".avi",
".mov",
".mkv",
]
# Mouse interaction for timeline
self.mouse_dragging = False
self.timeline_rect = None
self.window_width = 800
self.window_height = 600
# Undo functionality
        self.undo_history = []  # List of (moved_path, original_path, original_index) tuples
# Watch tracking for "good look" feature
self.watched_regions = {} # Dict[file_path: List[Tuple[start_frame, end_frame]]]
self.current_watch_start = None # Frame where current viewing session started
self.last_frame_position = 0 # Track last known frame position
# Bisection navigation tracking
self.last_jump_position = {} # Dict[file_path: last_frame] for bisection reference
        # Jump history for undoing L jumps (see undo_jump)
self.jump_history = {} # Dict[file_path: List[frame_positions]] for jump undo
# Undo functionality
self.undo_history = [] # List of (source_path, destination_path, original_index) tuples
# Watch tracking for "good look" feature
self.watched_regions = {} # Dict[file_path: List[Tuple[start_frame, end_frame]]]
self.current_watch_start = None # Frame where current viewing session started
self.last_frame_position = 0 # Track last known frame position
# Bisection navigation tracking
self.last_jump_position = {} # Dict[file_path: last_frame] for bisection reference
# Jump history for H key (undo jump)
self.jump_history = {} # Dict[file_path: List[frame_positions]] for jump undo
    # Multi-segment mode configuration
    MULTI_SEGMENT_MODE = False
    SEGMENT_COUNT = 16  # Number of video segments (4x4 grid)
    SEGMENT_OVERLAP_PERCENT = 10  # Percentage overlap between segments
def find_media_files(self) -> List[Path]:
"""Find all media files recursively in the directory"""
media_files = []
for ext in self.extensions:
pattern = str(self.directory / "**" / f"*{ext}")
files = glob.glob(pattern, recursive=True)
media_files.extend([Path(f) for f in files])
# Filter out files already in grade directories
filtered_files = []
for file in media_files:
# Check if file is not in a grade directory (1-5)
if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts):
print("Adding file: ", file)
filtered_files.append(file)
return sorted(filtered_files)
def is_video(self, file_path: Path) -> bool:
"""Check if file is a video"""
video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"]
return file_path.suffix.lower() in video_extensions
def calculate_frame_delay(self) -> int:
"""Calculate frame delay in milliseconds based on playback speed"""
delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
return max(1, delay_ms)
def calculate_frames_to_skip(self) -> int:
"""Calculate how many frames to skip for high-speed playback"""
if self.playback_speed <= 1.0:
return 0
elif self.playback_speed <= 2.0:
return 0
elif self.playback_speed <= 5.0:
return int(self.playback_speed - 1)
else:
return int(self.playback_speed * 2)
def load_media(self, file_path: Path) -> bool:
"""Load media file for display"""
if self.current_cap:
self.current_cap.release()
if self.is_video(file_path):
            # Note: OpenCV may still print its own codec warnings here; open failures are handled below
self.current_cap = cv2.VideoCapture(str(file_path))
if not self.current_cap.isOpened():
print(f"Warning: Could not open video file {file_path.name} (unsupported codec)")
return False
self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.current_frame = 0
else:
self.current_cap = None
self.total_frames = 1
self.current_frame = 0
# Load initial frame
self.load_current_frame()
# Start watch tracking session for videos
self.start_watch_session()
return True
def load_current_frame(self):
"""Load the current frame into display cache"""
if self.is_video(self.media_files[self.current_index]):
if not self.current_cap:
return False
ret, frame = self.current_cap.read()
if ret:
self.current_display_frame = frame
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
return True
return False
else:
frame = cv2.imread(str(self.media_files[self.current_index]))
if frame is not None:
self.current_display_frame = frame
return True
return False
def start_watch_session(self):
"""Start tracking a new viewing session"""
if self.is_video(self.media_files[self.current_index]):
self.current_watch_start = self.current_frame
self.last_frame_position = self.current_frame
def update_watch_tracking(self):
"""Update watch tracking based on current frame position"""
if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None:
return
current_file = str(self.media_files[self.current_index])
# If we've moved more than a few frames from last position, record the watched region
if abs(self.current_frame - self.last_frame_position) > 5 or \
abs(self.current_frame - self.current_watch_start) > 30:
# Record the region we just watched
start_frame = min(self.current_watch_start, self.last_frame_position)
end_frame = max(self.current_watch_start, self.last_frame_position)
if current_file not in self.watched_regions:
self.watched_regions[current_file] = []
# Merge with existing regions if they overlap
self.add_watched_region(current_file, start_frame, end_frame)
# Start new session from current position
self.current_watch_start = self.current_frame
self.last_frame_position = self.current_frame
def add_watched_region(self, file_path, start_frame, end_frame):
"""Add a watched region, merging with existing overlapping regions"""
if file_path not in self.watched_regions:
self.watched_regions[file_path] = []
regions = self.watched_regions[file_path]
new_region = [start_frame, end_frame]
# Merge overlapping regions
merged = []
for region in regions:
if new_region[1] < region[0] or new_region[0] > region[1]:
# No overlap
merged.append(region)
else:
# Overlap, merge
new_region[0] = min(new_region[0], region[0])
new_region[1] = max(new_region[1], region[1])
merged.append(tuple(new_region))
self.watched_regions[file_path] = merged
def find_largest_unwatched_region(self):
"""Find the largest unwatched region in the current video"""
if not self.is_video(self.media_files[self.current_index]):
return None
current_file = str(self.media_files[self.current_index])
watched = self.watched_regions.get(current_file, [])
if not watched:
# No regions watched yet, return the beginning
return (0, self.total_frames // 4)
# Sort watched regions by start frame
watched.sort(key=lambda x: x[0])
# Find gaps between watched regions
gaps = []
# Gap before first watched region
if watched[0][0] > 0:
gaps.append((0, watched[0][0]))
# Gaps between watched regions
for i in range(len(watched) - 1):
gap_start = watched[i][1]
gap_end = watched[i + 1][0]
if gap_end > gap_start:
gaps.append((gap_start, gap_end))
# Gap after last watched region
if watched[-1][1] < self.total_frames:
gaps.append((watched[-1][1], self.total_frames))
if not gaps:
# Everything watched, return None
return None
# Return the largest gap
largest_gap = max(gaps, key=lambda x: x[1] - x[0])
return largest_gap
def get_sample_points(self):
"""Get standardized sample points for video navigation"""
segments = 8 # Divide video into 8 segments for sampling
segment_size = self.total_frames // segments
if segment_size == 0:
return []
return [
segment_size * 2, # 1/4 through
segment_size * 4, # 1/2 through
segment_size * 6, # 3/4 through
segment_size * 1, # 1/8 through
segment_size * 3, # 3/8 through
segment_size * 5, # 5/8 through
segment_size * 7, # 7/8 through
0 # Beginning
]
def jump_to_unwatched_region(self):
"""Jump to the next unwatched region of the video"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
# Get or initialize jump counter for this file
if not hasattr(self, 'jump_counters'):
self.jump_counters = {}
if current_file not in self.jump_counters:
self.jump_counters[current_file] = 0
# Get standardized sample points
sample_points = self.get_sample_points()
if not sample_points:
print("Video too short for sampling")
return False
current_jump = self.jump_counters[current_file]
if current_jump >= len(sample_points):
print("All sample points visited! Video fully sampled.")
return False
target_frame = sample_points[current_jump]
target_frame = min(target_frame, self.total_frames - 1)
# Track last position for bisection
self.last_jump_position[current_file] = self.current_frame
# Track jump history for H key undo
if current_file not in self.jump_history:
self.jump_history[current_file] = []
self.jump_history[current_file].append(self.current_frame)
# Jump to the target frame
if self.multi_segment_mode:
# In multi-segment mode, reposition all segments relative to the jump target
self.reposition_segments_around_frame(target_frame)
else:
# In single mode, just jump the main capture
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
self.load_current_frame()
# Increment jump counter
self.jump_counters[current_file] += 1
# Calculate percentage through video
percentage = (target_frame / self.total_frames) * 100
print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
return True
def bisect_backwards(self):
"""Bisect backwards between last position and current position"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
if current_file not in self.last_jump_position:
print("No previous position to bisect from. Use L first to establish a reference point.")
return False
last_pos = self.last_jump_position[current_file]
current_pos = self.current_frame
if last_pos == current_pos:
print("Already at the same position as last jump.")
return False
        # Midpoint between the two positions (order does not matter)
        midpoint = (last_pos + current_pos) // 2
# Update last position for further bisection
self.last_jump_position[current_file] = current_pos
# Jump to midpoint
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
self.load_current_frame()
percentage = (midpoint / self.total_frames) * 100
print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)")
return True
def bisect_forwards(self):
"""Bisect forwards between current position and next sample point"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
# Get next sample point
if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters:
print("No sampling started yet. Use L first to establish sample points.")
return False
# Use same sampling strategy as L key
sample_points = self.get_sample_points()
current_jump = self.jump_counters[current_file]
if current_jump >= len(sample_points):
print("All sample points visited. No forward reference point.")
return False
next_sample = sample_points[current_jump]
next_sample = min(next_sample, self.total_frames - 1)
current_pos = self.current_frame
# Calculate midpoint between current and next sample
midpoint = (current_pos + next_sample) // 2
if midpoint == current_pos:
print("Already at or very close to next sample point.")
return False
# Update last position for further bisection
self.last_jump_position[current_file] = current_pos
# Jump to midpoint
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
self.load_current_frame()
percentage = (midpoint / self.total_frames) * 100
print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)")
return True
def undo_jump(self):
"""Undo the last L jump by returning to previous position"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
if current_file not in self.jump_history or not self.jump_history[current_file]:
print("No jump history to undo. Use L first to establish jump points.")
return False
        # Pop the position that was recorded just before the most recent jump
        previous_position = self.jump_history[current_file].pop()
# Jump back to previous position
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, previous_position)
self.load_current_frame()
# Update last jump position for bisection reference
self.last_jump_position[current_file] = previous_position
percentage = (previous_position / self.total_frames) * 100
print(f"Undid jump: returned to frame {previous_position} ({percentage:.1f}% through video)")
return True
def toggle_multi_segment_mode(self):
"""Toggle between single and multi-segment video mode"""
if not self.is_video(self.media_files[self.current_index]):
print("Multi-segment mode only works with videos")
return False
self.multi_segment_mode = not self.multi_segment_mode
if self.multi_segment_mode:
print(f"Enabled multi-segment mode ({self.segment_count} segments)")
self.setup_segment_captures()
else:
print("Disabled multi-segment mode")
self.cleanup_segment_captures()
# Reload single video
self.load_media(self.media_files[self.current_index])
return True
def toggle_timeline(self):
"""Toggle timeline visibility"""
self.timeline_visible = not self.timeline_visible
print(f"Timeline {'visible' if self.timeline_visible else 'hidden'}")
return True
def setup_segment_captures(self):
"""Setup multiple video captures for segment mode"""
if not self.is_video(self.media_files[self.current_index]):
return
# Clean up existing segment captures
self.cleanup_segment_captures()
current_file = self.media_files[self.current_index]
# Calculate segment positions - evenly spaced through video
self.segment_positions = []
for i in range(self.segment_count):
            # Space segment start points evenly at i/N of the video
            # (0, 1/N, 2/N, ...), so the last segment starts before the end
            position_ratio = i / self.segment_count
start_frame = int(position_ratio * self.total_frames)
self.segment_positions.append(start_frame)
# Create video captures for each segment
for i, start_frame in enumerate(self.segment_positions):
cap = cv2.VideoCapture(str(current_file))
if cap.isOpened():
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
self.segment_caps.append(cap)
# Load initial frame for each segment
ret, frame = cap.read()
if ret:
self.segment_frames.append(frame)
else:
self.segment_frames.append(None)
else:
self.segment_caps.append(None)
self.segment_frames.append(None)
def cleanup_segment_captures(self):
"""Clean up all segment video captures"""
for cap in self.segment_caps:
if cap:
cap.release()
self.segment_caps = []
self.segment_frames = []
self.segment_positions = []
def update_segment_frames(self):
"""Update frames for all segments during playback"""
if not self.multi_segment_mode or not self.segment_caps:
return
for i, cap in enumerate(self.segment_caps):
if cap and cap.isOpened():
ret, frame = cap.read()
if ret:
self.segment_frames[i] = frame
else:
# Loop back to segment start when reaching end
cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
ret, frame = cap.read()
if ret:
self.segment_frames[i] = frame
def reposition_segments_around_frame(self, center_frame: int):
"""Reposition all segments around a center frame while maintaining spacing"""
if not self.multi_segment_mode or not self.segment_caps:
return
# Calculate new segment positions around the center frame
# Keep the same relative spacing but center around the new frame
segment_spacing = self.total_frames // (self.segment_count + 1)
new_positions = []
for i in range(self.segment_count):
# Spread segments around center_frame
offset = (i - (self.segment_count - 1) / 2) * segment_spacing
new_frame = int(center_frame + offset)
new_frame = max(0, min(new_frame, self.total_frames - 1))
new_positions.append(new_frame)
# Update segment positions and seek all captures
self.segment_positions = new_positions
for i, cap in enumerate(self.segment_caps):
if cap and cap.isOpened():
cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
# Load new frame
ret, frame = cap.read()
if ret:
self.segment_frames[i] = frame
# Reset position for next read
cap.set(cv2.CAP_PROP_POS_FRAMES, self.segment_positions[i])
def seek_all_segments(self, frames_delta: int):
"""Seek all segments by the specified number of frames"""
if not self.multi_segment_mode or not self.segment_caps:
return
for i, cap in enumerate(self.segment_caps):
if cap and cap.isOpened():
current_frame = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
segment_start = self.segment_positions[i]
segment_duration = self.total_frames // self.segment_count
segment_end = min(self.total_frames - 1, segment_start + segment_duration)
target_frame = max(segment_start, min(current_frame + frames_delta, segment_end))
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
# Load new frame
ret, frame = cap.read()
if ret:
self.segment_frames[i] = frame
# Reset position for next read
cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
def display_current_frame(self):
"""Display the current cached frame with overlays"""
if self.multi_segment_mode:
self.display_multi_segment_frame()
else:
self.display_single_frame()
def display_single_frame(self):
"""Display single frame view"""
if self.current_display_frame is None:
return
frame = self.current_display_frame.copy()
# Add info overlay
current_file = self.media_files[self.current_index]
if self.is_video(self.media_files[self.current_index]):
info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
cv2.putText(
frame,
info_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(255, 255, 255),
2,
)
cv2.putText(
frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
)
# Draw timeline
self.draw_timeline(frame)
cv2.imshow("Media Grader", frame)
def display_multi_segment_frame(self):
"""Display multi-segment frame view"""
if not self.segment_frames or not any(frame is not None for frame in self.segment_frames):
return
        # Calculate grid dimensions (square grid, e.g. 4x4 for 16 segments)
grid_rows = int(self.segment_count ** 0.5)
grid_cols = int(self.segment_count / grid_rows)
# Get reference frame size
ref_frame = next((f for f in self.segment_frames if f is not None), None)
if ref_frame is None:
return
frame_height, frame_width = ref_frame.shape[:2]
# Calculate segment display size
segment_width = frame_width // grid_cols
segment_height = frame_height // grid_rows
# Create combined display frame
combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# Place each segment in the grid
for i, segment_frame in enumerate(self.segment_frames):
if segment_frame is None:
continue
row = i // grid_cols
col = i % grid_cols
# Resize segment frame to fit grid cell
resized_segment = cv2.resize(segment_frame, (segment_width, segment_height))
# Calculate position in combined frame
y_start = row * segment_height
y_end = y_start + segment_height
x_start = col * segment_width
x_end = x_start + segment_width
# Place segment in combined frame
combined_frame[y_start:y_end, x_start:x_end] = resized_segment
# Add segment label
segment_position = int((self.segment_positions[i] / self.total_frames) * 100)
label_text = f"Seg {i+1}: {segment_position}%"
cv2.putText(
combined_frame,
label_text,
(x_start + 5, y_start + 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(255, 255, 255),
2,
)
cv2.putText(
combined_frame,
label_text,
(x_start + 5, y_start + 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1,
)
# Draw grid borders
cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1)
# Add overall info overlay
current_file = self.media_files[self.current_index]
info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
cv2.putText(
combined_frame,
info_text,
(10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
)
cv2.putText(
combined_frame,
info_text,
(10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 0, 0),
1
)
# Draw multi-segment timeline
self.draw_multi_segment_timeline(combined_frame)
cv2.imshow("Media Grader", combined_frame)
def draw_multi_segment_timeline(self, frame):
"""Draw timeline showing all segment positions"""
if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps or not self.timeline_visible:
return
height, width = frame.shape[:2]
# Timeline area - smaller than normal timeline
timeline_height = 30
timeline_y = height - timeline_height - 25 # Leave space for info text
timeline_margin = 20
timeline_bar_height = 8
# Draw timeline background
cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1)
# Calculate timeline bar position
bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2
bar_x_start = timeline_margin
bar_x_end = width - timeline_margin
bar_width = bar_x_end - bar_x_start
# Draw timeline background bar
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1)
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1)
# Draw segment markers
if self.total_frames > 0:
for i, segment_pos in enumerate(self.segment_positions):
# Calculate position on timeline
progress = segment_pos / max(1, self.total_frames - 1)
marker_x = bar_x_start + int(bar_width * progress)
# Draw segment marker
color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100)
cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1)
cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1)
# Add segment number
cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)
def draw_timeline(self, frame):
"""Draw timeline at the bottom of the frame"""
# Only draw timeline for video files in single mode and when visible
if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible:
return
height, width = frame.shape[:2]
self.window_height = height
self.window_width = width
# Timeline background area
timeline_y = height - self.TIMELINE_HEIGHT
cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)
# Calculate timeline bar position
bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
bar_x_start = self.TIMELINE_MARGIN
bar_x_end = width - self.TIMELINE_MARGIN
bar_width = bar_x_end - bar_x_start
self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)
# Draw timeline background
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1)
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1)
# Draw progress for videos
if self.total_frames > 0:
progress = self.current_frame / max(1, self.total_frames - 1)
progress_width = int(bar_width * progress)
if progress_width > 0:
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1)
# Draw handle
handle_x = bar_x_start + progress_width
handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1)
cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2)
def mouse_callback(self, event, x, y, flags, param):
"""Handle mouse events for timeline interaction"""
if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode:
return
bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
bar_x_end = bar_x_start + bar_width
# Check if mouse is over timeline
if bar_y <= y <= bar_y + bar_height + 10: # Add some extra height for easier clicking
if event == cv2.EVENT_LBUTTONDOWN:
if bar_x_start <= x <= bar_x_end:
self.mouse_dragging = True
self.seek_to_position(x, bar_x_start, bar_width)
elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
if bar_x_start <= x <= bar_x_end:
self.seek_to_position(x, bar_x_start, bar_width)
elif event == cv2.EVENT_LBUTTONUP:
self.mouse_dragging = False
def seek_to_position(self, mouse_x, bar_x_start, bar_width):
"""Seek to position based on mouse click/drag on timeline"""
if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
return
# Calculate position ratio
relative_x = mouse_x - bar_x_start
position_ratio = max(0, min(1, relative_x / bar_width))
# Calculate target frame
target_frame = int(position_ratio * (self.total_frames - 1))
target_frame = max(0, min(target_frame, self.total_frames - 1))
# Seek to target frame
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
self.load_current_frame()
def advance_frame(self):
"""Advance to next frame(s) based on playback speed"""
if (
not self.is_video(self.media_files[self.current_index])
or not self.is_playing
):
return
if self.multi_segment_mode:
# Update all segment frames
self.update_segment_frames()
return True
else:
frames_to_skip = self.calculate_frames_to_skip()
for _ in range(frames_to_skip + 1):
ret, frame = self.current_cap.read()
if not ret:
return False
self.current_display_frame = frame
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
# Update watch tracking
self.update_watch_tracking()
return True
def seek_video(self, frames_delta: int):
"""Seek video by specified number of frames"""
if not self.is_video(self.media_files[self.current_index]):
return
if self.multi_segment_mode:
self.seek_all_segments(frames_delta)
else:
if not self.current_cap:
return
target_frame = max(
0, min(self.current_frame + frames_delta, self.total_frames - 1)
)
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
self.load_current_frame()
def process_seek_key(self, key: int) -> bool:
"""Process seeking keys with proper rate limiting"""
current_time = time.time()
seek_direction = 0
seek_amount = 0
seek_multiplier = 1 # Default multiplier
# Check for A/D keys with modifiers
if key == ord("a") or key == ord("A"):
seek_direction = -1
# SHIFT+A gives uppercase A
if key == ord("A"):
seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
elif key == ord("d") or key == ord("D"):
seek_direction = 1
# SHIFT+D gives uppercase D
if key == ord("D"):
seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
elif key == 1: # CTRL+A
seek_direction = -1
seek_multiplier = self.CTRL_SEEK_MULTIPLIER
elif key == 4: # CTRL+D
seek_direction = 1
seek_multiplier = self.CTRL_SEEK_MULTIPLIER
elif key == ord(","):
seek_amount = -self.fine_seek_frames
elif key == ord("."):
seek_amount = self.fine_seek_frames
else:
if self.current_seek_key is not None:
self.current_seek_key = None
self.is_seeking = False
return False
# Handle fine seeking (comma/period) - always immediate
if seek_amount != 0:
self.seek_video(seek_amount)
return True
# Handle A/D key seeking with rate limiting and modifiers
if seek_direction != 0:
if self.current_seek_key != key:
self.current_seek_key = key
self.key_first_press_time = current_time
self.last_seek_time = current_time
self.is_seeking = True
seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
self.seek_video(seek_amount)
return True
elif self.is_seeking:
time_since_last_seek = current_time - self.last_seek_time
time_held = current_time - self.key_first_press_time
if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
self.last_seek_time = current_time
if time_held > self.FAST_SEEK_ACTIVATION_TIME:
seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier
else:
seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
self.seek_video(seek_amount)
return True
return False
def grade_media(self, grade: int):
"""Move current media file to grade directory"""
if not self.media_files or grade < 1 or grade > 5:
return
current_file = self.media_files[self.current_index]
grade_dir = self.directory / str(grade)
# Create grade directory if it doesn't exist
grade_dir.mkdir(exist_ok=True)
destination = grade_dir / current_file.name
counter = 1
while destination.exists():
stem = current_file.stem
suffix = current_file.suffix
destination = grade_dir / f"{stem}_{counter}{suffix}"
counter += 1
# Track this move for undo functionality BEFORE making changes
self.undo_history.append((str(destination), str(current_file), self.current_index))
# Release video capture to unlock the file before moving
if self.current_cap:
self.current_cap.release()
self.current_cap = None
# Also release segment captures if in multi-segment mode
if self.multi_segment_mode:
self.cleanup_segment_captures()
try:
shutil.move(str(current_file), str(destination))
print(f"Moved {current_file.name} to grade {grade}")
self.media_files.pop(self.current_index)
if self.current_index >= len(self.media_files):
self.current_index = 0
if not self.media_files:
print("No more media files to grade!")
return False
except Exception as e:
print(f"Error moving file: {e}")
# Remove the undo entry since the move failed
self.undo_history.pop()
return True
def undo_last_action(self):
"""Undo the last grading action by moving file back and restoring to media list"""
if not self.undo_history:
print("No actions to undo!")
return False
# Get the last action
moved_file_path, original_file_path, original_index = self.undo_history.pop()
# Release video capture to unlock any current file before moving
if self.current_cap:
self.current_cap.release()
self.current_cap = None
try:
# Move the file back to its original location
shutil.move(moved_file_path, original_file_path)
# Add the file back to the media list at its original position
original_file = Path(original_file_path)
# Insert the file back at the appropriate position
if original_index <= len(self.media_files):
self.media_files.insert(original_index, original_file)
else:
self.media_files.append(original_file)
# Navigate to the restored file
print("Navigating to: ", original_index)
self.current_index = original_index
print(f"Undone: Moved {original_file.name} back from grade folder")
return True
except Exception as e:
print(f"Error undoing action: {e}")
# If undo failed, put the action back in history
self.undo_history.append((moved_file_path, original_file_path, original_index))
return False
def run(self):
"""Main grading loop"""
self.media_files = self.find_media_files()
if not self.media_files:
print("No media files found in directory!")
return
print(f"Found {len(self.media_files)} media files")
print("Controls:")
print(" Space: Pause/Play")
print(" A/D: Seek backward/forward (hold for FAST seek)")
print(" Shift+A/D: Seek backward/forward (5x multiplier)")
print(" Ctrl+A/D: Seek backward/forward (10x multiplier)")
print(" , / . : Frame-by-frame seek (fine control)")
print(" W/S: Decrease/Increase playback speed")
print(" G: Toggle multi-segment mode (videos only)")
print(" 1-5: Grade and move file")
print(" N: Next file")
print(" P: Previous file")
print(" U: Undo last grading action")
print(" L: Sample video at key points (videos only)")
print(" H: Toggle timeline visibility")
print(" J: Bisect backwards from current position (videos only, disabled in multi-segment)")
print(" K: Bisect forwards toward next sample (videos only, disabled in multi-segment)")
print(" Q/ESC: Quit")
cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
cv2.setMouseCallback("Media Grader", self.mouse_callback)
while self.media_files and self.current_index < len(self.media_files):
current_file = self.media_files[self.current_index]
if not self.load_media(current_file):
print(f"Could not load {current_file}")
self.current_index += 1
continue
# Setup multi-segment mode if enabled and this is a video
if self.multi_segment_mode and self.is_video(current_file):
self.setup_segment_captures()
window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
cv2.setWindowTitle("Media Grader", window_title)
while True:
self.display_current_frame()
if self.is_video(current_file):
if self.is_seeking:
delay = self.FRAME_RENDER_TIME_MS
else:
delay = self.calculate_frame_delay()
else:
delay = self.IMAGE_DISPLAY_DELAY_MS
key = cv2.waitKey(delay) & 0xFF
if key == ord("q") or key == 27:
return
elif key == ord(" "):
self.is_playing = not self.is_playing
elif key == ord("s"):
self.playback_speed = max(
self.MIN_PLAYBACK_SPEED,
self.playback_speed - self.SPEED_INCREMENT,
)
elif key == ord("w"):
self.playback_speed = min(
self.MAX_PLAYBACK_SPEED,
self.playback_speed + self.SPEED_INCREMENT,
)
elif self.process_seek_key(key):
pass
elif key == ord("n"):
break
elif key == ord("p"):
self.current_index = max(0, self.current_index - 1)
print("Navigating to: ", self.current_index)
break
elif key == ord("u"):
if self.undo_last_action():
# File was restored, reload it
break
elif key == ord("l"):
# Jump to largest unwatched region (works in both modes)
self.jump_to_unwatched_region()
elif key == ord("j"):
if not self.multi_segment_mode:
self.bisect_backwards()
else:
print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
elif key == ord("k"):
if not self.multi_segment_mode:
self.bisect_forwards()
else:
print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
elif key == ord("h"): # Toggle timeline visibility
self.toggle_timeline()
elif key == ord("g"):
self.toggle_multi_segment_mode()
elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
grade = int(chr(key))
if not self.grade_media(grade):
return
break
elif key == 255:
if self.is_seeking and self.current_seek_key is not None:
self.process_seek_key(self.current_seek_key)
if (
self.is_playing
and self.is_video(current_file)
and not self.is_seeking
):
if not self.advance_frame():
# Video reached the end, restart it instead of navigating
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
self.current_frame = 0
self.load_current_frame()
if key not in [ord("p"), ord("u"), ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
print("Navigating to (pu12345): ", self.current_index)
self.current_index += 1
if self.current_cap:
self.current_cap.release()
self.cleanup_segment_captures()
cv2.destroyAllWindows()
print("Grading session complete!")
def main():
parser = argparse.ArgumentParser(
description="Media Grader - Grade media files by moving them to numbered folders"
)
parser.add_argument(
"directory",
nargs="?",
default=".",
help="Directory to scan for media files (default: current directory)",
)
parser.add_argument(
"--seek-frames",
type=int,
default=30,
help="Number of frames to seek when using arrow keys (default: 30)",
)
parser.add_argument(
"--snap-to-iframe",
action="store_true",
help="Snap to I-frames when seeking backward for better performance",
)
args = parser.parse_args()
if not os.path.isdir(args.directory):
print(f"Error: {args.directory} is not a valid directory")
sys.exit(1)
grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
try:
grader.run()
except KeyboardInterrupt:
print("\nGrading session interrupted")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()