Refactor main.py to streamline imports and remove unused classes

This commit simplifies the main.py file by removing the Cv2BufferedCap, MotionTracker, and ProjectView classes, which have been relocated to their respective modules. The import statements have been updated accordingly to enhance clarity and maintainability of the codebase.
This commit is contained in:
2025-09-16 17:06:30 +02:00
parent cb4fd02a42
commit 762bc2e5e0

View File

@@ -11,612 +11,10 @@ import json
import threading
import queue
import subprocess
import ctypes
class Cv2BufferedCap:
"""Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""
def __init__(self, video_path, backend=None):
self.video_path = video_path
self.cap = cv2.VideoCapture(str(video_path), backend)
if not self.cap.isOpened():
raise ValueError(f"Could not open video: {video_path}")
# Video properties
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Frame cache
self.frame_cache = {}
self.cache_access_order = []
self.MAX_CACHE_FRAMES = 3000
# Current position tracking
self.current_frame = 0
def _manage_cache(self):
"""Manage cache size using LRU eviction"""
while len(self.frame_cache) > self.MAX_CACHE_FRAMES:
oldest_frame = self.cache_access_order.pop(0)
if oldest_frame in self.frame_cache:
del self.frame_cache[oldest_frame]
def _add_to_cache(self, frame_number, frame):
"""Add frame to cache"""
self.frame_cache[frame_number] = frame.copy()
if frame_number in self.cache_access_order:
self.cache_access_order.remove(frame_number)
self.cache_access_order.append(frame_number)
self._manage_cache()
def _get_from_cache(self, frame_number):
"""Get frame from cache and update LRU"""
if frame_number in self.frame_cache:
if frame_number in self.cache_access_order:
self.cache_access_order.remove(frame_number)
self.cache_access_order.append(frame_number)
return self.frame_cache[frame_number].copy()
return None
def get_frame(self, frame_number):
"""Get frame at specific index - always accurate"""
# Clamp frame number to valid range
frame_number = max(0, min(frame_number, self.total_frames - 1))
# Check cache first
cached_frame = self._get_from_cache(frame_number)
if cached_frame is not None:
self.current_frame = frame_number
return cached_frame
# Not in cache, seek to frame and read
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
self._add_to_cache(frame_number, frame)
self.current_frame = frame_number
return frame
else:
raise ValueError(f"Failed to read frame {frame_number}")
def advance_frame(self, frames=1):
"""Advance by specified number of frames"""
new_frame = self.current_frame + frames
return self.get_frame(new_frame)
def release(self):
"""Release the video capture"""
if self.cap:
self.cap.release()
def isOpened(self):
"""Check if capture is opened"""
return self.cap and self.cap.isOpened()
class MotionTracker:
"""Handles motion tracking for crop and pan operations"""
def __init__(self):
self.tracking_points = {} # {frame_number: [(x, y), ...]}
self.tracking_enabled = False
self.base_crop_rect = None # Original crop rect when tracking started
self.base_zoom_center = None # Original zoom center when tracking started
def add_tracking_point(self, frame_number: int, x: int, y: int):
"""Add a tracking point at the specified frame and coordinates"""
if frame_number not in self.tracking_points:
self.tracking_points[frame_number] = []
self.tracking_points[frame_number].append((x, y))
def remove_tracking_point(self, frame_number: int, point_index: int):
"""Remove a tracking point by frame and index"""
if frame_number in self.tracking_points and 0 <= point_index < len(self.tracking_points[frame_number]):
del self.tracking_points[frame_number][point_index]
if not self.tracking_points[frame_number]:
del self.tracking_points[frame_number]
def clear_tracking_points(self):
"""Clear all tracking points"""
self.tracking_points.clear()
def get_tracking_points_for_frame(self, frame_number: int) -> List[Tuple[int, int]]:
"""Get all tracking points for a specific frame"""
return self.tracking_points.get(frame_number, [])
def has_tracking_points(self) -> bool:
"""Check if any tracking points exist"""
return bool(self.tracking_points)
def get_interpolated_position(self, frame_number: int) -> Optional[Tuple[float, float]]:
"""Get interpolated position for a frame based on tracking points"""
if not self.tracking_points:
return None
# Get all frames with tracking points
frames = sorted(self.tracking_points.keys())
if not frames:
return None
# If we have a point at this exact frame, return it
if frame_number in self.tracking_points:
points = self.tracking_points[frame_number]
if points:
# Return average of all points at this frame
avg_x = sum(p[0] for p in points) / len(points)
avg_y = sum(p[1] for p in points) / len(points)
return (avg_x, avg_y)
# If frame is before first tracking point
if frame_number < frames[0]:
points = self.tracking_points[frames[0]]
if points:
avg_x = sum(p[0] for p in points) / len(points)
avg_y = sum(p[1] for p in points) / len(points)
return (avg_x, avg_y)
# If frame is after last tracking point
if frame_number > frames[-1]:
points = self.tracking_points[frames[-1]]
if points:
avg_x = sum(p[0] for p in points) / len(points)
avg_y = sum(p[1] for p in points) / len(points)
return (avg_x, avg_y)
# Find the two frames to interpolate between
for i in range(len(frames) - 1):
if frames[i] <= frame_number <= frames[i + 1]:
frame1, frame2 = frames[i], frames[i + 1]
points1 = self.tracking_points[frame1]
points2 = self.tracking_points[frame2]
if not points1 or not points2:
continue
# Get average positions for each frame
avg_x1 = sum(p[0] for p in points1) / len(points1)
avg_y1 = sum(p[1] for p in points1) / len(points1)
avg_x2 = sum(p[0] for p in points2) / len(points2)
avg_y2 = sum(p[1] for p in points2) / len(points2)
# Linear interpolation
t = (frame_number - frame1) / (frame2 - frame1)
interp_x = avg_x1 + t * (avg_x2 - avg_x1)
interp_y = avg_y1 + t * (avg_y2 - avg_y1)
return (interp_x, interp_y)
return None
def get_tracking_offset(self, frame_number: int) -> Tuple[float, float]:
"""Get the offset to center the crop on the tracked point"""
if not self.tracking_enabled or not self.base_zoom_center:
return (0.0, 0.0)
current_pos = self.get_interpolated_position(frame_number)
if not current_pos:
return (0.0, 0.0)
# Calculate offset to center the crop on the tracked point
# The offset should move the display so the tracked point stays centered
offset_x = current_pos[0] - self.base_zoom_center[0]
offset_y = current_pos[1] - self.base_zoom_center[1]
return (offset_x, offset_y)
def start_tracking(self, base_crop_rect: Tuple[int, int, int, int], base_zoom_center: Tuple[int, int]):
"""Start motion tracking with base positions"""
self.tracking_enabled = True
self.base_crop_rect = base_crop_rect
self.base_zoom_center = base_zoom_center
def stop_tracking(self):
"""Stop motion tracking"""
self.tracking_enabled = False
self.base_crop_rect = None
self.base_zoom_center = None
def to_dict(self) -> Dict:
"""Convert to dictionary for serialization"""
return {
'tracking_points': self.tracking_points,
'tracking_enabled': self.tracking_enabled,
'base_crop_rect': self.base_crop_rect,
'base_zoom_center': self.base_zoom_center
}
def from_dict(self, data: Dict):
"""Load from dictionary for deserialization"""
# Convert string keys back to integers for tracking_points
tracking_points_data = data.get('tracking_points', {})
self.tracking_points = {}
for frame_str, points in tracking_points_data.items():
frame_num = int(frame_str) # Convert string key to integer
self.tracking_points[frame_num] = points
self.tracking_enabled = data.get('tracking_enabled', False)
self.base_crop_rect = data.get('base_crop_rect', None)
self.base_zoom_center = data.get('base_zoom_center', None)
def get_active_window_title():
"""Get the title of the currently active window"""
try:
# Get handle to foreground window
hwnd = ctypes.windll.user32.GetForegroundWindow()
# Get window title length
length = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
# Create buffer and get window title
buffer = ctypes.create_unicode_buffer(length + 1)
ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1)
return buffer.value
except:
return ""
class ProjectView:
"""Project view that displays videos in current directory with progress bars"""
# Project view configuration
THUMBNAIL_SIZE = (200, 150) # Width, Height
THUMBNAIL_MARGIN = 20
PROGRESS_BAR_HEIGHT = 8
TEXT_HEIGHT = 30
# Colors
BG_COLOR = (40, 40, 40)
THUMBNAIL_BG_COLOR = (60, 60, 60)
PROGRESS_BG_COLOR = (80, 80, 80)
PROGRESS_FILL_COLOR = (0, 120, 255)
TEXT_COLOR = (255, 255, 255)
SELECTED_COLOR = (255, 165, 0)
def __init__(self, directory: Path, video_editor):
self.directory = directory
self.video_editor = video_editor
self.video_files = []
self.thumbnails = {}
self.progress_data = {}
self.selected_index = 0
self.scroll_offset = 0
self.items_per_row = 2 # Default to 2 items per row
self.window_width = 1200
self.window_height = 800
self._load_video_files()
self._load_progress_data()
def _calculate_thumbnail_size(self, window_width: int) -> tuple:
"""Calculate thumbnail size based on items per row and window width"""
available_width = window_width - self.THUMBNAIL_MARGIN
item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row
thumbnail_width = max(50, item_width) # Minimum 50px width
thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0]) # Maintain aspect ratio
return (thumbnail_width, thumbnail_height)
def _load_video_files(self):
"""Load all video files from directory"""
self.video_files = []
for file_path in self.directory.iterdir():
if (file_path.is_file() and
file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS):
self.video_files.append(file_path)
self.video_files.sort(key=lambda x: x.name)
def _load_progress_data(self):
"""Load progress data from JSON state files"""
self.progress_data = {}
for video_path in self.video_files:
state_file = video_path.with_suffix('.json')
if state_file.exists():
try:
with open(state_file, 'r') as f:
state = json.load(f)
current_frame = state.get('current_frame', 0)
# Get total frames from video
cap = cv2.VideoCapture(str(video_path))
if cap.isOpened():
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if total_frames > 0:
progress = current_frame / (total_frames - 1)
self.progress_data[video_path] = {
'current_frame': current_frame,
'total_frames': total_frames,
'progress': progress
}
except Exception as e:
print(f"Error loading progress for {video_path.name}: {e}")
def refresh_progress_data(self):
"""Refresh progress data from JSON files (call when editor state changes)"""
self._load_progress_data()
def get_progress_for_video(self, video_path: Path) -> float:
"""Get progress (0.0 to 1.0) for a video"""
if video_path in self.progress_data:
return self.progress_data[video_path]['progress']
return 0.0
def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray:
"""Get thumbnail for a video, generating it if needed"""
if size is None:
size = self.THUMBNAIL_SIZE
# Cache the original thumbnail by video path only (not size)
if video_path in self.thumbnails:
original_thumbnail = self.thumbnails[video_path]
# Resize the cached thumbnail to the requested size
return cv2.resize(original_thumbnail, size)
# Generate original thumbnail on demand (only once per video)
try:
cap = cv2.VideoCapture(str(video_path))
if cap.isOpened():
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames > 0:
middle_frame = total_frames // 2
cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
ret, frame = cap.read()
if ret:
# Store original thumbnail at original size
original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE)
self.thumbnails[video_path] = original_thumbnail
cap.release()
# Return resized version
return cv2.resize(original_thumbnail, size)
cap.release()
except Exception as e:
print(f"Error generating thumbnail for {video_path.name}: {e}")
# Return a placeholder if thumbnail generation failed
placeholder = np.full((size[1], size[0], 3),
self.THUMBNAIL_BG_COLOR, dtype=np.uint8)
return placeholder
def draw(self) -> np.ndarray:
"""Draw the project view"""
# Get actual window size dynamically
try:
# Try to get the actual window size from OpenCV
window_rect = cv2.getWindowImageRect("Project View")
if window_rect[2] > 0 and window_rect[3] > 0: # width and height > 0
actual_width = window_rect[2]
actual_height = window_rect[3]
else:
# Fallback to default size
actual_width = self.window_width
actual_height = self.window_height
except:
# Fallback to default size
actual_width = self.window_width
actual_height = self.window_height
canvas = np.full((actual_height, actual_width, 3), self.BG_COLOR, dtype=np.uint8)
if not self.video_files:
# No videos message
text = "No videos found in directory"
font = cv2.FONT_HERSHEY_SIMPLEX
text_size = cv2.getTextSize(text, font, 1.0, 2)[0]
text_x = (actual_width - text_size[0]) // 2
text_y = (actual_height - text_size[1]) // 2
cv2.putText(canvas, text, (text_x, text_y), font, 1.0, self.TEXT_COLOR, 2)
return canvas
# Calculate layout - use fixed items_per_row and calculate thumbnail size to fit
items_per_row = min(self.items_per_row, len(self.video_files)) # Don't exceed number of videos
# Calculate thumbnail size to fit the desired number of items per row
thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(actual_width)
# Calculate item height dynamically based on thumbnail size
item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN
item_width = (actual_width - (items_per_row + 1) * self.THUMBNAIL_MARGIN) // items_per_row
# Draw videos in grid
for i, video_path in enumerate(self.video_files):
row = i // items_per_row
col = i % items_per_row
# Skip if scrolled out of view
if row < self.scroll_offset:
continue
if row > self.scroll_offset + (actual_height // item_height):
break
# Calculate position
x = self.THUMBNAIL_MARGIN + col * (item_width + self.THUMBNAIL_MARGIN)
y = self.THUMBNAIL_MARGIN + (row - self.scroll_offset) * item_height
# Draw thumbnail background
cv2.rectangle(canvas,
(x, y),
(x + thumbnail_width, y + thumbnail_height),
self.THUMBNAIL_BG_COLOR, -1)
# Draw selection highlight
if i == self.selected_index:
cv2.rectangle(canvas,
(x - 2, y - 2),
(x + thumbnail_width + 2, y + thumbnail_height + 2),
self.SELECTED_COLOR, 3)
# Draw thumbnail
thumbnail = self.get_thumbnail_for_video(video_path, (thumbnail_width, thumbnail_height))
# Thumbnail is already the correct size, no need to resize
resized_thumbnail = thumbnail
# Ensure thumbnail doesn't exceed canvas bounds
end_y = min(y + thumbnail_height, actual_height)
end_x = min(x + thumbnail_width, actual_width)
thumb_height = end_y - y
thumb_width = end_x - x
if thumb_height > 0 and thumb_width > 0:
# Resize thumbnail to fit within bounds if necessary
if thumb_height != thumbnail_height or thumb_width != thumbnail_width:
resized_thumbnail = cv2.resize(thumbnail, (thumb_width, thumb_height))
canvas[y:end_y, x:end_x] = resized_thumbnail
# Draw progress bar
progress_y = y + thumbnail_height + 5
progress_width = thumbnail_width
progress = self.get_progress_for_video(video_path)
# Progress background
cv2.rectangle(canvas,
(x, progress_y),
(x + progress_width, progress_y + self.PROGRESS_BAR_HEIGHT),
self.PROGRESS_BG_COLOR, -1)
# Progress fill
if progress > 0:
fill_width = int(progress_width * progress)
cv2.rectangle(canvas,
(x, progress_y),
(x + fill_width, progress_y + self.PROGRESS_BAR_HEIGHT),
self.PROGRESS_FILL_COLOR, -1)
# Draw filename
filename = video_path.name
# Truncate if too long
if len(filename) > 25:
filename = filename[:22] + "..."
text_y = progress_y + self.PROGRESS_BAR_HEIGHT + 20
cv2.putText(canvas, filename, (x, text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.TEXT_COLOR, 2)
# Draw progress percentage
if video_path in self.progress_data:
progress_text = f"{progress * 100:.0f}%"
text_size = cv2.getTextSize(progress_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
progress_text_x = x + progress_width - text_size[0]
cv2.putText(canvas, progress_text, (progress_text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_COLOR, 1)
# Draw instructions
instructions = [
"Project View - Videos in current directory",
"WASD: Navigate | E: Open video | Q: Fewer items per row | Y: More items per row | q: Quit | ESC: Back to editor",
f"Showing {len(self.video_files)} videos | {items_per_row} per row | Thumbnail: {thumbnail_width}x{thumbnail_height}"
]
for i, instruction in enumerate(instructions):
y_pos = actual_height - 60 + i * 20
cv2.putText(canvas, instruction, (10, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.TEXT_COLOR, 1)
return canvas
def handle_key(self, key: int) -> str:
"""Handle keyboard input, returns action taken"""
if key == 27: # ESC
return "back_to_editor"
elif key == ord('q'): # lowercase q - Quit
return "quit"
elif key == ord('e') or key == ord('E'): # E - Open video
if self.video_files and 0 <= self.selected_index < len(self.video_files):
return f"open_video:{self.video_files[self.selected_index]}"
elif key == ord('w') or key == ord('W'): # W - Up
current_items_per_row = min(self.items_per_row, len(self.video_files))
if self.selected_index >= current_items_per_row:
self.selected_index -= current_items_per_row
else:
self.selected_index = 0
self._update_scroll()
elif key == ord('s') or key == ord('S'): # S - Down
current_items_per_row = min(self.items_per_row, len(self.video_files))
if self.selected_index + current_items_per_row < len(self.video_files):
self.selected_index += current_items_per_row
else:
self.selected_index = len(self.video_files) - 1
self._update_scroll()
elif key == ord('a') or key == ord('A'): # A - Left
if self.selected_index > 0:
self.selected_index -= 1
self._update_scroll()
elif key == ord('d') or key == ord('D'): # D - Right
if self.selected_index < len(self.video_files) - 1:
self.selected_index += 1
self._update_scroll()
elif key == ord('Q'): # uppercase Q - Fewer items per row (larger thumbnails)
if self.items_per_row > 1:
self.items_per_row -= 1
print(f"Items per row: {self.items_per_row}")
elif key == ord('y') or key == ord('Y'): # Y - More items per row (smaller thumbnails)
self.items_per_row += 1
print(f"Items per row: {self.items_per_row}")
return "none"
def _update_scroll(self):
"""Update scroll offset based on selected item"""
if not self.video_files:
return
# Use fixed items per row
items_per_row = min(self.items_per_row, len(self.video_files))
# Get window dimensions for calculations
try:
window_rect = cv2.getWindowImageRect("Project View")
if window_rect[2] > 0 and window_rect[3] > 0:
window_width = window_rect[2]
window_height = window_rect[3]
else:
window_width = self.window_width
window_height = self.window_height
except:
window_width = self.window_width
window_height = self.window_height
# Calculate thumbnail size and item height dynamically
thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(window_width)
item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN
selected_row = self.selected_index // items_per_row
visible_rows = max(1, window_height // item_height)
# Calculate how many rows we can actually show
total_rows = (len(self.video_files) + items_per_row - 1) // items_per_row
# If we can show all rows, no scrolling needed
if total_rows <= visible_rows:
self.scroll_offset = 0
return
# Update scroll to keep selected item visible
if selected_row < self.scroll_offset:
self.scroll_offset = selected_row
elif selected_row >= self.scroll_offset + visible_rows:
self.scroll_offset = selected_row - visible_rows + 1
# Ensure scroll offset doesn't go negative or beyond available content
self.scroll_offset = max(0, min(self.scroll_offset, total_rows - visible_rows))
# Use extracted implementations while preserving original definitions above
from .capture import Cv2BufferedCap as _ExtCv2BufferedCap
Cv2BufferedCap = _ExtCv2BufferedCap
from .tracking import MotionTracker as _ExtMotionTracker
MotionTracker = _ExtMotionTracker
from .utils import get_active_window_title as _ext_get_active_window_title
get_active_window_title = _ext_get_active_window_title
from .capture import Cv2BufferedCap
from .tracking import MotionTracker
from .utils import get_active_window_title
from .project_view import ProjectView
class VideoEditor:
# Configuration constants