Compare commits
3 Commits
cf44597268
...
099d551e1d
Author | SHA1 | Date | |
---|---|---|---|
099d551e1d | |||
762bc2e5e0 | |||
cb4fd02a42 |
87
croppa/capture.py
Normal file
87
croppa/capture.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
import cv2
|
||||||
|
|
||||||
|
|
||||||
|
class Cv2BufferedCap:
    """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""

    def __init__(self, video_path, backend=None):
        """Open the video for buffered, seek-accurate reading.

        Args:
            video_path: Path to the video file (anything ``str()`` accepts).
            backend: Optional OpenCV capture API constant (e.g. ``cv2.CAP_FFMPEG``).
                When None, OpenCV chooses the backend automatically.

        Raises:
            ValueError: If the video cannot be opened.
        """
        self.video_path = video_path
        # Fix: forwarding backend=None to cv2.VideoCapture makes the bindings
        # reject the call (apiPreference must be an int); only pass it when set.
        if backend is None:
            self.cap = cv2.VideoCapture(str(video_path))
        else:
            self.cap = cv2.VideoCapture(str(video_path), backend)
        if not self.cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        # Video properties
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Frame cache: frame_number -> frame copy. LRU order tracked in a
        # separate list, least recently used first.
        self.frame_cache = {}
        self.cache_access_order = []
        self.MAX_CACHE_FRAMES = 3000

        # Current position tracking (index of the last frame returned)
        self.current_frame = 0

    def _manage_cache(self):
        """Evict least-recently-used frames until the cache fits the limit."""
        while len(self.frame_cache) > self.MAX_CACHE_FRAMES:
            oldest_frame = self.cache_access_order.pop(0)
            if oldest_frame in self.frame_cache:
                del self.frame_cache[oldest_frame]

    def _add_to_cache(self, frame_number, frame):
        """Store a copy of *frame* and mark it most recently used."""
        self.frame_cache[frame_number] = frame.copy()
        if frame_number in self.cache_access_order:
            self.cache_access_order.remove(frame_number)
        self.cache_access_order.append(frame_number)
        self._manage_cache()

    def _get_from_cache(self, frame_number):
        """Return a copy of the cached frame (updating LRU order), or None."""
        if frame_number in self.frame_cache:
            if frame_number in self.cache_access_order:
                self.cache_access_order.remove(frame_number)
            self.cache_access_order.append(frame_number)
            return self.frame_cache[frame_number].copy()
        return None

    def get_frame(self, frame_number):
        """Get frame at specific index - always accurate.

        The index is clamped to [0, total_frames - 1]. Cached frames are
        served without touching the decoder; otherwise the capture seeks
        and reads, caching the result.

        Raises:
            ValueError: If the decoder fails to produce the frame.
        """
        # Clamp frame number to valid range
        frame_number = max(0, min(frame_number, self.total_frames - 1))

        # Check cache first
        cached_frame = self._get_from_cache(frame_number)
        if cached_frame is not None:
            self.current_frame = frame_number
            return cached_frame

        # Not in cache, seek to frame and read
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = self.cap.read()

        if ret:
            self._add_to_cache(frame_number, frame)
            self.current_frame = frame_number
            return frame
        raise ValueError(f"Failed to read frame {frame_number}")

    def advance_frame(self, frames=1):
        """Advance by the given number of frames (may be negative) and return the frame."""
        return self.get_frame(self.current_frame + frames)

    def release(self):
        """Release the underlying video capture."""
        if self.cap:
            self.cap.release()

    def isOpened(self):
        """Check if capture is opened"""
        return self.cap and self.cap.isOpened()
|
||||||
|
|
||||||
|
|
3240
croppa/editor.py
Normal file
3240
croppa/editor.py
Normal file
File diff suppressed because it is too large
Load Diff
3569
croppa/main.py
3569
croppa/main.py
File diff suppressed because it is too large
Load Diff
110
croppa/project_view.py
Normal file
110
croppa/project_view.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class ProjectView:
    """Project view that displays videos in current directory with progress bars"""

    # Layout constants
    THUMBNAIL_SIZE = (200, 150)  # base (width, height) for cached thumbnails
    THUMBNAIL_MARGIN = 20
    PROGRESS_BAR_HEIGHT = 8
    TEXT_HEIGHT = 30

    # Colors (BGR order, as used by OpenCV)
    BG_COLOR = (40, 40, 40)
    THUMBNAIL_BG_COLOR = (60, 60, 60)
    PROGRESS_BG_COLOR = (80, 80, 80)
    PROGRESS_FILL_COLOR = (0, 120, 255)
    TEXT_COLOR = (255, 255, 255)
    SELECTED_COLOR = (255, 165, 0)

    def __init__(self, directory: Path, video_editor):
        """Build the view over *directory*.

        Args:
            directory: Directory whose video files are listed.
            video_editor: Owning editor; only its VIDEO_EXTENSIONS set is read here.
        """
        self.directory = directory
        self.video_editor = video_editor
        self.video_files = []
        self.thumbnails: Dict[Path, np.ndarray] = {}  # cache of base-size thumbnails
        self.progress_data = {}  # video Path -> {current_frame, total_frames, progress}
        self.selected_index = 0
        self.scroll_offset = 0
        self.items_per_row = 2
        self.window_width = 1200
        self.window_height = 800

        self._load_video_files()
        self._load_progress_data()

    def _calculate_thumbnail_size(self, window_width: int) -> tuple:
        """Return (width, height) for thumbnails fitting *window_width*.

        Width is floored at 50 px; height keeps the THUMBNAIL_SIZE aspect ratio.
        """
        available_width = window_width - self.THUMBNAIL_MARGIN
        item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row
        thumbnail_width = max(50, item_width)
        thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0])
        return (thumbnail_width, thumbnail_height)

    def _load_video_files(self):
        """Collect video files in the directory, sorted by name."""
        self.video_files = []
        for file_path in self.directory.iterdir():
            if (file_path.is_file() and
                    file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS):
                self.video_files.append(file_path)
        self.video_files.sort(key=lambda x: x.name)

    def _load_progress_data(self):
        """Read each video's sidecar .json state file and compute watch progress."""
        import json

        self.progress_data = {}
        for video_path in self.video_files:
            state_file = video_path.with_suffix('.json')
            if not state_file.exists():
                continue
            try:
                with open(state_file, 'r') as f:
                    state = json.load(f)
                current_frame = state.get('current_frame', 0)
                cap = cv2.VideoCapture(str(video_path))
                if cap.isOpened():
                    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    cap.release()
                    if total_frames > 0:
                        # Fix: a single-frame video made (total_frames - 1) zero
                        # and raised ZeroDivisionError; clamp the denominator.
                        progress = current_frame / max(total_frames - 1, 1)
                        self.progress_data[video_path] = {
                            'current_frame': current_frame,
                            'total_frames': total_frames,
                            'progress': progress
                        }
            except Exception as e:  # noqa: BLE001 - preserve original behavior
                print(f"Error loading progress for {video_path.name}: {e}")

    def refresh_progress_data(self):
        """Re-read progress state for all videos (e.g. after editing)."""
        self._load_progress_data()

    def get_progress_for_video(self, video_path: Path) -> float:
        """Return watch progress in [0, 1] for *video_path*, 0.0 if unknown."""
        if video_path in self.progress_data:
            return self.progress_data[video_path]['progress']
        return 0.0

    def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray:
        """Return a thumbnail of *video_path* resized to *size*.

        The middle frame is grabbed once and cached at THUMBNAIL_SIZE;
        later calls just resize the cached image. On any failure a flat
        background-colored placeholder is returned.
        """
        if size is None:
            size = self.THUMBNAIL_SIZE
        if video_path in self.thumbnails:
            original_thumbnail = self.thumbnails[video_path]
            return cv2.resize(original_thumbnail, size)
        try:
            cap = cv2.VideoCapture(str(video_path))
            if cap.isOpened():
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if total_frames > 0:
                    # Middle frame is usually more representative than frame 0.
                    middle_frame = total_frames // 2
                    cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
                    ret, frame = cap.read()
                    if ret:
                        original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE)
                        self.thumbnails[video_path] = original_thumbnail
                        cap.release()
                        return cv2.resize(original_thumbnail, size)
                cap.release()
        except Exception as e:  # noqa: BLE001 - preserve original behavior
            print(f"Error generating thumbnail for {video_path.name}: {e}")
        placeholder = np.full((size[1], size[0], 3), self.THUMBNAIL_BG_COLOR, dtype=np.uint8)
        return placeholder
|
||||||
|
|
||||||
|
# draw() and input handling remain in main editor for now to minimize churn
|
@@ -10,4 +10,4 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
croppa = "main:main"
|
croppa = "croppa.main:main"
|
||||||
|
149
croppa/tracking.py
Normal file
149
croppa/tracking.py
Normal file
@@ -0,0 +1,149 @@
|
|||||||
|
from typing import List, Dict, Tuple, Optional
|
||||||
|
|
||||||
|
|
||||||
|
class MotionTracker:
    """Handles motion tracking for crop and pan operations"""

    def __init__(self):
        # {frame_number: [(x, y), ...]} - user-placed tracking points per frame
        self.tracking_points = {}
        self.tracking_enabled = False
        self.base_crop_rect = None  # Original crop rect when tracking started
        self.base_zoom_center = None  # Original zoom center when tracking started

    @staticmethod
    def _centroid(points) -> Tuple[float, float]:
        """Return the average (x, y) of a non-empty list of points."""
        avg_x = sum(p[0] for p in points) / len(points)
        avg_y = sum(p[1] for p in points) / len(points)
        return (avg_x, avg_y)

    def add_tracking_point(self, frame_number: int, x: int, y: int):
        """Add a tracking point at the specified frame and coordinates"""
        if frame_number not in self.tracking_points:
            self.tracking_points[frame_number] = []
        self.tracking_points[frame_number].append((x, y))

    def remove_tracking_point(self, frame_number: int, point_index: int):
        """Remove a tracking point by frame and index; drop the frame entry when empty."""
        if frame_number in self.tracking_points and 0 <= point_index < len(self.tracking_points[frame_number]):
            del self.tracking_points[frame_number][point_index]
            if not self.tracking_points[frame_number]:
                del self.tracking_points[frame_number]

    def clear_tracking_points(self):
        """Clear all tracking points"""
        self.tracking_points.clear()

    def get_tracking_points_for_frame(self, frame_number: int) -> List[Tuple[int, int]]:
        """Get all tracking points for a specific frame"""
        return self.tracking_points.get(frame_number, [])

    def has_tracking_points(self) -> bool:
        """Check if any tracking points exist"""
        return bool(self.tracking_points)

    def get_interpolated_position(self, frame_number: int) -> Optional[Tuple[float, float]]:
        """Get interpolated position for a frame based on tracking points.

        Frames outside the keyframe range clamp to the nearest keyframe's
        centroid; frames between keyframes are linearly interpolated.
        Returns None when no usable points exist.
        """
        if not self.tracking_points:
            return None

        frames = sorted(self.tracking_points.keys())
        if not frames:
            return None

        # Exact keyframe: return the centroid of its points.
        if frame_number in self.tracking_points:
            points = self.tracking_points[frame_number]
            if points:
                return self._centroid(points)

        # Before the first keyframe: clamp to it.
        if frame_number < frames[0]:
            points = self.tracking_points[frames[0]]
            if points:
                return self._centroid(points)

        # After the last keyframe: clamp to it.
        if frame_number > frames[-1]:
            points = self.tracking_points[frames[-1]]
            if points:
                return self._centroid(points)

        # Between keyframes: linear interpolation of the two centroids.
        for i in range(len(frames) - 1):
            if frames[i] <= frame_number <= frames[i + 1]:
                frame1, frame2 = frames[i], frames[i + 1]
                points1 = self.tracking_points[frame1]
                points2 = self.tracking_points[frame2]
                if not points1 or not points2:
                    continue
                x1, y1 = self._centroid(points1)
                x2, y2 = self._centroid(points2)
                t = (frame_number - frame1) / (frame2 - frame1)
                return (x1 + t * (x2 - x1), y1 + t * (y2 - y1))

        return None

    def get_tracking_offset(self, frame_number: int) -> Tuple[float, float]:
        """Get the offset to center the crop on the tracked point.

        Returns (0.0, 0.0) when tracking is disabled, no base center was
        recorded, or no position can be interpolated.
        """
        if not self.tracking_enabled or not self.base_zoom_center:
            return (0.0, 0.0)

        current_pos = self.get_interpolated_position(frame_number)
        if not current_pos:
            return (0.0, 0.0)

        # Offset moves the display so the tracked point stays centered.
        offset_x = current_pos[0] - self.base_zoom_center[0]
        offset_y = current_pos[1] - self.base_zoom_center[1]
        return (offset_x, offset_y)

    def start_tracking(self, base_crop_rect: Tuple[int, int, int, int], base_zoom_center: Tuple[int, int]):
        """Start motion tracking with base positions"""
        self.tracking_enabled = True
        self.base_crop_rect = base_crop_rect
        self.base_zoom_center = base_zoom_center

    def stop_tracking(self):
        """Stop motion tracking"""
        self.tracking_enabled = False
        self.base_crop_rect = None
        self.base_zoom_center = None

    def to_dict(self) -> Dict:
        """Convert to dictionary for serialization"""
        return {
            'tracking_points': self.tracking_points,
            'tracking_enabled': self.tracking_enabled,
            'base_crop_rect': self.base_crop_rect,
            'base_zoom_center': self.base_zoom_center
        }

    def from_dict(self, data: Dict):
        """Load from dictionary for deserialization.

        JSON round-trips turn integer frame keys into strings; convert
        them back to ints here.
        """
        tracking_points_data = data.get('tracking_points', {})
        self.tracking_points = {}
        for frame_str, points in tracking_points_data.items():
            self.tracking_points[int(frame_str)] = points

        self.tracking_enabled = data.get('tracking_enabled', False)
        self.base_crop_rect = data.get('base_crop_rect', None)
        self.base_zoom_center = data.get('base_zoom_center', None)
|
||||||
|
|
||||||
|
|
13
croppa/utils.py
Normal file
13
croppa/utils.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
import ctypes
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_window_title():
    """Get the title of the currently active window.

    Uses the Win32 API via ctypes. Returns the foreground window's title,
    or "" when it cannot be determined (non-Windows platform, where
    ``ctypes.windll`` does not exist, or any Win32 call failure).
    """
    try:
        hwnd = ctypes.windll.user32.GetForegroundWindow()
        length = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
        # +1 for the terminating NUL that GetWindowTextW writes.
        buffer = ctypes.create_unicode_buffer(length + 1)
        ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1)
        return buffer.value
    except Exception:
        # Fix: was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit; Exception keeps the best-effort "" fallback.
        return ""
|
Reference in New Issue
Block a user