Compare commits: b90b5e5725...e97ce026da (16 commits)

Commits: e97ce026da, cacaa5f2ac, 099d551e1d, 762bc2e5e0, cb4fd02a42, cf44597268, fd35c6ac13, d181644b50, c1b6567e42, 79aa51a21c, 5637a9a3e0, 81f17953f7, 04d914834e, 4960812cba, 0b007b572e, f111571601
croppa/capture.py (new file, 87 lines)
@@ -0,0 +1,87 @@
import cv2


class Cv2BufferedCap:
    """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""

    def __init__(self, video_path, backend=None):
        self.video_path = video_path
        # cv2.VideoCapture's apiPreference parameter is an int; only pass it when a backend was given
        if backend is not None:
            self.cap = cv2.VideoCapture(str(video_path), backend)
        else:
            self.cap = cv2.VideoCapture(str(video_path))
        if not self.cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        # Video properties
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Frame cache
        self.frame_cache = {}
        self.cache_access_order = []
        self.MAX_CACHE_FRAMES = 3000

        # Current position tracking
        self.current_frame = 0

    def _manage_cache(self):
        """Manage cache size using LRU eviction"""
        while len(self.frame_cache) > self.MAX_CACHE_FRAMES:
            oldest_frame = self.cache_access_order.pop(0)
            if oldest_frame in self.frame_cache:
                del self.frame_cache[oldest_frame]

    def _add_to_cache(self, frame_number, frame):
        """Add frame to cache"""
        self.frame_cache[frame_number] = frame.copy()
        if frame_number in self.cache_access_order:
            self.cache_access_order.remove(frame_number)
        self.cache_access_order.append(frame_number)
        self._manage_cache()

    def _get_from_cache(self, frame_number):
        """Get frame from cache and update LRU"""
        if frame_number in self.frame_cache:
            if frame_number in self.cache_access_order:
                self.cache_access_order.remove(frame_number)
            self.cache_access_order.append(frame_number)
            return self.frame_cache[frame_number].copy()
        return None

    def get_frame(self, frame_number):
        """Get frame at a specific index - always accurate"""
        # Clamp frame number to valid range
        frame_number = max(0, min(frame_number, self.total_frames - 1))

        # Check cache first
        cached_frame = self._get_from_cache(frame_number)
        if cached_frame is not None:
            self.current_frame = frame_number
            return cached_frame

        # Not in cache, seek to frame and read
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = self.cap.read()

        if ret:
            self._add_to_cache(frame_number, frame)
            self.current_frame = frame_number
            return frame
        else:
            raise ValueError(f"Failed to read frame {frame_number}")

    def advance_frame(self, frames=1):
        """Advance by the specified number of frames"""
        new_frame = self.current_frame + frames
        return self.get_frame(new_frame)

    def release(self):
        """Release the video capture"""
        if self.cap:
            self.cap.release()

    def isOpened(self):
        """Check if capture is opened"""
        return self.cap and self.cap.isOpened()
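A minimal usage sketch of the class above; the file name and frame indices are hypothetical, not taken from the diff:

    # Hypothetical usage of Cv2BufferedCap; "clip.mp4" is a placeholder path
    cap = Cv2BufferedCap("clip.mp4")
    frame = cap.get_frame(100)    # exact seek; the decoded frame is now cached
    frame = cap.advance_frame(5)  # equivalent to get_frame(105)
    print(cap.total_frames, cap.fps, cap.frame_width, cap.frame_height)
    cap.release()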
croppa/editor.py (new file, 3042 lines): diff suppressed because it is too large.
croppa/main.py (2824 changed lines): diff suppressed because it is too large.
croppa/project_view.py (new file, 110 lines)
@@ -0,0 +1,110 @@
from pathlib import Path
from typing import Dict

import cv2
import numpy as np


class ProjectView:
    """Project view that displays videos in the current directory with progress bars"""

    THUMBNAIL_SIZE = (200, 150)
    THUMBNAIL_MARGIN = 20
    PROGRESS_BAR_HEIGHT = 8
    TEXT_HEIGHT = 30

    BG_COLOR = (40, 40, 40)
    THUMBNAIL_BG_COLOR = (60, 60, 60)
    PROGRESS_BG_COLOR = (80, 80, 80)
    PROGRESS_FILL_COLOR = (0, 120, 255)
    TEXT_COLOR = (255, 255, 255)
    SELECTED_COLOR = (255, 165, 0)

    def __init__(self, directory: Path, video_editor):
        self.directory = directory
        self.video_editor = video_editor
        self.video_files = []
        self.thumbnails: Dict[Path, np.ndarray] = {}
        self.progress_data = {}
        self.selected_index = 0
        self.scroll_offset = 0
        self.items_per_row = 2
        self.window_width = 1200
        self.window_height = 800

        self._load_video_files()
        self._load_progress_data()

    def _calculate_thumbnail_size(self, window_width: int) -> tuple:
        available_width = window_width - self.THUMBNAIL_MARGIN
        item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row
        thumbnail_width = max(50, item_width)
        thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0])
        return (thumbnail_width, thumbnail_height)

    def _load_video_files(self):
        self.video_files = []
        for file_path in self.directory.iterdir():
            if (file_path.is_file() and
                    file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS):
                self.video_files.append(file_path)
        self.video_files.sort(key=lambda x: x.name)

    def _load_progress_data(self):
        self.progress_data = {}
        for video_path in self.video_files:
            state_file = video_path.with_suffix('.json')
            if state_file.exists():
                try:
                    with open(state_file, 'r') as f:
                        import json
                        state = json.load(f)
                    current_frame = state.get('current_frame', 0)
                    cap = cv2.VideoCapture(str(video_path))
                    if cap.isOpened():
                        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                        cap.release()
                        if total_frames > 0:
                            # Guard against division by zero for single-frame videos
                            progress = current_frame / (total_frames - 1) if total_frames > 1 else 0.0
                            self.progress_data[video_path] = {
                                'current_frame': current_frame,
                                'total_frames': total_frames,
                                'progress': progress
                            }
                except Exception as e:  # noqa: BLE001 - preserve original behavior
                    print(f"Error loading progress for {video_path.name}: {e}")

    def refresh_progress_data(self):
        self._load_progress_data()

    def get_progress_for_video(self, video_path: Path) -> float:
        if video_path in self.progress_data:
            return self.progress_data[video_path]['progress']
        return 0.0

    def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray:
        if size is None:
            size = self.THUMBNAIL_SIZE
        if video_path in self.thumbnails:
            original_thumbnail = self.thumbnails[video_path]
            return cv2.resize(original_thumbnail, size)
        try:
            cap = cv2.VideoCapture(str(video_path))
            if cap.isOpened():
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if total_frames > 0:
                    # Grab the middle frame as a representative thumbnail
                    middle_frame = total_frames // 2
                    cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
                    ret, frame = cap.read()
                    if ret:
                        original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE)
                        self.thumbnails[video_path] = original_thumbnail
                        cap.release()
                        return cv2.resize(original_thumbnail, size)
                cap.release()
        except Exception as e:  # noqa: BLE001 - preserve original behavior
            print(f"Error generating thumbnail for {video_path.name}: {e}")
        # Fall back to a solid-color placeholder
        placeholder = np.full((size[1], size[0], 3), self.THUMBNAIL_BG_COLOR, dtype=np.uint8)
        return placeholder

    # draw() and input handling remain in main editor for now to minimize churn
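As a worked example of _calculate_thumbnail_size with the defaults above: window_width=1200 and items_per_row=2 give available_width = 1200 - 20 = 1180, item_width = (1180 - 20) // 2 = 580, so each thumbnail is 580 px wide and int(580 * 150 / 200) = 435 px tall, preserving the 200:150 aspect ratio of THUMBNAIL_SIZE.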
@@ -9,5 +9,16 @@ dependencies = [
    "numpy>=1.24.0"
]

[tool.setuptools]
py-modules = [
    "main",
    "editor",
    "capture",
    "tracking",
    "utils",
    "project_view",
    "rendering"
]

[project.scripts]
croppa = "main:main"
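Judging by the dependencies array and the [project.scripts] table, this hunk is the project's pyproject.toml: the seven modules touched in this compare are registered as setuptools py-modules, and a croppa console command mapping to main:main becomes available once the package is installed (for example via pip install .).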
croppa/rendering.py (new file, 303 lines)
@@ -0,0 +1,303 @@
import os
import time
import subprocess
import threading
import queue
import tempfile
from typing import Optional

import cv2


def start_render_thread(editor, output_path: str) -> bool:
    if editor.render_thread and editor.render_thread.is_alive():
        print("Render already in progress! Use 'X' to cancel first.")
        return False
    editor.render_cancelled = False
    editor.render_thread = threading.Thread(target=_render_worker, args=(editor, output_path), daemon=True)
    editor.render_thread.start()
    print(f"Started rendering to {output_path} in background thread...")
    print("You can continue editing while rendering. Press 'X' to cancel.")
    return True


def _render_worker(editor, output_path: str) -> bool:
    try:
        if not output_path.endswith(".mp4"):
            output_path += ".mp4"

        start_frame = editor.cut_start_frame if editor.cut_start_frame is not None else 0
        end_frame = editor.cut_end_frame if editor.cut_end_frame is not None else editor.total_frames - 1
        if start_frame >= end_frame:
            editor.render_progress_queue.put(("error", "Invalid cut range!", 1.0, 0.0))
            return False

        editor.render_progress_queue.put(("progress", "Calculating output dimensions...", 0.05, 0.0))

        if editor.crop_rect:
            crop_width = int(editor.crop_rect[2])
            crop_height = int(editor.crop_rect[3])
        else:
            crop_width = editor.frame_width
            crop_height = editor.frame_height

        # Swap dimensions for 90/270-degree rotations
        if editor.rotation_angle in (90, 270):
            output_width = int(crop_height * editor.zoom_factor)
            output_height = int(crop_width * editor.zoom_factor)
        else:
            output_width = int(crop_width * editor.zoom_factor)
            output_height = int(crop_height * editor.zoom_factor)

        # libx264 with yuv420p requires even dimensions
        output_width -= output_width % 2
        output_height -= output_height % 2

        editor.render_progress_queue.put(("progress", "Setting up FFmpeg encoder...", 0.1, 0.0))
        print(f"Output dimensions: {output_width}x{output_height}")
        print(f"Zoom factor: {editor.zoom_factor}")
        print(f"Crop dimensions: {crop_width}x{crop_height}")
        print("Using FFmpeg for encoding with OpenCV transformations...")

        return _render_with_ffmpeg_pipe(editor, output_path, start_frame, end_frame, output_width, output_height)
    except Exception as e:
        error_msg = str(e)
        if "async_lock" in error_msg or "pthread_frame" in error_msg:
            error_msg = "FFmpeg threading error - try restarting the application"
        elif "Assertion" in error_msg:
            error_msg = "Video codec error - the video file may be corrupted or incompatible"
        editor.render_progress_queue.put(("error", f"Render error: {error_msg}", 1.0, 0.0))
        print(f"Render error: {error_msg}")
        return False


def pump_progress(editor):
    try:
        while True:
            update_type, text, progress, fps = editor.render_progress_queue.get_nowait()
            if update_type == "init":
                editor.show_progress_bar(text)
            elif update_type == "progress":
                editor.update_progress_bar(progress, text, fps)
            elif update_type == "complete":
                editor.update_progress_bar(progress, text, fps)
                if hasattr(editor, "overwrite_temp_path") and editor.overwrite_temp_path:
                    _handle_overwrite_completion(editor)
            elif update_type == "error":
                editor.update_progress_bar(progress, text, fps)
                editor.show_feedback_message(f"ERROR: {text}")
            elif update_type == "cancelled":
                editor.hide_progress_bar()
                editor.show_feedback_message("Render cancelled")
    except queue.Empty:
        pass


def _handle_overwrite_completion(editor):
    try:
        print("Replacing original file...")
        if hasattr(editor, "cap") and editor.cap:
            editor.cap.release()
        import shutil
        print(f"DEBUG: Moving {editor.overwrite_temp_path} to {editor.overwrite_target_path}")
        try:
            shutil.move(editor.overwrite_temp_path, editor.overwrite_target_path)
            print("DEBUG: File move successful")
        except Exception as e:
            print(f"DEBUG: File move failed: {e}")
            if os.path.exists(editor.overwrite_temp_path):
                os.remove(editor.overwrite_temp_path)
            raise
        time.sleep(0.1)
        try:
            editor._load_video(editor.video_path)
            editor.load_current_frame()
            print("File reloaded successfully")
        except Exception as e:
            print(f"Warning: Could not reload file after overwrite: {e}")
            print("The file was saved successfully, but you may need to restart the editor to continue editing it.")
    except Exception as e:
        print(f"Error during file overwrite: {e}")
    finally:
        editor.overwrite_temp_path = None
        editor.overwrite_target_path = None


def request_cancel(editor) -> bool:
    if editor.render_thread and editor.render_thread.is_alive():
        editor.render_cancelled = True
        print("Render cancellation requested...")
        return True
    return False


def is_rendering(editor) -> bool:
    # bool() so the annotated return type holds even when render_thread is None
    return bool(editor.render_thread and editor.render_thread.is_alive())


def cleanup_thread(editor):
    if editor.render_thread and editor.render_thread.is_alive():
        editor.render_cancelled = True
        if editor.ffmpeg_process:
            try:
                editor.ffmpeg_process.terminate()
                editor.ffmpeg_process.wait(timeout=1.0)
            except Exception:
                try:
                    editor.ffmpeg_process.kill()
                except Exception:
                    pass
            editor.ffmpeg_process = None
        editor.render_thread.join(timeout=2.0)
        if editor.render_thread.is_alive():
            print("Warning: Render thread did not finish gracefully")
    editor.render_thread = None
    editor.render_cancelled = False


def _process_frame_for_render(editor, frame, output_width: int, output_height: int, frame_number: Optional[int] = None):
    try:
        if editor.crop_rect:
            x, y, w, h = map(int, editor.crop_rect)
            if editor.motion_tracker.tracking_enabled and frame_number is not None:
                # Re-center the crop on the tracked position for this frame
                current_pos = editor.motion_tracker.get_interpolated_position(frame_number)
                if current_pos:
                    tracked_x, tracked_y = current_pos
                    new_x = int(tracked_x - w // 2)
                    new_y = int(tracked_y - h // 2)
                    x, y = new_x, new_y
            # Clamp the crop rect to the frame bounds
            h_frame, w_frame = frame.shape[:2]
            x = max(0, min(x, w_frame - 1))
            y = max(0, min(y, h_frame - 1))
            w = min(w, w_frame - x)
            h = min(h, h_frame - y)
            if w > 0 and h > 0:
                frame = frame[y:y + h, x:x + w]
            else:
                return None
        frame = editor.apply_brightness_contrast(frame)
        if editor.rotation_angle != 0:
            frame = editor.apply_rotation(frame)
        if editor.zoom_factor != 1.0:
            height, width = frame.shape[:2]
            zoomed_width = int(width * editor.zoom_factor)
            zoomed_height = int(height * editor.zoom_factor)
            if zoomed_width == output_width and zoomed_height == output_height:
                frame = cv2.resize(frame, (zoomed_width, zoomed_height), interpolation=cv2.INTER_LINEAR)
            else:
                frame = cv2.resize(frame, (output_width, output_height), interpolation=cv2.INTER_LINEAR)
        else:
            if frame.shape[1] != output_width or frame.shape[0] != output_height:
                frame = cv2.resize(frame, (output_width, output_height), interpolation=cv2.INTER_LINEAR)
        return frame
    except Exception as e:
        print(f"Error processing frame: {e}")
        return None


def _render_with_ffmpeg_pipe(editor, output_path: str, start_frame: int, end_frame: int, output_width: int, output_height: int):
    try:
        # Verify FFmpeg is available before doing any work
        try:
            test_result = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True, timeout=10)
            if test_result.returncode != 0:
                print(f"FFmpeg test failed with return code {test_result.returncode}")
                print(f"FFmpeg stderr: {test_result.stderr}")
                editor.render_progress_queue.put(("error", "FFmpeg is not working properly", 1.0, 0.0))
                return False
        except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
            error_msg = f"FFmpeg not found or not working: {e}"
            print(error_msg)
            editor.render_progress_queue.put(("error", error_msg, 1.0, 0.0))
            return False

        editor.render_progress_queue.put(("progress", "Starting encoder...", 0.0, 0.0))

        # Raw BGR frames are staged in a temp file, then encoded in one FFmpeg pass
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.raw')
        temp_file.close()

        ffmpeg_cmd = [
            'ffmpeg', '-y',
            '-f', 'rawvideo',
            '-s', f'{output_width}x{output_height}',
            '-pix_fmt', 'bgr24',
            '-r', str(editor.fps),
            '-i', temp_file.name,
            '-c:v', 'libx264',
            '-preset', 'fast',
            '-crf', '18',
            '-pix_fmt', 'yuv420p',
            output_path
        ]
        editor.temp_file_name = temp_file.name

        # Use a dedicated capture so the UI thread's playback position is untouched
        render_cap = cv2.VideoCapture(str(editor.video_path))
        render_cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        total_frames = end_frame - start_frame + 1
        frames_written = 0
        start_time = time.time()
        last_progress_update = 0
        editor.render_progress_queue.put(("progress", f"Processing {total_frames} frames...", 0.1, 0.0))

        with open(editor.temp_file_name, 'wb') as tf:
            for i in range(total_frames):
                if editor.render_cancelled:
                    render_cap.release()
                    editor.render_progress_queue.put(("cancelled", "Render cancelled", 0.0, 0.0))
                    return False
                ret, frame = render_cap.read()
                if not ret:
                    break
                processed = _process_frame_for_render(editor, frame, output_width, output_height, start_frame + i)
                if processed is not None:
                    if i == 0:
                        print(f"Processed frame dimensions: {processed.shape[1]}x{processed.shape[0]}")
                        print(f"Expected dimensions: {output_width}x{output_height}")
                    tf.write(processed.tobytes())
                    frames_written += 1
                current_time = time.time()
                progress = 0.1 + (0.8 * (i + 1) / total_frames)
                if current_time - last_progress_update > 0.5:
                    elapsed = current_time - start_time
                    fps_rate = frames_written / elapsed if elapsed > 0 else 0
                    editor.render_progress_queue.put(("progress", f"Processed {i+1}/{total_frames} frames", progress, fps_rate))
                    last_progress_update = current_time

        render_cap.release()

        editor.render_progress_queue.put(("progress", "Encoding...", 0.9, 0.0))
        result = subprocess.run(
            ffmpeg_cmd,
            capture_output=True,
            text=True,
            timeout=300,
            creationflags=(subprocess.CREATE_NO_WINDOW if hasattr(subprocess, 'CREATE_NO_WINDOW') else 0)
        )
        if os.path.exists(editor.temp_file_name):
            try:
                os.unlink(editor.temp_file_name)
            except OSError:
                pass

        if result.returncode == 0:
            total_time = time.time() - start_time
            avg_fps = frames_written / total_time if total_time > 0 else 0
            editor.render_progress_queue.put(("complete", f"Rendered {frames_written} frames", 1.0, avg_fps))
            print(f"Successfully rendered {frames_written} frames (avg {avg_fps:.1f} FPS)")
            return True
        else:
            error_details = result.stderr if result.stderr else "No error details available"
            print(f"Encoding failed with return code {result.returncode}")
            print(f"Error: {error_details}")
            editor.render_progress_queue.put(("error", f"Encoding failed: {error_details}", 1.0, 0.0))
            return False
    except Exception as e:
        error_msg = str(e)
        print(f"Rendering exception: {error_msg}")
        print(f"Exception type: {type(e).__name__}")
        if "Errno 22" in error_msg or "invalid argument" in error_msg.lower():
            error_msg = "File system error - try using a different output path"
        elif "BrokenPipeError" in error_msg:
            error_msg = "Process terminated unexpectedly"
        elif "FileNotFoundError" in error_msg or "ffmpeg" in error_msg.lower():
            error_msg = "FFmpeg not found - please install FFmpeg and ensure it's in your PATH"
        editor.render_progress_queue.put(("error", f"Rendering failed: {error_msg}", 1.0, 0.0))
        return False
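For reference, a self-contained sketch of the staging pattern _render_with_ffmpeg_pipe uses: raw bgr24 frames written to a temp file, then encoded by FFmpeg in one pass. The dimensions, frame count, synthetic test pattern, and output name here are all hypothetical:

    # Standalone sketch of the raw-frame staging pattern (hypothetical values throughout)
    import subprocess
    import tempfile
    import numpy as np

    width, height, fps = 640, 360, 30.0
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.raw')
    tmp.close()
    with open(tmp.name, 'wb') as tf:
        for i in range(64):  # 64 synthetic frames instead of decoded video
            frame = np.full((height, width, 3), i * 4 % 256, dtype=np.uint8)  # bgr24 layout
            tf.write(frame.tobytes())
    subprocess.run([
        'ffmpeg', '-y',
        '-f', 'rawvideo', '-s', f'{width}x{height}', '-pix_fmt', 'bgr24',
        '-r', str(fps), '-i', tmp.name,
        '-c:v', 'libx264', '-preset', 'fast', '-crf', '18', '-pix_fmt', 'yuv420p',
        'out.mp4',
    ], check=True)

Staging to a file rather than piping to FFmpeg's stdin trades disk space for simplicity: it avoids the broken-pipe handling that a live pipe would need if the encoder exits early.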
croppa/tracking.py (new file, 149 lines)
@@ -0,0 +1,149 @@
from typing import List, Dict, Tuple, Optional


class MotionTracker:
    """Handles motion tracking for crop and pan operations"""

    def __init__(self):
        self.tracking_points = {}  # {frame_number: [(x, y), ...]}
        self.tracking_enabled = False
        self.base_crop_rect = None  # Original crop rect when tracking started
        self.base_zoom_center = None  # Original zoom center when tracking started

    def add_tracking_point(self, frame_number: int, x: int, y: int):
        """Add a tracking point at the specified frame and coordinates"""
        if frame_number not in self.tracking_points:
            self.tracking_points[frame_number] = []
        self.tracking_points[frame_number].append((x, y))

    def remove_tracking_point(self, frame_number: int, point_index: int):
        """Remove a tracking point by frame and index"""
        if frame_number in self.tracking_points and 0 <= point_index < len(self.tracking_points[frame_number]):
            del self.tracking_points[frame_number][point_index]
            if not self.tracking_points[frame_number]:
                del self.tracking_points[frame_number]

    def clear_tracking_points(self):
        """Clear all tracking points"""
        self.tracking_points.clear()

    def get_tracking_points_for_frame(self, frame_number: int) -> List[Tuple[int, int]]:
        """Get all tracking points for a specific frame"""
        return self.tracking_points.get(frame_number, [])

    def has_tracking_points(self) -> bool:
        """Check if any tracking points exist"""
        return bool(self.tracking_points)

    def get_interpolated_position(self, frame_number: int) -> Optional[Tuple[float, float]]:
        """Get interpolated position for a frame based on tracking points"""
        if not self.tracking_points:
            return None

        # Get all frames with tracking points
        frames = sorted(self.tracking_points.keys())

        if not frames:
            return None

        # If we have a point at this exact frame, return it
        if frame_number in self.tracking_points:
            points = self.tracking_points[frame_number]
            if points:
                # Return average of all points at this frame
                avg_x = sum(p[0] for p in points) / len(points)
                avg_y = sum(p[1] for p in points) / len(points)
                return (avg_x, avg_y)

        # If frame is before first tracking point
        if frame_number < frames[0]:
            points = self.tracking_points[frames[0]]
            if points:
                avg_x = sum(p[0] for p in points) / len(points)
                avg_y = sum(p[1] for p in points) / len(points)
                return (avg_x, avg_y)

        # If frame is after last tracking point
        if frame_number > frames[-1]:
            points = self.tracking_points[frames[-1]]
            if points:
                avg_x = sum(p[0] for p in points) / len(points)
                avg_y = sum(p[1] for p in points) / len(points)
                return (avg_x, avg_y)

        # Find the two frames to interpolate between
        for i in range(len(frames) - 1):
            if frames[i] <= frame_number <= frames[i + 1]:
                frame1, frame2 = frames[i], frames[i + 1]
                points1 = self.tracking_points[frame1]
                points2 = self.tracking_points[frame2]

                if not points1 or not points2:
                    continue

                # Get average positions for each frame
                avg_x1 = sum(p[0] for p in points1) / len(points1)
                avg_y1 = sum(p[1] for p in points1) / len(points1)
                avg_x2 = sum(p[0] for p in points2) / len(points2)
                avg_y2 = sum(p[1] for p in points2) / len(points2)

                # Linear interpolation
                t = (frame_number - frame1) / (frame2 - frame1)
                interp_x = avg_x1 + t * (avg_x2 - avg_x1)
                interp_y = avg_y1 + t * (avg_y2 - avg_y1)

                return (interp_x, interp_y)

        return None

    def get_tracking_offset(self, frame_number: int) -> Tuple[float, float]:
        """Get the offset to center the crop on the tracked point"""
        if not self.tracking_enabled or not self.base_zoom_center:
            return (0.0, 0.0)

        current_pos = self.get_interpolated_position(frame_number)
        if not current_pos:
            return (0.0, 0.0)

        # Calculate offset to center the crop on the tracked point
        # The offset should move the display so the tracked point stays centered
        offset_x = current_pos[0] - self.base_zoom_center[0]
        offset_y = current_pos[1] - self.base_zoom_center[1]

        return (offset_x, offset_y)

    def start_tracking(self, base_crop_rect: Tuple[int, int, int, int], base_zoom_center: Tuple[int, int]):
        """Start motion tracking with base positions"""
        self.tracking_enabled = True
        self.base_crop_rect = base_crop_rect
        self.base_zoom_center = base_zoom_center

    def stop_tracking(self):
        """Stop motion tracking"""
        self.tracking_enabled = False
        self.base_crop_rect = None
        self.base_zoom_center = None

    def to_dict(self) -> Dict:
        """Convert to dictionary for serialization"""
        return {
            'tracking_points': self.tracking_points,
            'tracking_enabled': self.tracking_enabled,
            'base_crop_rect': self.base_crop_rect,
            'base_zoom_center': self.base_zoom_center
        }

    def from_dict(self, data: Dict):
        """Load from dictionary for deserialization"""
        # Convert string keys back to integers for tracking_points
        tracking_points_data = data.get('tracking_points', {})
        self.tracking_points = {}
        for frame_str, points in tracking_points_data.items():
            frame_num = int(frame_str)  # Convert string key to integer
            self.tracking_points[frame_num] = points

        self.tracking_enabled = data.get('tracking_enabled', False)
        self.base_crop_rect = data.get('base_crop_rect', None)
        self.base_zoom_center = data.get('base_zoom_center', None)
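A quick sketch of the interpolation behavior (the frame numbers and coordinates are made up): a keyframe at frame 0 and one at frame 10 yield the linear midpoint at frame 5, while queries outside the keyframe range clamp to the nearest keyframe's average.

    # Hypothetical demonstration of MotionTracker interpolation
    tracker = MotionTracker()
    tracker.add_tracking_point(0, 100, 200)
    tracker.add_tracking_point(10, 300, 400)
    print(tracker.get_interpolated_position(5))   # (200.0, 300.0): t = 0.5
    print(tracker.get_interpolated_position(20))  # (300.0, 400.0): clamped to last keyframe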
croppa/utils.py (new file, 13 lines)
@@ -0,0 +1,13 @@
import ctypes


def get_active_window_title():
    """Get the title of the currently active window (Windows-only: ctypes.windll exists only on Windows)"""
    try:
        hwnd = ctypes.windll.user32.GetForegroundWindow()
        length = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
        buffer = ctypes.create_unicode_buffer(length + 1)
        ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1)
        return buffer.value
    except:  # noqa: E722 - preserve original broad exception style
        return ""
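A minimal sketch of how such a helper can gate input handling; the window name is a placeholder, and this use is an assumption rather than something shown in the diff:

    # Hypothetical focus guard built on get_active_window_title()
    WINDOW_NAME = "croppa"  # placeholder; would need to match the editor's actual window title
    if WINDOW_NAME in get_active_window_title():
        print("Editor focused; process hotkeys")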