Files
py-media-grader/main.py
PhatPhuckDave bae760837c Refactor display logic in MediaGrader to maintain aspect ratio and improve frame rendering
This commit enhances the display functionality by calculating the available height for video timelines, resizing frames while preserving their aspect ratio, and centering them on a canvas. The window resizing logic has been updated to ensure the entire monitor space is utilized effectively, improving the visual presentation of media files.
2025-09-26 10:59:15 +02:00

1338 lines
52 KiB
Python

import os
import sys
import glob
import cv2
import numpy as np
import argparse
import shutil
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List
class Cv2BufferedCap:
"""Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""
def __init__(self, video_path, backend=None):
self.video_path = video_path
self.cap = cv2.VideoCapture(str(video_path), backend)
if not self.cap.isOpened():
raise ValueError(f"Could not open video: {video_path}")
# Video properties
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Current position tracking
self.current_frame = 0
def get_frame(self, frame_number):
"""Get frame at specific index - always accurate"""
# Clamp frame number to valid range
frame_number = max(0, min(frame_number, self.total_frames - 1))
# Optimize for sequential reading (next frame)
if frame_number == self.current_frame + 1:
ret, frame = self.cap.read()
else:
# Seek for non-sequential access
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
self.current_frame = frame_number
return frame
else:
raise ValueError(f"Failed to read frame {frame_number}")
def advance_frame(self, frames=1):
"""Advance by specified number of frames"""
new_frame = self.current_frame + frames
return self.get_frame(new_frame)
def release(self):
"""Release the video capture"""
if self.cap:
self.cap.release()
def isOpened(self):
"""Check if capture is opened"""
return self.cap and self.cap.isOpened()
class MediaGrader:
# Configuration constants - matching croppa implementation
TARGET_FPS = 80 # Target FPS for speed calculations
SPEED_INCREMENT = 0.1
MIN_PLAYBACK_SPEED = 0.05
MAX_PLAYBACK_SPEED = 1.0
# Legacy constants for compatibility
KEY_REPEAT_RATE_SEC = 0.5
FAST_SEEK_ACTIVATION_TIME = 2.0
FAST_SEEK_MULTIPLIER = 60
MONITOR_WIDTH = 2560
MONITOR_HEIGHT = 1440
TIMELINE_HEIGHT = 60
TIMELINE_MARGIN = 20
TIMELINE_BAR_HEIGHT = 12
TIMELINE_HANDLE_SIZE = 12
TIMELINE_COLOR_BG = (80, 80, 80)
TIMELINE_COLOR_PROGRESS = (0, 120, 255)
TIMELINE_COLOR_HANDLE = (255, 255, 255)
TIMELINE_COLOR_BORDER = (200, 200, 200)
SHIFT_SEEK_MULTIPLIER = 5
CTRL_SEEK_MULTIPLIER = 10
SEGMENT_COUNT = 16
def __init__(
self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
):
self.directory = Path(directory)
self.seek_frames = seek_frames
self.current_index = 0
self.playback_speed = 1.0
self.media_files = []
self.current_cap = None
self.is_playing = True
self.current_frame = 0
self.total_frames = 0
self.multi_segment_mode = False
self.segment_count = self.SEGMENT_COUNT
self.segment_caps = []
self.segment_frames = []
self.segment_positions = []
self.segment_end_positions = [] # Track where each segment should loop back to
self.timeline_visible = True
self.last_seek_time = 0
self.current_seek_key = None
self.key_first_press_time = 0
self.is_seeking = False
self.fine_seek_frames = 1
self.coarse_seek_frames = self.seek_frames
self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER
self.current_display_frame = None
self.extensions = [
".png",
".jpg",
".jpeg",
".gif",
".mp4",
".avi",
".mov",
".mkv",
]
self.mouse_dragging = False
self.timeline_rect = None
self.undo_history = []
self.watched_regions = {}
self.current_watch_start = None
self.last_frame_position = 0
self.last_jump_position = {}
self.jump_history = {}
self.thread_pool = ThreadPoolExecutor(max_workers=4)
def display_with_aspect_ratio(self, frame):
"""Display frame while maintaining aspect ratio and maximizing screen usage"""
if frame is None:
return
# Get frame dimensions
frame_height, frame_width = frame.shape[:2]
# Calculate available height (subtract timeline height for videos)
timeline_height = self.TIMELINE_HEIGHT if self.is_video(self.media_files[self.current_index]) else 0
available_height = self.MONITOR_HEIGHT - timeline_height
# Calculate scale to fit within monitor bounds while maintaining aspect ratio
scale_x = self.MONITOR_WIDTH / frame_width
scale_y = available_height / frame_height
scale = min(scale_x, scale_y)
# Calculate display dimensions
display_width = int(frame_width * scale)
display_height = int(frame_height * scale)
# Resize the frame to maintain aspect ratio
if scale != 1.0:
resized_frame = cv2.resize(frame, (display_width, display_height), interpolation=cv2.INTER_AREA)
else:
resized_frame = frame
# Create canvas with proper dimensions
canvas_height = self.MONITOR_HEIGHT
canvas_width = self.MONITOR_WIDTH
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
# Center the resized frame on canvas
start_y = (available_height - display_height) // 2
start_x = (self.MONITOR_WIDTH - display_width) // 2
# Ensure frame fits within canvas bounds
end_y = min(start_y + display_height, available_height)
end_x = min(start_x + display_width, self.MONITOR_WIDTH)
actual_height = end_y - start_y
actual_width = end_x - start_x
if actual_height > 0 and actual_width > 0:
canvas[start_y:end_y, start_x:end_x] = resized_frame[:actual_height, :actual_width]
# Resize window to full monitor size
cv2.resizeWindow("Media Grader", self.MONITOR_WIDTH, self.MONITOR_HEIGHT)
# Center the window on screen
x_position = 0
y_position = 0
cv2.moveWindow("Media Grader", x_position, y_position)
# Display the canvas with properly aspect-ratioed frame
cv2.imshow("Media Grader", canvas)
def find_media_files(self) -> List[Path]:
"""Find all media files recursively in the directory"""
media_files = []
for ext in self.extensions:
pattern = str(self.directory / "**" / f"*{ext}")
files = glob.glob(pattern, recursive=True)
media_files.extend([Path(f) for f in files])
# Filter out files already in grade directories
filtered_files = []
for file in media_files:
# Check if file is not in a grade directory (1-5)
if not any(part in ["1", "2", "3", "4", "5"] for part in file.parts):
print("Adding file: ", file)
filtered_files.append(file)
return sorted(filtered_files)
def is_video(self, file_path: Path) -> bool:
"""Check if file is a video"""
video_extensions = [".mp4", ".avi", ".mov", ".mkv", ".gif"]
return file_path.suffix.lower() in video_extensions
def calculate_frame_delay(self) -> int:
"""Calculate frame delay in milliseconds based on playback speed"""
# Round to 2 decimals to handle floating point precision issues
speed = round(self.playback_speed, 2)
if speed >= 1.0:
# Speed >= 1: maximum FPS (no delay)
return 1
else:
# Speed < 1: scale FPS based on speed
# Formula: fps = TARGET_FPS * speed, so delay = 1000 / fps
target_fps = self.TARGET_FPS * speed
delay_ms = int(1000 / target_fps)
return max(1, delay_ms)
def load_media(self, file_path: Path) -> bool:
"""Load media file for display"""
if self.current_cap:
self.current_cap.release()
if self.is_video(file_path):
try:
# Use Cv2BufferedCap for better frame handling
self.current_cap = Cv2BufferedCap(file_path)
self.total_frames = self.current_cap.total_frames
self.current_frame = 0
print(f"Loaded: {file_path.name} | Frames: {self.total_frames} | FPS: {self.current_cap.fps:.2f}")
except Exception as e:
print(f"Warning: Could not open video file {file_path.name}: {e}")
return False
else:
self.current_cap = None
self.total_frames = 1
self.current_frame = 0
# Load initial frame
self.load_current_frame()
# Start watch tracking session for videos
self.start_watch_session()
return True
def load_current_frame(self):
"""Load the current frame into display cache"""
if self.is_video(self.media_files[self.current_index]):
if not self.current_cap:
return False
try:
# Use Cv2BufferedCap to get frame
self.current_display_frame = self.current_cap.get_frame(self.current_frame)
return True
except Exception as e:
print(f"Failed to load frame {self.current_frame}: {e}")
return False
else:
frame = cv2.imread(str(self.media_files[self.current_index]))
if frame is not None:
self.current_display_frame = frame
return True
return False
def start_watch_session(self):
"""Start tracking a new viewing session"""
if self.is_video(self.media_files[self.current_index]):
self.current_watch_start = self.current_frame
self.last_frame_position = self.current_frame
def update_watch_tracking(self):
"""Update watch tracking based on current frame position"""
if not self.is_video(self.media_files[self.current_index]) or self.current_watch_start is None:
return
current_file = str(self.media_files[self.current_index])
# If we've moved more than a few frames from last position, record the watched region
if abs(self.current_frame - self.last_frame_position) > 5 or \
abs(self.current_frame - self.current_watch_start) > 30:
# Record the region we just watched
start_frame = min(self.current_watch_start, self.last_frame_position)
end_frame = max(self.current_watch_start, self.last_frame_position)
if current_file not in self.watched_regions:
self.watched_regions[current_file] = []
# Merge with existing regions if they overlap
self.add_watched_region(current_file, start_frame, end_frame)
# Start new session from current position
self.current_watch_start = self.current_frame
self.last_frame_position = self.current_frame
def add_watched_region(self, file_path, start_frame, end_frame):
"""Add a watched region, merging with existing overlapping regions"""
if file_path not in self.watched_regions:
self.watched_regions[file_path] = []
regions = self.watched_regions[file_path]
new_region = [start_frame, end_frame]
# Merge overlapping regions
merged = []
for region in regions:
if new_region[1] < region[0] or new_region[0] > region[1]:
# No overlap
merged.append(region)
else:
# Overlap, merge
new_region[0] = min(new_region[0], region[0])
new_region[1] = max(new_region[1], region[1])
merged.append(tuple(new_region))
self.watched_regions[file_path] = merged
def get_sample_points(self):
"""Get standardized sample points for video navigation"""
segments = 8 # Divide video into 8 segments for sampling
segment_size = self.total_frames // segments
if segment_size == 0:
return []
return [
segment_size * 2, # 1/4 through
segment_size * 4, # 1/2 through
segment_size * 6, # 3/4 through
segment_size * 1, # 1/8 through
segment_size * 3, # 3/8 through
segment_size * 5, # 5/8 through
segment_size * 7, # 7/8 through
0 # Beginning
]
def jump_to_unwatched_region(self):
"""Jump to the next unwatched region of the video"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
# Get or initialize jump counter for this file
if not hasattr(self, 'jump_counters'):
self.jump_counters = {}
if current_file not in self.jump_counters:
self.jump_counters[current_file] = 0
# Get standardized sample points
sample_points = self.get_sample_points()
if not sample_points:
print("Video too short for sampling")
return False
current_jump = self.jump_counters[current_file]
if current_jump >= len(sample_points):
print("All sample points visited! Video fully sampled.")
return False
target_frame = sample_points[current_jump]
target_frame = min(target_frame, self.total_frames - 1)
# Track last position for bisection
self.last_jump_position[current_file] = self.current_frame
# Track jump history for H key undo
if current_file not in self.jump_history:
self.jump_history[current_file] = []
self.jump_history[current_file].append(self.current_frame)
# Jump to the target frame
if not self.multi_segment_mode:
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
self.load_current_frame()
# Increment jump counter
self.jump_counters[current_file] += 1
# Calculate percentage through video
percentage = (target_frame / self.total_frames) * 100
print(f"Sample {current_jump + 1}/{len(sample_points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
return True
def bisect_backwards(self):
"""Bisect backwards between last position and current position"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
if current_file not in self.last_jump_position:
print("No previous position to bisect from. Use L first to establish a reference point.")
return False
last_pos = self.last_jump_position[current_file]
current_pos = self.current_frame
if last_pos == current_pos:
print("Already at the same position as last jump.")
return False
# Calculate midpoint
if last_pos < current_pos:
midpoint = (last_pos + current_pos) // 2
else:
midpoint = (current_pos + last_pos) // 2
# Update last position for further bisection
self.last_jump_position[current_file] = current_pos
# Jump to midpoint
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
self.load_current_frame()
percentage = (midpoint / self.total_frames) * 100
print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)")
return True
def bisect_forwards(self):
"""Bisect forwards between current position and next sample point"""
if not self.is_video(self.media_files[self.current_index]):
return False
current_file = str(self.media_files[self.current_index])
# Get next sample point
if not hasattr(self, 'jump_counters') or current_file not in self.jump_counters:
print("No sampling started yet. Use L first to establish sample points.")
return False
# Use same sampling strategy as L key
sample_points = self.get_sample_points()
current_jump = self.jump_counters[current_file]
if current_jump >= len(sample_points):
print("All sample points visited. No forward reference point.")
return False
next_sample = sample_points[current_jump]
next_sample = min(next_sample, self.total_frames - 1)
current_pos = self.current_frame
# Calculate midpoint between current and next sample
midpoint = (current_pos + next_sample) // 2
if midpoint == current_pos:
print("Already at or very close to next sample point.")
return False
# Update last position for further bisection
self.last_jump_position[current_file] = current_pos
# Jump to midpoint
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
self.load_current_frame()
percentage = (midpoint / self.total_frames) * 100
print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)")
return True
def toggle_multi_segment_mode(self):
"""Toggle between single and multi-segment video mode"""
if not self.is_video(self.media_files[self.current_index]):
print("Multi-segment mode only works with videos")
return False
self.multi_segment_mode = not self.multi_segment_mode
if self.multi_segment_mode:
print(f"Enabling multi-segment mode ({self.segment_count} segments)...")
try:
self.setup_segment_captures()
print("Multi-segment mode enabled successfully")
except Exception as e:
print(f"Failed to setup multi-segment mode: {e}")
import traceback
traceback.print_exc()
self.multi_segment_mode = False
return False
else:
print("Disabling multi-segment mode...")
self.cleanup_segment_captures()
# Reload single video
self.load_media(self.media_files[self.current_index])
print("Multi-segment mode disabled")
return True
def toggle_timeline(self):
"""Toggle timeline visibility"""
self.timeline_visible = not self.timeline_visible
print(f"Timeline {'visible' if self.timeline_visible else 'hidden'}")
return True
def setup_segment_captures(self):
if not self.is_video(self.media_files[self.current_index]):
return
# Calculate actual memory usage based on frame dimensions
frame_width = int(self.current_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(self.current_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_mb = frame_width * frame_height * 3 / (1024 * 1024)
# Memory-based limits (not frame count)
if total_mb > 8000: # 8GB limit
print(f"Video too large for preloading!")
print(f" Resolution: {frame_width}x{frame_height}")
print(f" Frames: {self.total_frames} frames would use {total_mb:.1f}GB RAM")
print(f"Multi-segment mode not available for videos requiring >8GB RAM")
return
elif total_mb > 500: # 500MB warning
print(f"Large video detected:")
print(f" Resolution: {frame_width}x{frame_height}")
print(f" Memory: {self.total_frames} frames will use {total_mb:.0f}GB RAM")
print("Press any key to continue or 'q' to cancel...")
# Note: In a real implementation, you'd want proper input handling here
start_time = time.time()
print(f"Setting up {self.segment_count} segments with video preloading...")
try:
print("Cleaning up existing captures...")
self.cleanup_segment_captures()
current_file = self.media_files[self.current_index]
print(f"Working with file: {current_file}")
# Initialize arrays
print("Initializing arrays...")
self.segment_caps = [None] * self.segment_count # Keep for compatibility
self.segment_frames = [None] * self.segment_count
self.segment_positions = []
self.segment_end_positions = []
self.segment_current_frames = [0] * self.segment_count # Track current frame for each segment
# Calculate target positions
print("Calculating segment positions...")
if self.total_frames <= 1:
print("Error: Video has insufficient frames for multi-segment mode")
return
for i in range(self.segment_count):
if self.segment_count <= 1:
position_ratio = 0
end_ratio = 1.0
else:
position_ratio = i / (self.segment_count - 1)
end_ratio = (i + 1) / (self.segment_count - 1) if i < self.segment_count - 1 else 1.0
start_frame = int(position_ratio * (self.total_frames - 1))
end_frame = int(end_ratio * (self.total_frames - 1))
start_frame = max(0, min(start_frame, self.total_frames - 1))
end_frame = max(start_frame + 1, min(end_frame, self.total_frames - 1)) # Ensure at least 1 frame per segment
self.segment_positions.append(start_frame)
self.segment_end_positions.append(end_frame)
self.segment_current_frames[i] = start_frame # Start each segment at its position
print(f"Segment positions: {self.segment_positions}")
print(f"Segment end positions: {self.segment_end_positions}")
# Preload the entire video into memory - simple and fast
print("Preloading entire video into memory...")
preload_start = time.time()
if self.current_cap and self.current_cap.isOpened():
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
# Simple, fast sequential read
frames = []
frame_count = 0
while frame_count < self.total_frames:
ret, frame = self.current_cap.read()
if ret and frame is not None:
frames.append(frame)
frame_count += 1
else:
break
self.video_frame_cache = frames
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame)
else:
self.video_frame_cache = []
preload_time = (time.time() - preload_start) * 1000
print(f"Video preloading: {preload_time:.1f}ms ({len(self.video_frame_cache)} frames)")
# Initialize segment frames from the preloaded cache
print("Initializing segment frames...")
for i in range(self.segment_count):
if self.segment_current_frames[i] < len(self.video_frame_cache):
self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]]
except Exception as e:
print(f"Error in setup: {e}")
import traceback
traceback.print_exc()
return
total_time = time.time() - start_time
print(f"Total setup time: {total_time * 1000:.1f}ms")
# Report success
successful_segments = sum(1 for frame in self.segment_frames if frame is not None)
print(f"Successfully preloaded video with {successful_segments}/{self.segment_count} active segments")
def cleanup_segment_captures(self):
"""Clean up all segment video captures and preloaded cache"""
for cap in self.segment_caps:
if cap:
cap.release()
self.segment_caps = []
self.segment_frames = []
self.segment_positions = []
self.segment_end_positions = []
if hasattr(self, 'video_frame_cache'):
self.video_frame_cache = []
if hasattr(self, 'segment_current_frames'):
self.segment_current_frames = []
def update_segment_frames(self):
"""Update frames for segments - each segment loops within its own range"""
if not self.multi_segment_mode or not self.segment_frames or not hasattr(self, 'video_frame_cache'):
return
for i in range(len(self.segment_frames)):
if self.segment_frames[i] is not None and self.video_frame_cache:
# Advance to next frame in this segment
self.segment_current_frames[i] += 1
# Get the segment boundaries
start_frame = self.segment_positions[i]
end_frame = self.segment_end_positions[i]
# Loop within the segment bounds
if self.segment_current_frames[i] > end_frame:
# Loop back to start of segment
self.segment_current_frames[i] = start_frame
# Ensure we don't go beyond the video cache
if self.segment_current_frames[i] < len(self.video_frame_cache):
# Direct reference - no copy needed for display
self.segment_frames[i] = self.video_frame_cache[self.segment_current_frames[i]]
def display_current_frame(self):
"""Display the current cached frame with overlays"""
if self.multi_segment_mode:
self.display_multi_segment_frame()
else:
self.display_single_frame()
def display_single_frame(self):
"""Display single frame view"""
if self.current_display_frame is None:
return
frame = self.current_display_frame.copy()
# Add info overlay
current_file = self.media_files[self.current_index]
if self.is_video(self.media_files[self.current_index]):
info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
cv2.putText(
frame,
info_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(255, 255, 255),
2,
)
cv2.putText(
frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
)
# Draw timeline
self.draw_timeline(frame)
# Display with proper aspect ratio
self.display_with_aspect_ratio(frame)
def display_multi_segment_frame(self):
"""Display multi-segment frame view"""
if not self.segment_frames or not any(frame is not None for frame in self.segment_frames):
return
# Calculate grid dimensions (2x2 for 4 segments)
grid_rows = int(self.segment_count ** 0.5)
grid_cols = int(self.segment_count / grid_rows)
# Get reference frame size
ref_frame = next((f for f in self.segment_frames if f is not None), None)
if ref_frame is None:
return
frame_height, frame_width = ref_frame.shape[:2]
# Calculate segment display size
segment_width = frame_width // grid_cols
segment_height = frame_height // grid_rows
# Create combined display frame
combined_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
# Place each segment in the grid
for i, segment_frame in enumerate(self.segment_frames):
if segment_frame is None:
continue
row = i // grid_cols
col = i % grid_cols
# Resize segment frame to fit grid cell while maintaining aspect ratio
frame_height, frame_width = segment_frame.shape[:2]
seg_scale_x = segment_width / frame_width
seg_scale_y = segment_height / frame_height
seg_scale = min(seg_scale_x, seg_scale_y)
new_seg_width = int(frame_width * seg_scale)
new_seg_height = int(frame_height * seg_scale)
resized_segment = cv2.resize(segment_frame, (new_seg_width, new_seg_height), interpolation=cv2.INTER_AREA)
# Center the resized segment in the grid cell
y_offset = (segment_height - new_seg_height) // 2
x_offset = (segment_width - new_seg_width) // 2
# Calculate position in combined frame
y_start = row * segment_height
y_end = y_start + segment_height
x_start = col * segment_width
x_end = x_start + segment_width
# Place segment in combined frame (centered)
y_place_start = y_start + y_offset
y_place_end = y_place_start + new_seg_height
x_place_start = x_start + x_offset
x_place_end = x_place_start + new_seg_width
# Ensure we don't go out of bounds
y_place_end = min(y_place_end, y_end)
x_place_end = min(x_place_end, x_end)
combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment
# Add segment label
segment_position = int((self.segment_positions[i] / self.total_frames) * 100)
label_text = f"Seg {i+1}: {segment_position}%"
cv2.putText(
combined_frame,
label_text,
(x_place_start + 5, y_place_start + 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(255, 255, 255),
2,
)
cv2.putText(
combined_frame,
label_text,
(x_place_start + 5, y_place_start + 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1,
)
# Draw grid borders
cv2.rectangle(combined_frame, (x_start, y_start), (x_end-1, y_end-1), (128, 128, 128), 1)
# Add overall info overlay
current_file = self.media_files[self.current_index]
info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
cv2.putText(
combined_frame,
info_text,
(10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
)
cv2.putText(
combined_frame,
info_text,
(10, frame_height - 20),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 0, 0),
1
)
# Draw multi-segment timeline
self.draw_multi_segment_timeline(combined_frame)
# Display with proper aspect ratio
self.display_with_aspect_ratio(combined_frame)
def draw_multi_segment_timeline(self, frame):
"""Draw timeline showing all segment positions"""
if not self.is_video(self.media_files[self.current_index]) or not self.segment_caps or not self.timeline_visible:
return
height, width = frame.shape[:2]
# Timeline area - smaller than normal timeline
timeline_height = 30
timeline_y = height - timeline_height - 25 # Leave space for info text
timeline_margin = 20
timeline_bar_height = 8
# Draw timeline background
cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1)
# Calculate timeline bar position
bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2
bar_x_start = timeline_margin
bar_x_end = width - timeline_margin
bar_width = bar_x_end - bar_x_start
# Draw timeline background bar
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1)
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1)
# Draw segment markers
if self.total_frames > 0:
for i, segment_pos in enumerate(self.segment_positions):
# Calculate position on timeline
progress = segment_pos / max(1, self.total_frames - 1)
marker_x = bar_x_start + int(bar_width * progress)
# Draw segment marker
color = (0, 255, 100) if i < len(self.segment_caps) and self.segment_caps[i] else (100, 100, 100)
cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, color, -1)
cv2.circle(frame, (marker_x, bar_y + timeline_bar_height // 2), 4, (255, 255, 255), 1)
# Add segment number
cv2.putText(frame, str(i+1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)
def draw_timeline(self, frame):
"""Draw timeline at the bottom of the frame"""
# Only draw timeline for video files in single mode and when visible
if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible:
return
height, width = frame.shape[:2]
# Timeline background area
timeline_y = height - self.TIMELINE_HEIGHT
cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)
# Calculate timeline bar position
bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
bar_x_start = self.TIMELINE_MARGIN
bar_x_end = width - self.TIMELINE_MARGIN
bar_width = bar_x_end - bar_x_start
self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)
# Draw timeline background
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BG, -1)
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_BORDER, 1)
# Draw progress for videos
if self.total_frames > 0:
progress = self.current_frame / max(1, self.total_frames - 1)
progress_width = int(bar_width * progress)
if progress_width > 0:
cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT), self.TIMELINE_COLOR_PROGRESS, -1)
# Draw handle
handle_x = bar_x_start + progress_width
handle_y = bar_y + self.TIMELINE_BAR_HEIGHT // 2
cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_HANDLE, -1)
cv2.circle(frame, (handle_x, handle_y), self.TIMELINE_HANDLE_SIZE // 2, self.TIMELINE_COLOR_BORDER, 2)
def mouse_callback(self, event, x, y, _, __):
"""Handle mouse events for timeline interaction"""
if not self.timeline_rect or not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode:
return
bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
bar_x_end = bar_x_start + bar_width
# Check if mouse is over timeline
if bar_y <= y <= bar_y + bar_height + 10: # Add some extra height for easier clicking
if event == cv2.EVENT_LBUTTONDOWN:
if bar_x_start <= x <= bar_x_end:
self.mouse_dragging = True
self.seek_to_position(x, bar_x_start, bar_width)
elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
if bar_x_start <= x <= bar_x_end:
self.seek_to_position(x, bar_x_start, bar_width)
elif event == cv2.EVENT_LBUTTONUP:
self.mouse_dragging = False
def seek_to_position(self, mouse_x, bar_x_start, bar_width):
"""Seek to position based on mouse click/drag on timeline"""
if not self.current_cap or not self.is_video(self.media_files[self.current_index]):
return
# Calculate position ratio
relative_x = mouse_x - bar_x_start
position_ratio = max(0, min(1, relative_x / bar_width))
# Calculate target frame
target_frame = int(position_ratio * (self.total_frames - 1))
target_frame = max(0, min(target_frame, self.total_frames - 1))
# Seek to target frame
self.current_frame = target_frame
self.load_current_frame()
def advance_frame(self):
"""Advance to next frame - handles playback speed and marker looping"""
if not self.is_playing:
return True
if self.multi_segment_mode:
self.update_segment_frames()
return True
else:
# Always advance by 1 frame - speed is controlled by delay timing
new_frame = self.current_frame + 1
# Handle looping bounds
if new_frame >= self.total_frames:
# Loop to beginning
new_frame = 0
# Update current frame and load it
self.current_frame = new_frame
self.update_watch_tracking()
return self.load_current_frame()
def seek_video(self, frames_delta: int):
"""Seek video by specified number of frames"""
if not self.is_video(self.media_files[self.current_index]):
return
if self.multi_segment_mode:
return
if not self.current_cap:
return
target_frame = max(
0, min(self.current_frame + frames_delta, self.total_frames - 1)
)
self.current_frame = target_frame
self.load_current_frame()
def process_seek_key(self, key: int) -> bool:
"""Process seeking keys with proper rate limiting"""
current_time = time.time()
seek_direction = 0
seek_amount = 0
seek_multiplier = 1 # Default multiplier
# Check for A/D keys with modifiers
if key == ord("a") or key == ord("A"):
seek_direction = -1
# SHIFT+A gives uppercase A
if key == ord("A"):
seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
elif key == ord("d") or key == ord("D"):
seek_direction = 1
# SHIFT+D gives uppercase D
if key == ord("D"):
seek_multiplier = self.SHIFT_SEEK_MULTIPLIER
elif key == 1: # CTRL+A
seek_direction = -1
seek_multiplier = self.CTRL_SEEK_MULTIPLIER
elif key == 4: # CTRL+D
seek_direction = 1
seek_multiplier = self.CTRL_SEEK_MULTIPLIER
elif key == ord(","):
seek_amount = -self.fine_seek_frames
elif key == ord("."):
seek_amount = self.fine_seek_frames
else:
if self.current_seek_key is not None:
self.current_seek_key = None
self.is_seeking = False
return False
# Handle fine seeking (comma/period) - always immediate
if seek_amount != 0:
self.seek_video(seek_amount)
return True
# Handle A/D key seeking with rate limiting and modifiers
if seek_direction != 0:
if self.current_seek_key != key:
self.current_seek_key = key
self.key_first_press_time = current_time
self.last_seek_time = current_time
self.is_seeking = True
seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
self.seek_video(seek_amount)
return True
elif self.is_seeking:
time_since_last_seek = current_time - self.last_seek_time
time_held = current_time - self.key_first_press_time
if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
self.last_seek_time = current_time
if time_held > self.FAST_SEEK_ACTIVATION_TIME:
seek_amount = seek_direction * self.fast_seek_frames * seek_multiplier
else:
seek_amount = seek_direction * self.coarse_seek_frames * seek_multiplier
self.seek_video(seek_amount)
return True
return False
def grade_media(self, grade: int):
"""Move current media file to grade directory"""
if not self.media_files or grade < 1 or grade > 5:
return
current_file = self.media_files[self.current_index]
grade_dir = self.directory / str(grade)
# Create grade directory if it doesn't exist
grade_dir.mkdir(exist_ok=True)
destination = grade_dir / current_file.name
counter = 1
while destination.exists():
stem = current_file.stem
suffix = current_file.suffix
destination = grade_dir / f"{stem}_{counter}{suffix}"
counter += 1
# Track this move for undo functionality BEFORE making changes
self.undo_history.append((str(destination), str(current_file), self.current_index))
# Release video capture to unlock the file before moving
if self.current_cap:
self.current_cap.release()
self.current_cap = None
# Also release segment captures if in multi-segment mode
if self.multi_segment_mode:
self.cleanup_segment_captures()
try:
shutil.move(str(current_file), str(destination))
print(f"Moved {current_file.name} to grade {grade}")
self.media_files.pop(self.current_index)
if self.current_index >= len(self.media_files):
self.current_index = 0
if not self.media_files:
print("No more media files to grade!")
return False
except Exception as e:
print(f"Error moving file: {e}")
# Remove the undo entry since the move failed
self.undo_history.pop()
return True
def undo_last_action(self):
"""Undo the last grading action by moving file back and restoring to media list"""
if not self.undo_history:
print("No actions to undo!")
return False
# Get the last action
moved_file_path, original_file_path, original_index = self.undo_history.pop()
# Release video capture to unlock any current file before moving
if self.current_cap:
self.current_cap.release()
self.current_cap = None
try:
# Move the file back to its original location
shutil.move(moved_file_path, original_file_path)
# Add the file back to the media list at its original position
original_file = Path(original_file_path)
# Insert the file back at the appropriate position
if original_index <= len(self.media_files):
self.media_files.insert(original_index, original_file)
else:
self.media_files.append(original_file)
# Navigate to the restored file
print("Navigating to: ", original_index)
self.current_index = original_index
print(f"Undone: Moved {original_file.name} back from grade folder")
return True
except Exception as e:
print(f"Error undoing action: {e}")
# If undo failed, put the action back in history
self.undo_history.append((moved_file_path, original_file_path, original_index))
return False
def run(self):
"""Main grading loop"""
self.media_files = self.find_media_files()
if not self.media_files:
print("No media files found in directory!")
return
print(f"Found {len(self.media_files)} media files")
print("Controls:")
print(" Space: Pause/Play")
print(" A/D: Seek backward/forward (hold for FAST seek)")
print(" Shift+A/D: Seek backward/forward (5x multiplier)")
print(" Ctrl+A/D: Seek backward/forward (10x multiplier)")
print(" , / . : Frame-by-frame seek (fine control)")
print(" W/S: Decrease/Increase playback speed")
print(" G: Toggle multi-segment mode (videos only)")
print(" 1-5: Grade and move file")
print(" N: Next file")
print(" P: Previous file")
print(" U: Undo last grading action")
print(" L: Sample video at key points (videos only)")
print(" H: Toggle timeline visibility")
print(" J: Bisect backwards from current position (videos only, disabled in multi-segment)")
print(" K: Bisect forwards toward next sample (videos only, disabled in multi-segment)")
print(" Q/ESC: Quit")
cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
cv2.setMouseCallback("Media Grader", self.mouse_callback)
# Set initial window size to a reasonable default (will be resized on first frame)
cv2.resizeWindow("Media Grader", 1280, 720)
while self.media_files and self.current_index < len(self.media_files):
current_file = self.media_files[self.current_index]
if not self.load_media(current_file):
print(f"Could not load {current_file}")
self.current_index += 1
continue
# Setup multi-segment mode if enabled and this is a video
if self.multi_segment_mode and self.is_video(current_file):
self.setup_segment_captures()
window_title = f"Media Grader - {current_file.name} ({self.current_index + 1}/{len(self.media_files)})"
cv2.setWindowTitle("Media Grader", window_title)
while True:
# Update display
self.display_current_frame()
# Calculate appropriate delay based on playback state
if self.is_playing and self.is_video(current_file):
# Use calculated frame delay for proper playback speed
delay_ms = self.calculate_frame_delay()
else:
# Use minimal delay for immediate responsiveness when not playing
delay_ms = 1
# Auto advance frame when playing (videos only)
if self.is_playing and self.is_video(current_file):
self.advance_frame()
# Key capture with appropriate delay
key = cv2.waitKey(delay_ms) & 0xFF
if key == ord("q") or key == 27:
return
elif key == ord(" "):
self.is_playing = not self.is_playing
elif key == ord("s"):
# Speed control only for videos
if self.is_video(current_file):
self.playback_speed = max(
self.MIN_PLAYBACK_SPEED,
self.playback_speed - self.SPEED_INCREMENT,
)
elif key == ord("w"):
# Speed control only for videos
if self.is_video(current_file):
self.playback_speed = min(
self.MAX_PLAYBACK_SPEED,
self.playback_speed + self.SPEED_INCREMENT,
)
elif self.process_seek_key(key):
continue
elif key == ord("n"):
break
elif key == ord("p"):
self.current_index = max(0, self.current_index - 1)
print("Navigating to: ", self.current_index)
break
elif key == ord("u"):
if self.undo_last_action():
# File was restored, reload it
break
elif key == ord("l"):
# Jump to largest unwatched region (works in both modes)
self.jump_to_unwatched_region()
elif key == ord("j"):
if not self.multi_segment_mode:
self.bisect_backwards()
else:
print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
elif key == ord("k"):
if not self.multi_segment_mode:
self.bisect_forwards()
else:
print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
elif key == ord("h"): # Toggle timeline visibility
self.toggle_timeline()
elif key == ord("g"):
self.toggle_multi_segment_mode()
elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
grade = int(chr(key))
if not self.grade_media(grade):
return
break
elif key == 255:
if self.is_seeking and self.current_seek_key is not None:
self.process_seek_key(self.current_seek_key)
if key not in [ord("p"), ord("u"), ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
print("Navigating to (pu12345): ", self.current_index)
self.current_index += 1
if self.current_cap:
self.current_cap.release()
self.cleanup_segment_captures()
# Cleanup thread pool
self.thread_pool.shutdown(wait=True)
cv2.destroyAllWindows()
print("Grading session complete!")
def main():
parser = argparse.ArgumentParser(
description="Media Grader - Grade media files by moving them to numbered folders"
)
parser.add_argument(
"directory",
nargs="?",
default=".",
help="Directory to scan for media files (default: current directory)",
)
parser.add_argument(
"--seek-frames",
type=int,
default=30,
help="Number of frames to seek when using arrow keys (default: 30)",
)
parser.add_argument(
"--snap-to-iframe",
action="store_true",
help="Snap to I-frames when seeking backward for better performance",
)
args = parser.parse_args()
if not os.path.isdir(args.directory):
print(f"Error: {args.directory} is not a valid directory")
sys.exit(1)
grader = MediaGrader(args.directory, args.seek_frames, args.snap_to_iframe)
try:
grader.run()
except KeyboardInterrupt:
print("\nGrading session interrupted")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()