Files
py-media-grader/main.py

1299 lines
51 KiB
Python

import os
import sys
import glob
import cv2
import numpy as np
import argparse
import shutil
import time
import threading
import subprocess
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List
# Interactive OpenCV-based viewer that plays media files from a directory and
# moves each one into a numbered grade folder (1-5) under that directory.
class MediaGrader:
# Base wait between frames in ms; calculate_frame_delay divides it by the
# current playback speed.
BASE_FRAME_DELAY_MS = 16
# Minimum seconds between repeated seeks while an A/D key is held
# (see process_seek_key).
KEY_REPEAT_RATE_SEC = 0.5
# Seconds a seek key must be held before process_seek_key switches from
# coarse to fast seek steps.
FAST_SEEK_ACTIVATION_TIME = 2.0
# NOTE(review): not referenced in the visible part of the file — presumably a
# render-time budget in ms; confirm against the main loop.
FRAME_RENDER_TIME_MS = 50
# NOTE(review): the next three are not referenced in the visible part of the
# file — presumably the step and bounds for the W/S speed keys; confirm.
SPEED_INCREMENT = 0.2
MIN_PLAYBACK_SPEED = 0.1
MAX_PLAYBACK_SPEED = 100.0
# fast_seek_frames = seek_frames * FAST_SEEK_MULTIPLIER (set in __init__).
FAST_SEEK_MULTIPLIER = 60
# NOTE(review): not referenced in the visible part of the file — presumably
# the wait used while a still image is shown; confirm.
IMAGE_DISPLAY_DELAY_MS = 100
# Hard-coded monitor size in pixels; display_with_aspect_ratio sizes and
# centres the window against these values.
MONITOR_WIDTH = 2560
MONITOR_HEIGHT = 1440
# Geometry (pixels) of the single-video timeline drawn by draw_timeline.
TIMELINE_HEIGHT = 60
TIMELINE_MARGIN = 20
TIMELINE_BAR_HEIGHT = 12
TIMELINE_HANDLE_SIZE = 12
# Colour triples passed straight to cv2 drawing calls
# (presumably BGR channel order, as OpenCV expects — confirm).
TIMELINE_COLOR_BG = (80, 80, 80)
TIMELINE_COLOR_PROGRESS = (0, 120, 255)
TIMELINE_COLOR_HANDLE = (255, 255, 255)
TIMELINE_COLOR_BORDER = (200, 200, 200)
# Multipliers applied to the coarse/fast seek step for Shift+A/D and Ctrl+A/D.
SHIFT_SEEK_MULTIPLIER = 5
CTRL_SEEK_MULTIPLIER = 10
# Default number of tiles used by multi-segment mode (copied to
# self.segment_count in __init__).
SEGMENT_COUNT = 16
def __init__(
self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
):
self.directory = Path(directory)
self.seek_frames = seek_frames
self.current_index = 0
self.playback_speed = 1.0
self.media_files = []
self.current_cap = None
self.is_playing = True
self.current_frame = 0
self.total_frames = 0
self.multi_segment_mode = False
self.segment_count = self.SEGMENT_COUNT
self.segment_caps = []
self.segment_frames = []
self.segment_positions = []
self.segment_end_positions = [] # Track where each segment should loop back to
self.timeline_visible = True
self.last_seek_time = 0
self.current_seek_key = None
self.key_first_press_time = 0
self.is_seeking = False
self.fine_seek_frames = 1
self.coarse_seek_frames = self.seek_frames
self.fast_seek_frames = self.seek_frames * self.FAST_SEEK_MULTIPLIER
self.current_display_frame = None
self.extensions = [
".png",
".jpg",
".jpeg",
".gif",
".mp4",
".avi",
".mov",
".mkv",
]
self.mouse_dragging = False
self.timeline_rect = None
self.undo_history = []
self.watched_regions = {}
self.current_watch_start = None
self.last_frame_position = 0
self.last_jump_position = {}
self.jump_history = {}
self.thread_pool = ThreadPoolExecutor(max_workers=4)
def display_with_aspect_ratio(self, frame):
    """Show *frame* in the "Media Grader" window, scaled to fit the monitor.

    The window is resized so that the frame keeps its aspect ratio while
    filling either the full monitor height or the full monitor width
    (MONITOR_WIDTH x MONITOR_HEIGHT), then centred. Does nothing when
    *frame* is None.
    """
    if frame is None:
        return

    src_height, src_width = frame.shape[:2]
    aspect = src_width / src_height

    if aspect < self.MONITOR_WIDTH / self.MONITOR_HEIGHT:
        # Narrower than the monitor: height is the limiting dimension.
        win_height = self.MONITOR_HEIGHT
        win_width = int(win_height * aspect)
    else:
        # Wider than (or equal to) the monitor: width is the limit.
        win_width = self.MONITOR_WIDTH
        win_height = int(win_width / aspect)

    window = "Media Grader"
    cv2.resizeWindow(window, win_width, win_height)
    cv2.moveWindow(
        window,
        (self.MONITOR_WIDTH - win_width) // 2,
        (self.MONITOR_HEIGHT - win_height) // 2,
    )
    cv2.imshow(window, frame)
def find_media_files(self) -> List[Path]:
    """Return every ungraded media file below ``self.directory``, sorted.

    A file qualifies when its suffix (compared case-insensitively, like
    ``is_video`` does) is in ``self.extensions``. Files that live inside a
    folder named "1".."5" *below the base directory* are treated as
    already graded and skipped. Only the path relative to the base
    directory is inspected, so a base directory that itself sits under a
    folder called e.g. "2" no longer hides every file.

    Hidden files and hidden folders (names starting with ".") are skipped,
    which matches what the previous glob-based scan did.
    """
    wanted_suffixes = {ext.lower() for ext in self.extensions}
    grade_dirs = {"1", "2", "3", "4", "5"}

    found = []
    # rglob walks the tree without building a glob pattern from the
    # directory name, so "[" / "*" in that name cannot break the search.
    for path in self.directory.rglob("*"):
        if path.suffix.lower() not in wanted_suffixes or not path.is_file():
            continue
        relative_parts = path.relative_to(self.directory).parts
        if any(part.startswith(".") for part in relative_parts):
            continue
        # Check the folders only (everything except the file name itself).
        if any(part in grade_dirs for part in relative_parts[:-1]):
            continue
        print("Adding file: ", path)
        found.append(path)
    return sorted(found)
def is_video(self, file_path: Path) -> bool:
    """Return True when the file suffix is one of the frame-sequence formats.

    The comparison is case-insensitive; ``.gif`` is treated as video.
    """
    return file_path.suffix.lower() in (".mp4", ".avi", ".mov", ".mkv", ".gif")
def calculate_frame_delay(self) -> int:
    """Return the per-frame wait in milliseconds for the current speed.

    BASE_FRAME_DELAY_MS is divided by ``playback_speed`` and truncated;
    the result is never less than 1 ms.
    """
    scaled = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
    return scaled if scaled > 1 else 1
def calculate_frames_to_skip(self) -> int:
    """Return how many frames to drop between displayed frames.

    Up to 2x nothing is skipped; up to 5x the skip is ``int(speed - 1)``;
    above that it is ``int(speed * 2)``.
    """
    speed = self.playback_speed
    if speed <= 2.0:
        return 0
    if speed <= 5.0:
        return int(speed - 1)
    return int(speed * 2)
def load_media(self, file_path: Path) -> bool:
    """Open *file_path* for display and load its first frame.

    For videos, capture backends are tried in order (FFmpeg, DirectShow,
    then OpenCV's automatic choice) until one opens the file. For images
    no capture is kept and ``total_frames`` is set to 1.

    Args:
        file_path: Media file to open.

    Returns:
        False when a video cannot be opened by any backend, True
        otherwise. The result of loading the first frame is not checked.
    """
    if self.current_cap:
        self.current_cap.release()

    if self.is_video(file_path):
        # For video files FFmpeg is usually best; DirectShow is mainly
        # for cameras but is kept as a fallback.
        backends_to_try = []
        if hasattr(cv2, 'CAP_FFMPEG'):
            backends_to_try.append(cv2.CAP_FFMPEG)
        if hasattr(cv2, 'CAP_DSHOW'):
            backends_to_try.append(cv2.CAP_DSHOW)
        backends_to_try.append(cv2.CAP_ANY)  # Final fallback

        self.current_cap = None
        for backend in backends_to_try:
            try:
                self.current_cap = cv2.VideoCapture(str(file_path), backend)
                if self.current_cap.isOpened():
                    # Minimise internal buffering to reduce latency.
                    self.current_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                    # Request hardware decoding where this build has it.
                    if hasattr(cv2, 'CAP_PROP_HW_ACCELERATION'):
                        self.current_cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
                    break
                self.current_cap.release()
            except Exception as exc:
                # Was a bare "except:", which also swallowed
                # KeyboardInterrupt/SystemExit. Still best-effort: move on
                # to the next backend, but say why this one was skipped.
                print(f"Backend {backend} failed for {file_path.name}: {exc}")
                continue

        if not self.current_cap or not self.current_cap.isOpened():
            print(f"Warning: Could not open video file {file_path.name} (unsupported codec)")
            return False

        self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.current_frame = 0

        # Decode the FOURCC integer into its four ASCII characters.
        fourcc = int(self.current_cap.get(cv2.CAP_PROP_FOURCC))
        codec = "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])
        backend = self.current_cap.getBackendName()
        print(f"Loaded: {file_path.name} | Codec: {codec} | Backend: {backend} | Frames: {self.total_frames}")
    else:
        self.current_cap = None
        self.total_frames = 1
        self.current_frame = 0

    # Load initial frame
    self.load_current_frame()
    # Start watch tracking session for videos
    self.start_watch_session()
    return True
def load_current_frame(self):
    """Read the frame at the current position into ``current_display_frame``.

    Videos read one frame from ``current_cap`` and then refresh
    ``current_frame`` from the capture's reported position; images are
    read from disk with ``cv2.imread``.

    Returns:
        True when a frame was stored, False otherwise (no capture, failed
        read, or unreadable image).
    """
    media_path = self.media_files[self.current_index]

    if not self.is_video(media_path):
        image = cv2.imread(str(media_path))
        if image is None:
            return False
        self.current_display_frame = image
        return True

    capture = self.current_cap
    if not capture:
        return False

    ok, decoded = capture.read()
    if not ok:
        return False

    self.current_display_frame = decoded
    self.current_frame = int(capture.get(cv2.CAP_PROP_POS_FRAMES))
    return True
def start_watch_session(self):
    """Reset watch tracking so a new session starts at the current frame.

    Only applies to videos; for other media nothing is changed.
    """
    if not self.is_video(self.media_files[self.current_index]):
        return
    self.current_watch_start = self.last_frame_position = self.current_frame
def update_watch_tracking(self):
    """Record the span just watched once playback jumps or runs long enough.

    A region is recorded when the current frame is more than 5 frames away
    from the position seen on the previous call, or more than 30 frames
    away from the start of the current watch session. The recorded span
    runs between the session start and the previous position, after which
    a new session starts at the current frame.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path) or self.current_watch_start is None:
        return

    file_key = str(media_path)
    position = self.current_frame
    jumped = abs(position - self.last_frame_position) > 5
    long_session = abs(position - self.current_watch_start) > 30

    if jumped or long_session:
        low, high = sorted((self.current_watch_start, self.last_frame_position))
        self.watched_regions.setdefault(file_key, [])
        # add_watched_region merges the span with any overlapping ones.
        self.add_watched_region(file_key, low, high)
        self.current_watch_start = position

    self.last_frame_position = position
def add_watched_region(self, file_path, start_frame, end_frame):
    """Store ``[start_frame, end_frame]`` for *file_path*, merging overlaps.

    Existing regions that overlap the new one are absorbed into it;
    regions that do not overlap are kept in their original order and the
    (possibly grown) new region is appended last as a tuple.
    """
    existing = self.watched_regions.setdefault(file_path, [])

    low, high = start_frame, end_frame
    untouched = []
    for region in existing:
        overlaps = not (high < region[0] or low > region[1])
        if overlaps:
            low = min(low, region[0])
            high = max(high, region[1])
        else:
            untouched.append(region)

    untouched.append((low, high))
    self.watched_regions[file_path] = untouched
def get_sample_points(self):
    """Return the frame numbers visited by successive sampling jumps.

    The video is split into eighths; the points are returned in the order
    1/4, 1/2, 3/4, 1/8, 3/8, 5/8, 7/8 and finally frame 0. An empty list
    is returned when the video has fewer than 8 frames.
    """
    eighth = self.total_frames // 8
    if eighth == 0:
        return []
    visit_order = (2, 4, 6, 1, 3, 5, 7, 0)
    return [eighth * multiple for multiple in visit_order]
def jump_to_unwatched_region(self):
    """Jump to the next not-yet-visited sample point of the current video.

    Sample points come from ``get_sample_points`` and are visited in that
    order, one per call, using a per-file counter. The position before the
    jump is remembered in ``last_jump_position`` and appended to
    ``jump_history``. In multi-segment mode the capture is not moved, but
    the counter still advances.

    Returns:
        True when a sample point was consumed; False for non-videos, for
        videos too short to sample, and once every point has been visited.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False
    file_key = str(media_path)

    if not hasattr(self, 'jump_counters'):
        self.jump_counters = {}
    self.jump_counters.setdefault(file_key, 0)

    points = self.get_sample_points()
    if not points:
        print("Video too short for sampling")
        return False

    visit = self.jump_counters[file_key]
    if visit >= len(points):
        print("All sample points visited! Video fully sampled.")
        return False

    target_frame = min(points[visit], self.total_frames - 1)

    # Remember where we came from, for bisection and for undoing the jump.
    self.last_jump_position[file_key] = self.current_frame
    self.jump_history.setdefault(file_key, []).append(self.current_frame)

    if not self.multi_segment_mode:
        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
        self.load_current_frame()

    self.jump_counters[file_key] += 1

    percentage = (target_frame / self.total_frames) * 100
    print(f"Sample {visit + 1}/{len(points)}: jumped to frame {target_frame} ({percentage:.1f}% through video)")
    return True
def bisect_backwards(self):
    """Seek to the midpoint between the last jump origin and the current frame.

    The current frame then becomes the new reference in
    ``last_jump_position`` so repeated calls keep halving the interval.

    Returns:
        True after seeking; False for non-videos, when no reference
        position exists for this file, or when it equals the current
        frame.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False
    file_key = str(media_path)

    if file_key not in self.last_jump_position:
        print("No previous position to bisect from. Use L first to establish a reference point.")
        return False

    reference = self.last_jump_position[file_key]
    here = self.current_frame
    if reference == here:
        print("Already at the same position as last jump.")
        return False

    # The midpoint is the same whichever of the two positions is larger.
    midpoint = (reference + here) // 2

    self.last_jump_position[file_key] = here

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
    self.load_current_frame()

    percentage = (midpoint / self.total_frames) * 100
    print(f"Bisected backwards to frame {midpoint} ({percentage:.1f}% through video)")
    return True
def bisect_forwards(self):
    """Seek to the midpoint between the current frame and the next sample point.

    The next sample point is the one the per-file jump counter would visit
    next. The current frame is stored in ``last_jump_position`` before
    seeking.

    Returns:
        True after seeking; False for non-videos, when sampling has not
        started for this file, when all sample points are used up, or
        when the midpoint equals the current frame.
    """
    media_path = self.media_files[self.current_index]
    if not self.is_video(media_path):
        return False
    file_key = str(media_path)

    sampling_started = hasattr(self, 'jump_counters') and file_key in self.jump_counters
    if not sampling_started:
        print("No sampling started yet. Use L first to establish sample points.")
        return False

    points = self.get_sample_points()
    upcoming = self.jump_counters[file_key]
    if upcoming >= len(points):
        print("All sample points visited. No forward reference point.")
        return False

    next_sample = min(points[upcoming], self.total_frames - 1)
    here = self.current_frame

    midpoint = (here + next_sample) // 2
    if midpoint == here:
        print("Already at or very close to next sample point.")
        return False

    self.last_jump_position[file_key] = here

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, midpoint)
    self.load_current_frame()

    percentage = (midpoint / self.total_frames) * 100
    print(f"Bisected forwards to frame {midpoint} ({percentage:.1f}% through video)")
    return True
def toggle_multi_segment_mode(self):
    """Switch between the single-video view and the multi-segment grid.

    Enabling calls ``setup_segment_captures``. That method reports most
    failures (video too large, too few frames, read errors) by printing
    and returning rather than raising, so success is verified here by
    checking that at least one segment frame was actually loaded. If not,
    the mode is switched back off instead of being left enabled with
    nothing to display.

    Disabling releases the segment state and reloads the current file in
    single-video mode.

    Returns:
        True when the mode was switched; False for non-video media or
        when enabling failed.
    """
    import traceback

    current_file = self.media_files[self.current_index]
    if not self.is_video(current_file):
        print("Multi-segment mode only works with videos")
        return False

    if self.multi_segment_mode:
        print("Disabling multi-segment mode...")
        self.multi_segment_mode = False
        self.cleanup_segment_captures()
        # Reload single video
        self.load_media(current_file)
        print("Multi-segment mode disabled")
        return True

    print(f"Enabling multi-segment mode ({self.segment_count} segments)...")
    self.multi_segment_mode = True
    try:
        self.setup_segment_captures()
    except Exception as e:
        print(f"Failed to setup multi-segment mode: {e}")
        traceback.print_exc()
        self.multi_segment_mode = False
        return False

    if not any(frame is not None for frame in self.segment_frames):
        # Setup gave up quietly; do not claim success.
        print("Failed to setup multi-segment mode: no segment frames were loaded")
        self.cleanup_segment_captures()
        self.multi_segment_mode = False
        return False

    print("Multi-segment mode enabled successfully")
    return True
def toggle_timeline(self):
    """Flip ``timeline_visible``, report the new state, and return True."""
    now_visible = not self.timeline_visible
    self.timeline_visible = now_visible
    state = 'visible' if now_visible else 'hidden'
    print(f"Timeline {state}")
    return True
def setup_segment_captures(self):
    """Preload the current video and split it into looping segments.

    Every frame is decoded from ``current_cap`` into
    ``video_frame_cache``; ``segment_positions`` /
    ``segment_end_positions`` then define ``segment_count`` frame ranges
    and ``segment_frames`` is seeded with the first frame of each range.

    Because the whole video is held in memory, the RAM needed is estimated
    first as width * height * 3 bytes * total_frames (the ``* 3`` assumes
    3 bytes per pixel). Videos estimated above 8000 MB are refused. The
    estimate used to omit the frame count, so it described a single frame
    and the limit could never trigger.

    Problems are reported by printing and returning early; exceptions
    raised during preloading are caught and printed. Callers can tell
    whether setup worked by checking ``segment_frames`` for non-None
    entries.
    """
    if not self.is_video(self.media_files[self.current_index]):
        return

    frame_width = int(self.current_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(self.current_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    bytes_per_frame = frame_width * frame_height * 3
    total_mb = bytes_per_frame * self.total_frames / (1024 * 1024)

    # Memory-based limits (not frame count)
    if total_mb > 8000:  # 8GB limit
        print("Video too large for preloading!")
        print(f"  Resolution: {frame_width}x{frame_height}")
        print(f"  Frames: {self.total_frames} frames would use {total_mb / 1024:.1f}GB RAM")
        print("Multi-segment mode not available for videos requiring >8GB RAM")
        return
    elif total_mb > 500:  # 500MB warning
        print("Large video detected:")
        print(f"  Resolution: {frame_width}x{frame_height}")
        print(f"  Memory: {self.total_frames} frames will use {total_mb:.0f}MB RAM")
        # NOTE: no key is actually read here; preloading always continues.
        print("Press any key to continue or 'q' to cancel...")

    start_time = time.time()
    print(f"Setting up {self.segment_count} segments with video preloading...")
    try:
        print("Cleaning up existing captures...")
        self.cleanup_segment_captures()

        current_file = self.media_files[self.current_index]
        print(f"Working with file: {current_file}")

        print("Initializing arrays...")
        self.segment_caps = [None] * self.segment_count  # Keep for compatibility
        self.segment_frames = [None] * self.segment_count
        self.segment_positions = []
        self.segment_end_positions = []
        self.segment_current_frames = [0] * self.segment_count  # Playback cursor per segment

        print("Calculating segment positions...")
        if self.total_frames <= 1:
            print("Error: Video has insufficient frames for multi-segment mode")
            return

        last_frame = self.total_frames - 1
        for i in range(self.segment_count):
            if self.segment_count <= 1:
                position_ratio = 0
                end_ratio = 1.0
            else:
                # Starts are spread from frame 0 to the last frame, so the
                # final segment begins on the last frame itself.
                position_ratio = i / (self.segment_count - 1)
                end_ratio = (i + 1) / (self.segment_count - 1) if i < self.segment_count - 1 else 1.0

            start_frame = int(position_ratio * last_frame)
            end_frame = int(end_ratio * last_frame)
            start_frame = max(0, min(start_frame, last_frame))
            # Ensure at least 1 frame per segment
            end_frame = max(start_frame + 1, min(end_frame, last_frame))

            self.segment_positions.append(start_frame)
            self.segment_end_positions.append(end_frame)
            self.segment_current_frames[i] = start_frame

        print(f"Segment positions: {self.segment_positions}")
        print(f"Segment end positions: {self.segment_end_positions}")

        print("Preloading entire video into memory...")
        preload_start = time.time()
        if self.current_cap and self.current_cap.isOpened():
            self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            # Plain sequential read; stop at the first failed read.
            frames = []
            while len(frames) < self.total_frames:
                ret, frame = self.current_cap.read()
                if not ret or frame is None:
                    break
                frames.append(frame)
            self.video_frame_cache = frames
            # Put the capture back where single-video playback left off.
            self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame)
        else:
            self.video_frame_cache = []

        preload_time = (time.time() - preload_start) * 1000
        print(f"Video preloading: {preload_time:.1f}ms ({len(self.video_frame_cache)} frames)")

        print("Initializing segment frames...")
        for i in range(self.segment_count):
            cursor = self.segment_current_frames[i]
            if cursor < len(self.video_frame_cache):
                self.segment_frames[i] = self.video_frame_cache[cursor]
    except Exception as e:
        print(f"Error in setup: {e}")
        import traceback
        traceback.print_exc()
        return

    total_time = time.time() - start_time
    print(f"Total setup time: {total_time * 1000:.1f}ms")

    successful_segments = sum(1 for frame in self.segment_frames if frame is not None)
    print(f"Successfully preloaded video with {successful_segments}/{self.segment_count} active segments")
def cleanup_segment_captures(self):
    """Release segment captures and drop all multi-segment state.

    The preloaded frame cache and the per-segment cursors are emptied only
    if those attributes already exist on the instance.
    """
    for capture in self.segment_caps:
        if capture:
            capture.release()

    self.segment_caps = []
    self.segment_frames = []
    self.segment_positions = []
    self.segment_end_positions = []

    for optional_attr in ('video_frame_cache', 'segment_current_frames'):
        if hasattr(self, optional_attr):
            setattr(self, optional_attr, [])
def update_segment_frames(self):
    """Advance every active segment by one frame, looping inside its range.

    A segment whose cursor moves past its end position wraps back to its
    start position. The displayed frame is taken from the preloaded cache
    by reference; if the cursor lies beyond the cache the previous frame
    is kept. Segments whose current frame is None are left untouched.
    """
    if not self.multi_segment_mode or not self.segment_frames or not hasattr(self, 'video_frame_cache'):
        return

    for idx, shown in enumerate(self.segment_frames):
        if shown is None or not self.video_frame_cache:
            continue

        cursor = self.segment_current_frames[idx] + 1
        self.segment_current_frames[idx] = cursor

        loop_start = self.segment_positions[idx]
        loop_end = self.segment_end_positions[idx]
        if cursor > loop_end:
            cursor = loop_start
            self.segment_current_frames[idx] = cursor

        if cursor < len(self.video_frame_cache):
            self.segment_frames[idx] = self.video_frame_cache[cursor]
def display_current_frame(self):
    """Render the current view: the segment grid or the single frame."""
    render = (
        self.display_multi_segment_frame
        if self.multi_segment_mode
        else self.display_single_frame
    )
    render()
def display_single_frame(self):
    """Show the cached frame with the status line and timeline overlay.

    Works on a copy so the cached frame itself stays clean. The status
    text is only drawn for videos: a thick white pass followed by a thin
    black pass at the same position. Does nothing when no frame is
    cached.
    """
    cached = self.current_display_frame
    if cached is None:
        return
    canvas = cached.copy()

    if self.is_video(self.media_files[self.current_index]):
        info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
        for colour, thickness in (((255, 255, 255), 2), ((0, 0, 0), 1)):
            cv2.putText(
                canvas, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, colour, thickness
            )

    # draw_timeline decides for itself whether anything should be drawn.
    self.draw_timeline(canvas)
    self.display_with_aspect_ratio(canvas)
def display_multi_segment_frame(self):
    """Compose all segment frames into one grid image and display it.

    The canvas has the size of the first available segment frame. Each
    segment is scaled to fit its grid cell (keeping aspect ratio), centred
    in the cell, labelled with its start position as a percentage, and
    outlined. A status line and the multi-segment timeline are drawn on
    top. Does nothing when no segment frame is available.

    Changes from the earlier version:
    - The column count is rounded up, so a ``segment_count`` that is not a
      multiple of the row count still gets a cell for every segment.
    - The canvas size is kept in its own variables; it used to be
      overwritten by the per-segment size inside the loop.
    - A resized tile is cropped to its destination slice so the assignment
      cannot fail on a shape mismatch.
    """
    if not self.segment_frames:
        return
    ref_frame = next((f for f in self.segment_frames if f is not None), None)
    if ref_frame is None:
        return

    # Near-square grid: rows = floor(sqrt(n)), cols = ceil(n / rows).
    grid_rows = max(1, int(self.segment_count ** 0.5))
    grid_cols = max(1, -(-self.segment_count // grid_rows))

    canvas_height, canvas_width = ref_frame.shape[:2]
    cell_width = canvas_width // grid_cols
    cell_height = canvas_height // grid_rows

    combined_frame = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

    for i, segment_frame in enumerate(self.segment_frames):
        if segment_frame is None:
            continue
        row, col = divmod(i, grid_cols)

        # Scale the segment to fit its cell while keeping aspect ratio.
        seg_height, seg_width = segment_frame.shape[:2]
        seg_scale = min(cell_width / seg_width, cell_height / seg_height)
        new_seg_width = int(seg_width * seg_scale)
        new_seg_height = int(seg_height * seg_scale)
        resized_segment = cv2.resize(
            segment_frame, (new_seg_width, new_seg_height), interpolation=cv2.INTER_AREA
        )

        # Cell bounds in the combined frame.
        y_start = row * cell_height
        y_end = y_start + cell_height
        x_start = col * cell_width
        x_end = x_start + cell_width

        # Centre the tile inside the cell, never crossing the cell edge.
        y_place_start = y_start + (cell_height - new_seg_height) // 2
        x_place_start = x_start + (cell_width - new_seg_width) // 2
        y_place_end = min(y_place_start + new_seg_height, y_end)
        x_place_end = min(x_place_start + new_seg_width, x_end)
        combined_frame[y_place_start:y_place_end, x_place_start:x_place_end] = resized_segment[
            : y_place_end - y_place_start, : x_place_end - x_place_start
        ]

        # Segment label: white outline pass, then black text on top.
        segment_position = int((self.segment_positions[i] / max(1, self.total_frames)) * 100)
        label_text = f"Seg {i+1}: {segment_position}%"
        label_origin = (x_place_start + 5, y_place_start + 20)
        for colour, thickness in (((255, 255, 255), 2), ((0, 0, 0), 1)):
            cv2.putText(
                combined_frame, label_text, label_origin,
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, colour, thickness,
            )

        # Draw grid borders
        cv2.rectangle(combined_frame, (x_start, y_start), (x_end - 1, y_end - 1), (128, 128, 128), 1)

    # Overall status line near the bottom edge of the canvas.
    info_text = f"MULTI-SEGMENT | Speed: {self.playback_speed:.1f}x | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
    info_origin = (10, canvas_height - 20)
    for colour, thickness in (((255, 255, 255), 2), ((0, 0, 0), 1)):
        cv2.putText(
            combined_frame, info_text, info_origin,
            cv2.FONT_HERSHEY_SIMPLEX, 0.6, colour, thickness,
        )

    self.draw_multi_segment_timeline(combined_frame)
    self.display_with_aspect_ratio(combined_frame)
def draw_multi_segment_timeline(self, frame):
    """Draw a compact timeline with one numbered marker per segment start.

    Nothing is drawn for non-video media, when no segment positions are
    defined, or when the timeline is hidden.

    A marker is green when its segment currently has a frame to show and
    grey otherwise. This used to be derived from ``segment_caps``, which
    ``setup_segment_captures`` fills with None placeholders only, so every
    marker was always grey.

    Args:
        frame: Image array that is drawn on in place.
    """
    if (
        not self.is_video(self.media_files[self.current_index])
        or not self.segment_positions
        or not self.timeline_visible
    ):
        return

    height, width = frame.shape[:2]

    # Smaller than the single-video timeline; sits above the status text.
    timeline_height = 30
    timeline_y = height - timeline_height - 25
    timeline_margin = 20
    timeline_bar_height = 8

    cv2.rectangle(frame, (0, timeline_y), (width, timeline_y + timeline_height), (40, 40, 40), -1)

    bar_y = timeline_y + (timeline_height - timeline_bar_height) // 2
    bar_x_start = timeline_margin
    bar_x_end = width - timeline_margin
    bar_width = bar_x_end - bar_x_start

    # Bar fill, then bar outline.
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (80, 80, 80), -1)
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_y + timeline_bar_height), (200, 200, 200), 1)

    if self.total_frames <= 0:
        return

    marker_y = bar_y + timeline_bar_height // 2
    for i, segment_pos in enumerate(self.segment_positions):
        progress = segment_pos / max(1, self.total_frames - 1)
        marker_x = bar_x_start + int(bar_width * progress)

        is_active = i < len(self.segment_frames) and self.segment_frames[i] is not None
        color = (0, 255, 100) if is_active else (100, 100, 100)
        cv2.circle(frame, (marker_x, marker_y), 4, color, -1)
        cv2.circle(frame, (marker_x, marker_y), 4, (255, 255, 255), 1)

        # Segment number just above the marker.
        cv2.putText(frame, str(i + 1), (marker_x - 3, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)
def draw_timeline(self, frame):
    """Draw the single-video timeline along the bottom of *frame*.

    Only drawn for videos, outside multi-segment mode, and while the
    timeline is visible. The bar rectangle is stored in
    ``timeline_rect`` as ``(x, y, width, height)`` so ``mouse_callback``
    can hit-test against it.

    ``current_frame`` is taken from the capture position after a read and
    can therefore reach ``total_frames``; the progress ratio is clamped to
    [0, 1] so the fill and the handle never extend past the bar.

    Args:
        frame: Image array that is drawn on in place.
    """
    if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode or not self.timeline_visible:
        return

    height, width = frame.shape[:2]

    # Dark strip that backs the whole timeline area.
    timeline_y = height - self.TIMELINE_HEIGHT
    cv2.rectangle(frame, (0, timeline_y), (width, height), (40, 40, 40), -1)

    bar_y = timeline_y + (self.TIMELINE_HEIGHT - self.TIMELINE_BAR_HEIGHT) // 2
    bar_x_start = self.TIMELINE_MARGIN
    bar_x_end = width - self.TIMELINE_MARGIN
    bar_width = bar_x_end - bar_x_start
    bar_bottom = bar_y + self.TIMELINE_BAR_HEIGHT
    self.timeline_rect = (bar_x_start, bar_y, bar_width, self.TIMELINE_BAR_HEIGHT)

    # Bar fill, then bar outline.
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BG, -1)
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BORDER, 1)

    if self.total_frames <= 0:
        return

    progress = self.current_frame / max(1, self.total_frames - 1)
    progress = min(1.0, max(0.0, progress))
    progress_width = int(bar_width * progress)
    if progress_width > 0:
        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_bottom), self.TIMELINE_COLOR_PROGRESS, -1)

    # Handle: filled disc with a border ring.
    handle_center = (bar_x_start + progress_width, bar_y + self.TIMELINE_BAR_HEIGHT // 2)
    handle_radius = self.TIMELINE_HANDLE_SIZE // 2
    cv2.circle(frame, handle_center, handle_radius, self.TIMELINE_COLOR_HANDLE, -1)
    cv2.circle(frame, handle_center, handle_radius, self.TIMELINE_COLOR_BORDER, 2)
def mouse_callback(self, event, x, y, _, __):
    """Handle mouse events so the timeline can be clicked and dragged.

    A left-button press on the bar starts a drag and seeks; moving while
    dragging keeps seeking. A left-button release always ends the drag,
    wherever the pointer is. Previously the release was only honoured
    over the timeline band, so letting go elsewhere left the drag state
    stuck on and later pointer movement over the bar kept seeking.

    Args:
        event: OpenCV mouse event code.
        x, y: Pointer position in image coordinates.
        _, __: Remaining OpenCV callback arguments; unused.
    """
    if not self.timeline_rect:
        # No timeline has been drawn yet, so no drag can be in progress.
        return

    if event == cv2.EVENT_LBUTTONUP:
        self.mouse_dragging = False
        return

    if not self.is_video(self.media_files[self.current_index]) or self.multi_segment_mode:
        return

    bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
    bar_x_end = bar_x_start + bar_width

    # 10 extra pixels below the bar make it easier to hit.
    over_band = bar_y <= y <= bar_y + bar_height + 10
    over_bar = bar_x_start <= x <= bar_x_end
    if not (over_band and over_bar):
        return

    if event == cv2.EVENT_LBUTTONDOWN:
        self.mouse_dragging = True
        self.seek_to_position(x, bar_x_start, bar_width)
    elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
        self.seek_to_position(x, bar_x_start, bar_width)
def seek_to_position(self, mouse_x, bar_x_start, bar_width):
    """Seek the video to the frame matching a pointer x on the timeline.

    Args:
        mouse_x: Pointer x coordinate.
        bar_x_start: Left edge of the timeline bar.
        bar_width: Width of the timeline bar.

    The pointer offset is turned into a 0..1 ratio (clamped), mapped onto
    ``0 .. total_frames - 1``, and the frame there is loaded. Does nothing
    without an open capture or for non-video media.
    """
    capture = self.current_cap
    if not capture or not self.is_video(self.media_files[self.current_index]):
        return

    fraction = (mouse_x - bar_x_start) / bar_width
    fraction = max(0, min(1, fraction))

    last_index = self.total_frames - 1
    target_frame = max(0, min(int(fraction * last_index), last_index))

    capture.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
    self.load_current_frame()
def advance_frame(self):
"""Advance to next frame(s) based on playback speed"""
# Return values, as written below: None (bare return) when the current file
# is not a video or playback is paused; True after advancing; False when a
# read from the capture fails.
if (
not self.is_video(self.media_files[self.current_index])
or not self.is_playing
):
return
# Multi-segment mode advances the preloaded segment cursors instead of
# reading from the capture.
if self.multi_segment_mode:
self.update_segment_frames()
return True
else:
# Read frames_to_skip + 1 frames; only the last successfully read frame
# ends up in current_display_frame.
frames_to_skip = self.calculate_frames_to_skip()
for _ in range(frames_to_skip + 1):
ret, frame = self.current_cap.read()
if not ret:
# A failed read well before the reported end (more than 5 frames short)
# is treated as a wrong frame count: total_frames is corrected to the
# position the capture actually reached.
actual_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
if actual_frame < self.total_frames - 5:
print(f"Frame count mismatch! Reported: {self.total_frames}, Actual: {actual_frame}")
self.total_frames = actual_frame
return False
# NOTE(review): the paste lost its indentation, so it cannot be told from
# here whether the three statements below run once per decoded frame or once
# after the loop — confirm against the original file.
self.current_display_frame = frame
# current_frame is taken from the capture's reported position after reading.
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
self.update_watch_tracking()
return True
def seek_video(self, frames_delta: int):
    """Move the playback position by *frames_delta* frames and load that frame.

    The target is clamped to ``0 .. total_frames - 1``. Does nothing for
    non-video media, in multi-segment mode, or without a capture.
    """
    if not self.is_video(self.media_files[self.current_index]):
        return
    if self.multi_segment_mode or not self.current_cap:
        return

    last_index = self.total_frames - 1
    target_frame = max(0, min(self.current_frame + frames_delta, last_index))

    self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
    self.load_current_frame()
def process_seek_key(self, key: int) -> bool:
    """Handle a key code that may be a seek key; return True if a seek ran.

    Keys:
        ``,`` / ``.``      one fine step back / forward, never rate-limited.
        ``a`` / ``d``      coarse step back / forward.
        ``A`` / ``D``      same, times SHIFT_SEEK_MULTIPLIER.
        codes 1 / 4        (Ctrl+A / Ctrl+D) same, times CTRL_SEEK_MULTIPLIER.

    The first press of an A/D-family key seeks immediately. While the same
    key keeps arriving, further seeks happen at most once per
    KEY_REPEAT_RATE_SEC, and switch to the fast step once the key has been
    held longer than FAST_SEEK_ACTIVATION_TIME. Any key that is not a seek
    key clears the held-key state and returns False.
    """
    now = time.time()

    fine_steps = {
        ord(","): -self.fine_seek_frames,
        ord("."): self.fine_seek_frames,
    }
    # key code -> (direction, multiplier)
    coarse_keys = {
        ord("a"): (-1, 1),
        ord("A"): (-1, self.SHIFT_SEEK_MULTIPLIER),
        ord("d"): (1, 1),
        ord("D"): (1, self.SHIFT_SEEK_MULTIPLIER),
        1: (-1, self.CTRL_SEEK_MULTIPLIER),
        4: (1, self.CTRL_SEEK_MULTIPLIER),
    }

    if key in fine_steps:
        step = fine_steps[key]
        if step == 0:
            return False
        self.seek_video(step)
        return True

    if key not in coarse_keys:
        # Some other key: forget any held seek key.
        if self.current_seek_key is not None:
            self.current_seek_key = None
            self.is_seeking = False
        return False

    direction, multiplier = coarse_keys[key]

    if self.current_seek_key != key:
        # Fresh press: seek right away and start the hold timers.
        self.current_seek_key = key
        self.key_first_press_time = now
        self.last_seek_time = now
        self.is_seeking = True
        self.seek_video(direction * self.coarse_seek_frames * multiplier)
        return True

    if not self.is_seeking:
        return False

    since_last_seek = now - self.last_seek_time
    held_for = now - self.key_first_press_time
    if since_last_seek < self.KEY_REPEAT_RATE_SEC:
        return False

    self.last_seek_time = now
    if held_for > self.FAST_SEEK_ACTIVATION_TIME:
        base_step = self.fast_seek_frames
    else:
        base_step = self.coarse_seek_frames
    self.seek_video(direction * base_step * multiplier)
    return True
def grade_media(self, grade: int):
    """Move the current file into the folder for *grade* (1-5).

    The grade folder is created under the base directory when missing. If
    a file of the same name already exists there, ``_1``, ``_2``, ... is
    appended to the stem until the name is free. The move is pushed onto
    ``undo_history`` first and popped again if moving fails. Open captures
    are released before the move so the file is not locked.

    Returns:
        None when there are no files or *grade* is out of range; False
        when the move emptied the file list; True otherwise (including
        when the move failed and was reported).
    """
    if not self.media_files or not 1 <= grade <= 5:
        return

    source = self.media_files[self.current_index]
    grade_dir = self.directory / str(grade)
    grade_dir.mkdir(exist_ok=True)

    # Pick a destination name that does not collide with an existing file.
    destination = grade_dir / source.name
    suffix_number = 1
    while destination.exists():
        destination = grade_dir / f"{source.stem}_{suffix_number}{source.suffix}"
        suffix_number += 1

    # Record the move for undo BEFORE changing anything.
    self.undo_history.append((str(destination), str(source), self.current_index))

    if self.current_cap:
        self.current_cap.release()
        self.current_cap = None
    if self.multi_segment_mode:
        self.cleanup_segment_captures()

    try:
        shutil.move(str(source), str(destination))
        print(f"Moved {source.name} to grade {grade}")
        self.media_files.pop(self.current_index)
        # Wrap around when the last entry of the list was graded.
        if self.current_index >= len(self.media_files):
            self.current_index = 0
        if not self.media_files:
            print("No more media files to grade!")
            return False
    except Exception as e:
        print(f"Error moving file: {e}")
        # The move did not happen, so there is nothing to undo.
        self.undo_history.pop()
    return True
def undo_last_action(self):
"""Undo the last grading action by moving file back and restoring to media list"""
if not self.undo_history:
print("No actions to undo!")
return False
# Get the last action
moved_file_path, original_file_path, original_index = self.undo_history.pop()
# Release video capture to unlock any current file before moving
if self.current_cap:
self.current_cap.release()
self.current_cap = None
try:
# Move the file back to its original location
shutil.move(moved_file_path, original_file_path)
# Add the file back to the media list at its original position
original_file = Path(original_file_path)
# Insert the file back at the appropriate position
if original_index <= len(self.media_files):
self.media_files.insert(original_index, original_file)
else:
self.media_files.append(original_file)
# Navigate to the restored file
print("Navigating to: ", original_index)
self.current_index = original_index
print(f"Undone: Moved {original_file.name} back from grade folder")
return True
except Exception as e:
print(f"Error undoing action: {e}")
# If undo failed, put the action back in history
self.undo_history.append((moved_file_path, original_file_path, original_index))
return False
def run(self):
    """Run the interactive grading session.

    Discovers the media files, opens the "Media Grader" window and then
    loops over the files, displaying each one and dispatching keyboard
    input until the user quits (Q/ESC), every file has been graded, or the
    index runs past the end of the list.

    Captures, the thread pool and the OpenCV windows are cleaned up on every
    exit path, including the early returns and any exception.
    """
    self.media_files = self.find_media_files()
    if not self.media_files:
        print("No media files found in directory!")
        return

    print(f"Found {len(self.media_files)} media files")
    self._print_controls()

    grade_keys = tuple(ord(digit) for digit in "12345")
    # Keys whose handlers already chose the next index; every other way of
    # leaving the inner loop (i.e. "n") advances to the following file.
    keeps_index_keys = (ord("p"), ord("u")) + grade_keys

    try:
        cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
        cv2.setMouseCallback("Media Grader", self.mouse_callback)
        # Set initial window size to a reasonable default (will be resized on first frame)
        cv2.resizeWindow("Media Grader", 1280, 720)

        while self.media_files and self.current_index < len(self.media_files):
            current_file = self.media_files[self.current_index]
            if not self.load_media(current_file):
                print(f"Could not load {current_file}")
                self.current_index += 1
                continue

            # Setup multi-segment mode if enabled and this is a video
            if self.multi_segment_mode and self.is_video(current_file):
                self.setup_segment_captures()

            window_title = (
                f"Media Grader - {current_file.name} "
                f"({self.current_index + 1}/{len(self.media_files)})"
            )
            cv2.setWindowTitle("Media Grader", window_title)

            while True:
                self.display_current_frame()

                if not self.is_video(current_file):
                    delay = self.IMAGE_DISPLAY_DELAY_MS
                elif self.is_seeking:
                    delay = self.FRAME_RENDER_TIME_MS
                else:
                    delay = self.calculate_frame_delay()

                # waitKey returns -1 on timeout, which the mask turns into 255.
                key = cv2.waitKey(delay) & 0xFF

                if key == ord("q") or key == 27:
                    return
                elif key == ord(" "):
                    self.is_playing = not self.is_playing
                elif key == ord("s"):
                    self.playback_speed = max(
                        self.MIN_PLAYBACK_SPEED,
                        self.playback_speed - self.SPEED_INCREMENT,
                    )
                elif key == ord("w"):
                    self.playback_speed = min(
                        self.MAX_PLAYBACK_SPEED,
                        self.playback_speed + self.SPEED_INCREMENT,
                    )
                elif self.process_seek_key(key):
                    # A seek was performed; skip normal playback this tick.
                    continue
                elif key == ord("n"):
                    break
                elif key == ord("p"):
                    self.current_index = max(0, self.current_index - 1)
                    print("Navigating to: ", self.current_index)
                    break
                elif key == ord("u"):
                    if self.undo_last_action():
                        # File was restored, reload it
                        break
                elif key == ord("l"):
                    # Jump to largest unwatched region (works in both modes)
                    self.jump_to_unwatched_region()
                elif key == ord("j"):
                    if not self.multi_segment_mode:
                        self.bisect_backwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("k"):
                    if not self.multi_segment_mode:
                        self.bisect_forwards()
                    else:
                        print("Navigation keys (H/J/K/L) disabled in multi-segment mode")
                elif key == ord("h"):  # Toggle timeline visibility
                    self.toggle_timeline()
                elif key == ord("g"):
                    self.toggle_multi_segment_mode()
                elif key in grade_keys:
                    grade = int(chr(key))
                    if not self.grade_media(grade):
                        # Nothing left to grade: end the session.
                        return
                    break
                elif key == 255:
                    # No key this tick: keep a held seek key repeating.
                    if self.is_seeking and self.current_seek_key is not None:
                        self.process_seek_key(self.current_seek_key)

                if (
                    self.is_playing
                    and self.is_video(current_file)
                    and not self.is_seeking
                ):
                    if not self.advance_frame():
                        # Video reached the end, restart it instead of navigating
                        self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                        self.current_frame = 0
                        self.load_current_frame()

            if key not in keeps_index_keys:
                print("Navigating to (pu12345): ", self.current_index)
                self.current_index += 1
    finally:
        # Runs for normal completion, the early returns above and errors, so
        # captures, worker threads and windows are never left behind.
        if self.current_cap:
            self.current_cap.release()
        self.cleanup_segment_captures()
        # Cleanup thread pool
        self.thread_pool.shutdown(wait=True)
        cv2.destroyAllWindows()

    print("Grading session complete!")

@staticmethod
def _print_controls():
    """Print the keyboard controls banner shown at session start."""
    control_lines = (
        "Controls:",
        " Space: Pause/Play",
        " A/D: Seek backward/forward (hold for FAST seek)",
        " Shift+A/D: Seek backward/forward (5x multiplier)",
        " Ctrl+A/D: Seek backward/forward (10x multiplier)",
        " , / . : Frame-by-frame seek (fine control)",
        " W/S: Decrease/Increase playback speed",
        " G: Toggle multi-segment mode (videos only)",
        " 1-5: Grade and move file",
        " N: Next file",
        " P: Previous file",
        " U: Undo last grading action",
        " L: Sample video at key points (videos only)",
        " H: Toggle timeline visibility",
        " J: Bisect backwards from current position (videos only, disabled in multi-segment)",
        " K: Bisect forwards toward next sample (videos only, disabled in multi-segment)",
        " Q/ESC: Quit",
    )
    for line in control_lines:
        print(line)
def main():
    """Command-line entry point for the media grader.

    Parses the command line, checks that the target directory exists
    (exiting with status 1 otherwise) and runs a grading session on it.
    Ctrl+C ends the session with a message; any other exception is printed
    and the process exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="Media Grader - Grade media files by moving them to numbered folders"
    )

    # (flag, add_argument keyword options), registered in this order.
    argument_specs = (
        (
            "directory",
            {
                "nargs": "?",
                "default": ".",
                "help": "Directory to scan for media files (default: current directory)",
            },
        ),
        (
            "--seek-frames",
            {
                "type": int,
                "default": 30,
                "help": "Number of frames to seek when using arrow keys (default: 30)",
            },
        ),
        (
            "--snap-to-iframe",
            {
                "action": "store_true",
                "help": "Snap to I-frames when seeking backward for better performance",
            },
        ),
    )
    for flag, settings in argument_specs:
        cli.add_argument(flag, **settings)

    options = cli.parse_args()
    target_dir = options.directory

    if not os.path.isdir(target_dir):
        print(f"Error: {target_dir} is not a valid directory")
        sys.exit(1)

    session = MediaGrader(target_dir, options.seek_frames, options.snap_to_iframe)
    try:
        session.run()
    except KeyboardInterrupt:
        print("\nGrading session interrupted")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()