refactor(main.py): remove unused constants and simplify video seeking logic

This commit is contained in:
2025-08-18 16:50:10 +02:00
parent 85768f6323
commit 2840ed14bc

204
main.py
View File

@@ -6,33 +6,26 @@ import argparse
import shutil import shutil
import time import time
from pathlib import Path from pathlib import Path
from typing import List, Tuple, Optional from typing import List
class MediaGrader: class MediaGrader:
# Configuration constants # Configuration constants
DEFAULT_FPS = 30
BASE_FRAME_DELAY_MS = 33 # ~30 FPS BASE_FRAME_DELAY_MS = 33 # ~30 FPS
KEY_REPEAT_RATE_SEC = 0.5 # How often to process key repeats (10 times per second) KEY_REPEAT_RATE_SEC = 0.5 # How often to process key repeats
FAST_SEEK_ACTIVATION_TIME = 0.5 # How long to hold before fast seek FAST_SEEK_ACTIVATION_TIME = 0.5 # How long to hold before fast seek
FRAME_RENDER_TIME_MS = 50 # Time to let frames render between seeks FRAME_RENDER_TIME_MS = 50 # Time to let frames render between seeks
WINDOW_MAX_WIDTH = 1200
WINDOW_MAX_HEIGHT = 800
WINDOW_MAX_SCALE_UP = 2.0
SPEED_INCREMENT = 0.1 SPEED_INCREMENT = 0.1
MIN_PLAYBACK_SPEED = 0.1 MIN_PLAYBACK_SPEED = 0.1
MAX_PLAYBACK_SPEED = 100.0 MAX_PLAYBACK_SPEED = 100.0
FAST_SEEK_MULTIPLIER = 5 FAST_SEEK_MULTIPLIER = 5
IFRAME_SNAP_INTERVAL = 30
IMAGE_DISPLAY_DELAY_MS = 100 IMAGE_DISPLAY_DELAY_MS = 100
SEEK_DISPLAY_INTERVAL = 10 # Update display every N frames during seeking
def __init__( def __init__(
self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False self, directory: str, seek_frames: int = 30, snap_to_iframe: bool = False
): ):
self.directory = Path(directory) self.directory = Path(directory)
self.seek_frames = seek_frames self.seek_frames = seek_frames
self.snap_to_iframe = snap_to_iframe
self.current_index = 0 self.current_index = 0
self.playback_speed = 1.0 self.playback_speed = 1.0
self.media_files = [] self.media_files = []
@@ -54,18 +47,17 @@ class MediaGrader:
# Current frame cache for display # Current frame cache for display
self.current_display_frame = None self.current_display_frame = None
self.window_resized = False
# Supported media extensions # Supported media extensions
self.extensions = [ self.extensions = [
"*.png", ".png",
"*.jpg", ".jpg",
"*.jpeg", ".jpeg",
"*.gif", ".gif",
"*.mp4", ".mp4",
"*.avi", ".avi",
"*.mov", ".mov",
"*.mkv", ".mkv",
] ]
# Create grade directories # Create grade directories
@@ -77,7 +69,7 @@ class MediaGrader:
"""Find all media files recursively in the directory""" """Find all media files recursively in the directory"""
media_files = [] media_files = []
for ext in self.extensions: for ext in self.extensions:
pattern = str(self.directory / "**" / ext) pattern = str(self.directory / "**" / f"*{ext}")
files = glob.glob(pattern, recursive=True) files = glob.glob(pattern, recursive=True)
media_files.extend([Path(f) for f in files]) media_files.extend([Path(f) for f in files])
@@ -96,20 +88,19 @@ class MediaGrader:
def calculate_frame_delay(self) -> int: def calculate_frame_delay(self) -> int:
"""Calculate frame delay in milliseconds based on playback speed""" """Calculate frame delay in milliseconds based on playback speed"""
# Base delay for 30 FPS, adjusted by playback speed
delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed) delay_ms = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
return max(1, delay_ms) # Minimum 1ms delay return max(1, delay_ms)
def calculate_frames_to_skip(self) -> int: def calculate_frames_to_skip(self) -> int:
"""Calculate how many frames to skip for high-speed playback""" """Calculate how many frames to skip for high-speed playback"""
if self.playback_speed <= 1.0: if self.playback_speed <= 1.0:
return 0 return 0
elif self.playback_speed <= 2.0: elif self.playback_speed <= 2.0:
return 0 # No skipping for moderate speeds return 0
elif self.playback_speed <= 5.0: elif self.playback_speed <= 5.0:
return int(self.playback_speed - 1) # Skip some frames return int(self.playback_speed - 1)
else: else:
return int(self.playback_speed * 2) # Skip many frames for very high speeds return int(self.playback_speed * 2)
def load_media(self, file_path: Path) -> bool: def load_media(self, file_path: Path) -> bool:
"""Load media file for display""" """Load media file for display"""
@@ -123,14 +114,12 @@ class MediaGrader:
self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.total_frames = int(self.current_cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.current_frame = 0 self.current_frame = 0
else: else:
# For images, we'll just display them
self.current_cap = None self.current_cap = None
self.total_frames = 1 self.total_frames = 1
self.current_frame = 0 self.current_frame = 0
# Load initial frame # Load initial frame
self.load_current_frame() self.load_current_frame()
self.window_resized = False
return True return True
def load_current_frame(self): def load_current_frame(self):
@@ -139,7 +128,6 @@ class MediaGrader:
if not self.current_cap: if not self.current_cap:
return False return False
# Read frame at current position
ret, frame = self.current_cap.read() ret, frame = self.current_cap.read()
if ret: if ret:
self.current_display_frame = frame self.current_display_frame = frame
@@ -147,7 +135,6 @@ class MediaGrader:
return True return True
return False return False
else: else:
# Load image
frame = cv2.imread(str(self.media_files[self.current_index])) frame = cv2.imread(str(self.media_files[self.current_index]))
if frame is not None: if frame is not None:
self.current_display_frame = frame self.current_display_frame = frame
@@ -161,17 +148,10 @@ class MediaGrader:
frame = self.current_display_frame.copy() frame = self.current_display_frame.copy()
# Auto-resize window on first frame
if not self.window_resized:
self.auto_resize_window(frame)
self.window_resized = True
# Add info overlay # Add info overlay
current_file = self.media_files[self.current_index] current_file = self.media_files[self.current_index]
info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}" info_text = f"Speed: {self.playback_speed:.1f}x | Frame: {self.current_frame}/{self.total_frames} | File: {self.current_index + 1}/{len(self.media_files)} | {'Playing' if self.is_playing else 'PAUSED'}"
help_text = "Seek: A/D (hold=FAST) ,. (fine) | W/S speed | 1-5 grade | Space pause | Q quit"
# White background for text visibility
cv2.putText( cv2.putText(
frame, frame,
info_text, info_text,
@@ -182,31 +162,7 @@ class MediaGrader:
2, 2,
) )
cv2.putText( cv2.putText(
frame, frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 1
info_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 0, 0),
1,
)
cv2.putText(
frame,
help_text,
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(255, 255, 255),
2,
)
cv2.putText(
frame,
help_text,
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 0),
1,
) )
cv2.imshow("Media Grader", frame) cv2.imshow("Media Grader", frame)
@@ -219,10 +175,9 @@ class MediaGrader:
): ):
return return
# Skip frames for high-speed playback
frames_to_skip = self.calculate_frames_to_skip() frames_to_skip = self.calculate_frames_to_skip()
for _ in range(frames_to_skip + 1): # +1 to advance at least one frame for _ in range(frames_to_skip + 1):
ret, frame = self.current_cap.read() ret, frame = self.current_cap.read()
if not ret: if not ret:
return False return False
@@ -231,67 +186,6 @@ class MediaGrader:
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)) self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
return True return True
def auto_resize_window(self, frame):
"""Auto-resize window to fit media while respecting screen limits"""
height, width = frame.shape[:2]
# Calculate scaling factor to fit within max dimensions
scale_w = (
self.WINDOW_MAX_WIDTH / width if width > self.WINDOW_MAX_WIDTH else 1.0
)
scale_h = (
self.WINDOW_MAX_HEIGHT / height if height > self.WINDOW_MAX_HEIGHT else 1.0
)
scale = min(scale_w, scale_h)
# Don't scale up small images too much
if scale > self.WINDOW_MAX_SCALE_UP:
scale = self.WINDOW_MAX_SCALE_UP
new_width = int(width * scale)
new_height = int(height * scale)
cv2.resizeWindow("Media Grader", new_width, new_height)
def seek_to_iframe(self, target_frame):
"""Seek to the nearest I-frame at or before target_frame"""
if not self.current_cap:
return False
# For more reliable seeking, always snap to I-frames
iframe_frame = (
target_frame // self.IFRAME_SNAP_INTERVAL
) * self.IFRAME_SNAP_INTERVAL
iframe_frame = max(0, min(iframe_frame, self.total_frames - 1))
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, iframe_frame)
# If we need to get closer to target, read frames sequentially
current_pos = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
frames_to_read = target_frame - current_pos
if frames_to_read > 0 and frames_to_read < 60: # Only if it's reasonable
for i in range(frames_to_read):
ret, frame = self.current_cap.read()
if not ret:
break
# Update display every few frames during seeking
if i % self.SEEK_DISPLAY_INTERVAL == 0:
self.current_display_frame = frame
self.current_frame = int(
self.current_cap.get(cv2.CAP_PROP_POS_FRAMES)
)
self.display_current_frame()
cv2.waitKey(1) # Process display events
else:
# For large seeks, just go to the I-frame
ret, frame = self.current_cap.read()
if ret:
self.current_display_frame = frame
self.current_frame = int(self.current_cap.get(cv2.CAP_PROP_POS_FRAMES))
return True
def seek_video(self, frames_delta: int): def seek_video(self, frames_delta: int):
"""Seek video by specified number of frames""" """Seek video by specified number of frames"""
if not self.current_cap or not self.is_video( if not self.current_cap or not self.is_video(
@@ -303,38 +197,28 @@ class MediaGrader:
0, min(self.current_frame + frames_delta, self.total_frames - 1) 0, min(self.current_frame + frames_delta, self.total_frames - 1)
) )
print(
f"Seeking from {self.current_frame} to {target_frame} (delta: {frames_delta})"
)
# Use simple direct seeking - let OpenCV handle it
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
self.load_current_frame() self.load_current_frame()
print(f"Seeked to frame {self.current_frame}")
def process_seek_key(self, key: int) -> bool: def process_seek_key(self, key: int) -> bool:
"""Process seeking keys with proper rate limiting""" """Process seeking keys with proper rate limiting"""
current_time = time.time() current_time = time.time()
# Check if this is a seek key
seek_direction = 0 seek_direction = 0
seek_amount = 0 seek_amount = 0
if key == ord("a"): # Seek backward if key == ord("a"):
seek_direction = -1 seek_direction = -1
elif key == ord("d"): # Seek forward elif key == ord("d"):
seek_direction = 1 seek_direction = 1
elif key == ord(","): # Fine seek backward elif key == ord(","):
seek_amount = -self.fine_seek_frames seek_amount = -self.fine_seek_frames
elif key == ord("."): # Fine seek forward elif key == ord("."):
seek_amount = self.fine_seek_frames seek_amount = self.fine_seek_frames
else: else:
# Not a seek key, reset seeking state
if self.current_seek_key is not None: if self.current_seek_key is not None:
self.current_seek_key = None self.current_seek_key = None
self.is_seeking = False self.is_seeking = False
print("Seek key released")
return False return False
# Handle fine seeking (comma/period) - always immediate # Handle fine seeking (comma/period) - always immediate
@@ -344,33 +228,25 @@ class MediaGrader:
# Handle arrow key seeking with rate limiting # Handle arrow key seeking with rate limiting
if seek_direction != 0: if seek_direction != 0:
# Check if we should process this key press
if self.current_seek_key != key: if self.current_seek_key != key:
# New key press
self.current_seek_key = key self.current_seek_key = key
self.key_first_press_time = current_time self.key_first_press_time = current_time
self.last_seek_time = current_time self.last_seek_time = current_time
self.is_seeking = True self.is_seeking = True
# Immediate first seek
seek_amount = seek_direction * self.coarse_seek_frames seek_amount = seek_direction * self.coarse_seek_frames
self.seek_video(seek_amount) self.seek_video(seek_amount)
print(f"Started seeking {seek_direction}")
return True return True
elif self.is_seeking: elif self.is_seeking:
# Continuing to hold the same key
time_since_last_seek = current_time - self.last_seek_time time_since_last_seek = current_time - self.last_seek_time
time_held = current_time - self.key_first_press_time time_held = current_time - self.key_first_press_time
# Only seek if enough time has passed (rate limiting)
if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC: if time_since_last_seek >= self.KEY_REPEAT_RATE_SEC:
self.last_seek_time = current_time self.last_seek_time = current_time
# Determine seek amount based on how long key has been held
if time_held > self.FAST_SEEK_ACTIVATION_TIME: if time_held > self.FAST_SEEK_ACTIVATION_TIME:
seek_amount = seek_direction * self.fast_seek_frames seek_amount = seek_direction * self.fast_seek_frames
print(f"FAST SEEK: {seek_amount} frames")
else: else:
seek_amount = seek_direction * self.coarse_seek_frames seek_amount = seek_direction * self.coarse_seek_frames
@@ -388,7 +264,6 @@ class MediaGrader:
grade_dir = self.directory / str(grade) grade_dir = self.directory / str(grade)
destination = grade_dir / current_file.name destination = grade_dir / current_file.name
# Handle name conflicts
counter = 1 counter = 1
while destination.exists(): while destination.exists():
stem = current_file.stem stem = current_file.stem
@@ -400,10 +275,8 @@ class MediaGrader:
shutil.move(str(current_file), str(destination)) shutil.move(str(current_file), str(destination))
print(f"Moved {current_file.name} to grade {grade}") print(f"Moved {current_file.name} to grade {grade}")
# Remove from current list
self.media_files.pop(self.current_index) self.media_files.pop(self.current_index)
# Adjust current index
if self.current_index >= len(self.media_files): if self.current_index >= len(self.media_files):
self.current_index = 0 self.current_index = 0
@@ -434,7 +307,6 @@ class MediaGrader:
print(" N: Next file") print(" N: Next file")
print(" P: Previous file") print(" P: Previous file")
print(" Q/ESC: Quit") print(" Q/ESC: Quit")
print(f" Seek repeat rate: {1/self.KEY_REPEAT_RATE_SEC:.1f} seeks/second")
cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL) cv2.namedWindow("Media Grader", cv2.WINDOW_NORMAL)
@@ -450,13 +322,10 @@ class MediaGrader:
cv2.setWindowTitle("Media Grader", window_title) cv2.setWindowTitle("Media Grader", window_title)
while True: while True:
# Always display the current cached frame
self.display_current_frame() self.display_current_frame()
# Calculate appropriate delay
if self.is_video(current_file): if self.is_video(current_file):
if self.is_seeking: if self.is_seeking:
# Shorter delay when seeking to be more responsive
delay = self.FRAME_RENDER_TIME_MS delay = self.FRAME_RENDER_TIME_MS
else: else:
delay = self.calculate_frame_delay() delay = self.calculate_frame_delay()
@@ -465,52 +334,45 @@ class MediaGrader:
key = cv2.waitKey(delay) & 0xFF key = cv2.waitKey(delay) & 0xFF
# Debug: print key codes to help with arrow key detection if key == ord("q") or key == 27:
if key != 255: # 255 means no key pressed
print(f"Key pressed: {key}")
if key == ord("q") or key == 27: # Q or ESC
return return
elif key == ord(" "): # Space - pause/play elif key == ord(" "):
self.is_playing = not self.is_playing self.is_playing = not self.is_playing
print(f"{'Playing' if self.is_playing else 'Paused'}")
elif key == ord("w"): elif key == ord("w"):
self.playback_speed = max( self.playback_speed = max(
self.MIN_PLAYBACK_SPEED, self.MIN_PLAYBACK_SPEED,
self.playback_speed - self.SPEED_INCREMENT, self.playback_speed - self.SPEED_INCREMENT,
) )
print(f"Speed: {self.playback_speed:.1f}x")
elif key == ord("s"): elif key == ord("s"):
self.playback_speed = min( self.playback_speed = min(
self.MAX_PLAYBACK_SPEED, self.MAX_PLAYBACK_SPEED,
self.playback_speed + self.SPEED_INCREMENT, self.playback_speed + self.SPEED_INCREMENT,
) )
print(f"Speed: {self.playback_speed:.1f}x")
elif self.process_seek_key(key): elif self.process_seek_key(key):
# Seeking was handled
pass pass
elif key == ord("n"): # Next file elif key == ord("n"):
break break
elif key == ord("p"): # Previous file elif key == ord("p"):
self.current_index = max(0, self.current_index - 1) self.current_index = max(0, self.current_index - 1)
break break
elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]: # Grade elif key in [ord("1"), ord("2"), ord("3"), ord("4"), ord("5")]:
grade = int(chr(key)) grade = int(chr(key))
if not self.grade_media(grade): if not self.grade_media(grade):
return return
break break
elif key == 255: # No key pressed elif key == 255:
# Continue seeking if we're in seeking mode
if self.is_seeking and self.current_seek_key is not None: if self.is_seeking and self.current_seek_key is not None:
self.process_seek_key(self.current_seek_key) self.process_seek_key(self.current_seek_key)
# Advance frame only if playing (and it's a video) and not seeking if (
if self.is_playing and self.is_video(current_file) and not self.is_seeking: self.is_playing
and self.is_video(current_file)
and not self.is_seeking
):
if not self.advance_frame(): if not self.advance_frame():
# End of video
break break
if key not in [ord("p")]: # Don't increment for previous if key not in [ord("p")]:
self.current_index += 1 self.current_index += 1
if self.current_cap: if self.current_cap: