776 lines
33 KiB
Python
776 lines
33 KiB
Python
import os
|
|
import sys
|
|
import cv2
|
|
import argparse
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple, List
|
|
import time
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from queue import Queue
|
|
|
|
|
|
class VideoEditor:
    """OpenCV-based video editor supporting crop, zoom and cut operations.

    The class-level names below are tuning constants read by the editor's
    playback, timeline-drawing, zoom and directory-scanning code.
    """

    # ---- Playback / seeking ------------------------------------------------
    BASE_FRAME_DELAY_MS = 16  # ~60 FPS
    KEY_REPEAT_RATE_SEC = 0.3
    FAST_SEEK_ACTIVATION_TIME = 1.5
    SPEED_INCREMENT = 0.2
    MIN_PLAYBACK_SPEED = 0.1
    MAX_PLAYBACK_SPEED = 10.0

    # ---- Timeline geometry (pixels) and colours ----------------------------
    # NOTE(review): these tuples are handed straight to OpenCV drawing calls,
    # which presumably interpret them in BGR order - confirm intended hues.
    TIMELINE_HEIGHT = 60
    TIMELINE_MARGIN = 20
    TIMELINE_BAR_HEIGHT = 12
    TIMELINE_HANDLE_SIZE = 12
    TIMELINE_COLOR_BG = (80, 80, 80)
    TIMELINE_COLOR_PROGRESS = (0, 120, 255)
    TIMELINE_COLOR_HANDLE = (255, 255, 255)
    TIMELINE_COLOR_BORDER = (200, 200, 200)
    TIMELINE_COLOR_CUT_POINT = (255, 0, 0)

    # ---- Zoom limits and step ----------------------------------------------
    MIN_ZOOM = 0.1
    MAX_ZOOM = 10.0
    ZOOM_INCREMENT = 0.1

    # ---- File suffixes (lower-case) treated as videos ----------------------
    VIDEO_EXTENSIONS = {
        '.mp4', '.avi', '.mov', '.mkv',
        '.wmv', '.flv', '.webm', '.m4v',
    }
|
|
|
|
def __init__(self, path: str):
    """Build the editor for a single video file or a directory of videos.

    Args:
        path: Location of one video file, or of a directory that is scanned
            with ``_get_video_files_from_directory``.

    Raises:
        ValueError: If ``path`` is neither a file nor a directory, or if it
            is a directory in which no video files were found.
    """
    target = Path(path)
    self.path = target

    # Playlist bookkeeping
    self.video_files = []
    self.current_video_index = 0

    if target.is_file():
        self.video_files = [target]
    elif not target.is_dir():
        raise ValueError(f"Path does not exist: {path}")
    else:
        # Directory: collect every video it contains
        discovered = self._get_video_files_from_directory(target)
        self.video_files = discovered
        if not discovered:
            raise ValueError(f"No video files found in directory: {path}")

    # Open the first entry of the playlist
    self._load_video(self.video_files[0])

    # Mouse / window state
    self.mouse_dragging = False
    self.timeline_rect = None
    self.window_width, self.window_height = 1200, 800

    # Keyboard seeking state
    self.is_seeking = False
    self.current_seek_key = None
    self.key_first_press_time = 0
    self.last_seek_time = 0

    # Crop state; crop_rect is (x, y, width, height) and crop_history backs undo
    self.crop_rect = None
    self.crop_selecting = False
    self.crop_start_point = None
    self.crop_preview_rect = None
    self.crop_history = []

    # Zoom state; zoom_center is an (x, y) point
    self.zoom_factor = 1.0
    self.zoom_center = None

    # Cut markers (frame numbers)
    self.cut_start_frame = None
    self.cut_end_frame = None

    # Panning offset applied when the zoomed image exceeds the window
    self.display_offset = [0, 0]
|
|
|
|
def _get_video_files_from_directory(self, directory: Path) -> List[Path]:
    """Return the video files directly inside ``directory`` in sorted order.

    Only regular files whose lower-cased suffix is listed in
    ``VIDEO_EXTENSIONS`` are kept; sub-directories are not descended into.
    """
    accepted_suffixes = self.VIDEO_EXTENSIONS
    return sorted(
        entry
        for entry in directory.iterdir()
        if entry.is_file() and entry.suffix.lower() in accepted_suffixes
    )
|
|
|
|
def _load_video(self, video_path: Path):
    """Open ``video_path`` and reset all per-video editor state.

    The new capture is opened and validated *before* the previous one is
    released, so a file that cannot be opened leaves the currently loaded
    video untouched instead of replacing it with a dead capture.

    Args:
        video_path: File to open with ``cv2.VideoCapture``.

    Raises:
        ValueError: If OpenCV reports that the file could not be opened.
    """
    new_cap = cv2.VideoCapture(str(video_path))
    if not new_cap.isOpened():
        # Do not leak the failed capture handle.
        new_cap.release()
        raise ValueError(f"Could not open video file: {video_path}")

    # Only now is it safe to drop the previously loaded video.
    old_cap = getattr(self, 'cap', None)
    if old_cap:
        old_cap.release()

    self.video_path = video_path
    self.cap = new_cap

    # Video properties as reported by the capture backend
    self.total_frames = int(new_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    self.fps = new_cap.get(cv2.CAP_PROP_FPS)
    self.frame_width = int(new_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    self.frame_height = int(new_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Reset playback state for the new video
    self.current_frame = 0
    self.is_playing = False
    self.playback_speed = 1.0
    self.current_display_frame = None

    # Reset crop, zoom, and cut settings for the new video
    self.crop_rect = None
    self.crop_history = []
    self.zoom_factor = 1.0
    self.zoom_center = None
    self.cut_start_frame = None
    self.cut_end_frame = None
    self.display_offset = [0, 0]

    print(f"Loaded video: {self.video_path.name} ({self.current_video_index + 1}/{len(self.video_files)})")
|
|
|
|
def switch_to_video(self, index: int):
    """Make the playlist entry at ``index`` the active video.

    Out-of-range indices are ignored. Otherwise the index is recorded, the
    file is loaded and its current frame is fetched for display.
    """
    if index < 0 or index >= len(self.video_files):
        return

    self.current_video_index = index
    chosen = self.video_files[index]
    self._load_video(chosen)
    self.load_current_frame()
|
|
|
|
def next_video(self):
    """Activate the video after the current one, wrapping past the end."""
    playlist_length = len(self.video_files)
    self.switch_to_video((self.current_video_index + 1) % playlist_length)
|
|
|
|
def previous_video(self):
    """Activate the video before the current one, wrapping past the start."""
    playlist_length = len(self.video_files)
    self.switch_to_video((self.current_video_index - 1) % playlist_length)
|
|
|
|
def load_current_frame(self) -> bool:
    """Read frame ``self.current_frame`` into ``self.current_display_frame``.

    Returns:
        True when the capture delivered a frame; False otherwise, in which
        case the previously cached frame is left unchanged.
    """
    # Position the capture explicitly so the read returns the wanted frame.
    self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame)
    success, image = self.cap.read()
    if not success:
        return False

    self.current_display_frame = image
    return True
|
|
|
|
def calculate_frame_delay(self) -> int:
    """Return the per-frame wait in whole milliseconds (never below 1).

    The base delay is divided by the playback speed and truncated.
    """
    truncated = int(self.BASE_FRAME_DELAY_MS / self.playback_speed)
    return truncated if truncated > 1 else 1
|
|
|
|
def seek_video(self, frames_delta: int):
    """Move the playhead by ``frames_delta`` frames and reload the frame.

    The resulting position is clamped to ``[0, total_frames - 1]``.
    """
    final_index = self.total_frames - 1
    requested = self.current_frame + frames_delta
    self.current_frame = max(0, min(requested, final_index))
    self.load_current_frame()
|
|
|
|
def seek_to_frame(self, frame_number: int):
    """Jump to ``frame_number`` (clamped to the valid range) and reload."""
    final_index = self.total_frames - 1
    bounded = min(frame_number, final_index)
    self.current_frame = max(0, bounded)
    self.load_current_frame()
|
|
|
|
def advance_frame(self) -> bool:
    """Step playback forward by one frame, looping back to frame 0.

    Returns:
        True immediately when playback is paused; otherwise the result of
        loading the new current frame.
    """
    if self.is_playing:
        upcoming = self.current_frame + 1
        # Wrap around once the end of the video is reached.
        self.current_frame = 0 if upcoming >= self.total_frames else upcoming
        return self.load_current_frame()

    return True
|
|
|
|
def apply_crop_and_zoom(self, frame):
    """Return a copy of ``frame`` with the current crop and zoom applied.

    The crop rectangle (clamped to the frame) is applied first, then the
    result is scaled by ``zoom_factor``. If the scaled image is larger than
    the window, a window-sized region starting at ``display_offset`` is cut
    out of it. ``None`` is passed through unchanged.
    """
    if frame is None:
        return None

    result = frame.copy()

    # --- Crop ---
    if self.crop_rect:
        source_h, source_w = frame.shape[:2]
        left, top, crop_w, crop_h = (int(value) for value in self.crop_rect)
        # Keep the rectangle inside the frame
        left = max(0, min(left, source_w - 1))
        top = max(0, min(top, source_h - 1))
        crop_w = min(crop_w, source_w - left)
        crop_h = min(crop_h, source_h - top)
        if crop_w > 0 and crop_h > 0:
            result = result[top:top + crop_h, left:left + crop_w]

    # --- Zoom ---
    if self.zoom_factor == 1.0:
        return result

    zoomed_h, zoomed_w = (int(side * self.zoom_factor) for side in result.shape[:2])
    result = cv2.resize(result, (zoomed_w, zoomed_h), interpolation=cv2.INTER_LINEAR)

    # When the zoomed image overflows the window, keep only the visible part
    if zoomed_w > self.window_width or zoomed_h > self.window_height:
        first_col = max(0, self.display_offset[0])
        first_row = max(0, self.display_offset[1])
        last_col = min(zoomed_w, first_col + self.window_width)
        last_row = min(zoomed_h, first_row + self.window_height)
        result = result[first_row:last_row, first_col:last_col]

    return result
|
|
|
|
def draw_timeline(self, frame):
    """Paint the timeline strip along the bottom edge of ``frame`` in place.

    Draws the strip background, the bar with its border, the progress fill,
    the playhead handle and the cut markers labelled "1" and "2". The bar
    geometry is stored in ``self.timeline_rect`` as (x, y, width, height).
    """
    canvas_h, canvas_w = frame.shape[:2]
    bar_height = self.TIMELINE_BAR_HEIGHT

    # Strip background
    strip_top = canvas_h - self.TIMELINE_HEIGHT
    cv2.rectangle(frame, (0, strip_top), (canvas_w, canvas_h), (40, 40, 40), -1)

    # Bar geometry: vertically centred in the strip, inset by the margin
    bar_y = strip_top + (self.TIMELINE_HEIGHT - bar_height) // 2
    bar_x_start = self.TIMELINE_MARGIN
    bar_x_end = canvas_w - self.TIMELINE_MARGIN
    bar_width = bar_x_end - bar_x_start
    bar_bottom = bar_y + bar_height

    self.timeline_rect = (bar_x_start, bar_y, bar_width, bar_height)

    # Bar fill and outline
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BG, -1)
    cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_end, bar_bottom), self.TIMELINE_COLOR_BORDER, 1)

    # Nothing further to draw without frames
    if self.total_frames <= 0:
        return

    last_index = max(1, self.total_frames - 1)

    # Progress fill
    progress_width = int(bar_width * (self.current_frame / last_index))
    if progress_width > 0:
        cv2.rectangle(frame, (bar_x_start, bar_y), (bar_x_start + progress_width, bar_bottom), self.TIMELINE_COLOR_PROGRESS, -1)

    # Playhead handle: filled disc plus outline
    handle_center = (bar_x_start + progress_width, bar_y + bar_height // 2)
    handle_radius = self.TIMELINE_HANDLE_SIZE // 2
    cv2.circle(frame, handle_center, handle_radius, self.TIMELINE_COLOR_HANDLE, -1)
    cv2.circle(frame, handle_center, handle_radius, self.TIMELINE_COLOR_BORDER, 2)

    # Cut markers
    for cut_frame, label in ((self.cut_start_frame, "1"), (self.cut_end_frame, "2")):
        if cut_frame is None:
            continue
        marker_x = bar_x_start + int(bar_width * (cut_frame / last_index))
        cv2.line(frame, (marker_x, bar_y), (marker_x, bar_bottom), self.TIMELINE_COLOR_CUT_POINT, 3)
        cv2.putText(frame, label, (marker_x - 5, bar_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TIMELINE_COLOR_CUT_POINT, 1)
|
|
|
|
def draw_crop_overlay(self, canvas, start_x, start_y, frame_width, frame_height):
    """Outline the pending and the committed crop rectangles on ``canvas``.

    ``crop_preview_rect`` is used as-is (it is stored in screen coordinates).
    ``crop_rect`` is in video coordinates and is mapped to the screen using
    the fit-to-window scale of the uncropped frame plus the
    ``start_x``/``start_y`` offset. ``frame_width`` and ``frame_height`` are
    accepted but not used.
    """
    # Pending selection, colour (0, 255, 0)
    pending = self.crop_preview_rect
    if pending:
        px, py, pw, ph = pending
        cv2.rectangle(canvas, (int(px), int(py)), (int(px + pw), int(py + ph)), (0, 255, 0), 2)

    if not self.crop_rect:
        return

    crop_x, crop_y, crop_w, crop_h = self.crop_rect

    # Same fit-to-window computation as display_current_frame, but based on
    # the dimensions of the original (uncropped) frame.
    original_height, original_width = self.current_display_frame.shape[:2]
    available_height = self.window_height - self.TIMELINE_HEIGHT
    scale = min(self.window_width / original_width, available_height / original_height)
    if scale < 1.0:
        shown_width = int(original_width * scale)
        shown_height = int(original_height * scale)
    else:
        shown_width, shown_height = original_width, original_height

    # Video -> screen
    screen_x = start_x + (crop_x * shown_width / original_width)
    screen_y = start_y + (crop_y * shown_height / original_height)
    screen_w = crop_w * shown_width / original_width
    screen_h = crop_h * shown_height / original_height

    # NOTE(review): the previous comment called this rectangle "red", but
    # OpenCV colour tuples are presumably BGR, making (255, 0, 0) blue -
    # confirm which colour is intended.
    top_left = (int(screen_x), int(screen_y))
    bottom_right = (int(screen_x + screen_w), int(screen_y + screen_h))
    cv2.rectangle(canvas, top_left, bottom_right, (255, 0, 0), 2)
|
|
|
|
def display_current_frame(self):
    """Compose the editor window for the cached frame and show it.

    The cached frame is cropped/zoomed, scaled down to fit above the
    timeline if necessary, centred on a black canvas and decorated with the
    crop overlay, status text and timeline before being passed to
    ``cv2.imshow``. Nothing happens when no frame is cached.
    """
    if self.current_display_frame is None:
        return

    # apply_crop_and_zoom works on its own copy of the frame, so the cached
    # frame can be handed over directly without an extra full-frame copy.
    display_frame = self.apply_crop_and_zoom(self.current_display_frame)
    if display_frame is None:
        return

    # Shrink (never enlarge) to fit the area above the timeline
    height, width = display_frame.shape[:2]
    available_height = self.window_height - self.TIMELINE_HEIGHT
    scale = min(self.window_width / width, available_height / height)
    if scale < 1.0:
        new_width = int(width * scale)
        new_height = int(height * scale)
        display_frame = cv2.resize(display_frame, (new_width, new_height))

    # Black canvas covering the whole window, including the timeline strip
    canvas = np.zeros((self.window_height, self.window_width, 3), dtype=np.uint8)

    # Centre the frame in the area above the timeline
    frame_height, frame_width = display_frame.shape[:2]
    start_y = (available_height - frame_height) // 2
    start_x = (self.window_width - frame_width) // 2
    canvas[start_y:start_y + frame_height, start_x:start_x + frame_width] = display_frame

    if self.crop_rect or self.crop_preview_rect:
        self.draw_crop_overlay(canvas, start_x, start_y, frame_width, frame_height)

    def put_outlined_text(text, y, font_scale):
        # Thick white pass first, thin black pass on top, so the text stays
        # legible over any background.
        cv2.putText(canvas, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), 2)
        cv2.putText(canvas, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), 1)

    # Playback status line
    info_text = f"Frame: {self.current_frame}/{self.total_frames} | Speed: {self.playback_speed:.1f}x | Zoom: {self.zoom_factor:.1f}x | {'Playing' if self.is_playing else 'Paused'}"
    put_outlined_text(info_text, 30, 0.7)

    # Playlist position, only when there is more than one video
    if len(self.video_files) > 1:
        video_text = f"Video: {self.current_video_index + 1}/{len(self.video_files)} - {self.video_path.name}"
        put_outlined_text(video_text, 60, 0.6)
        y_offset = 90
    else:
        y_offset = 60

    # Crop rectangle summary
    if self.crop_rect:
        crop_text = f"Crop: {int(self.crop_rect[0])},{int(self.crop_rect[1])} {int(self.crop_rect[2])}x{int(self.crop_rect[3])}"
        put_outlined_text(crop_text, y_offset, 0.6)
        y_offset += 30

    # Cut markers. Compare against None explicitly: frame 0 is a valid cut
    # point and must not be rendered as '?'.
    if self.cut_start_frame is not None or self.cut_end_frame is not None:
        cut_start = '?' if self.cut_start_frame is None else self.cut_start_frame
        cut_end = '?' if self.cut_end_frame is None else self.cut_end_frame
        cut_text = f"Cut: {cut_start} - {cut_end}"
        put_outlined_text(cut_text, y_offset, 0.6)

    self.draw_timeline(canvas)

    cv2.imshow("Video Editor", canvas)
|
|
|
|
def mouse_callback(self, event, x, y, flags, param):
    """Handle a mouse event delivered by the OpenCV window.

    Supported interactions:
      * click / drag on the timeline bar seeks,
      * Shift + left-button drag selects a crop rectangle,
      * Ctrl + left click stores the zoom centre,
      * Ctrl + mouse wheel changes the zoom factor.

    Args:
        event: OpenCV mouse event code.
        x, y: Pointer position; compared against ``timeline_rect`` and used
            as screen coordinates for the crop selection.
        flags: OpenCV modifier/button flag bits for the event.
        param: Unused user data.
    """
    # A button release always terminates an active drag, wherever the
    # pointer is and whatever modifiers are still held. Previously the
    # timeline drag only ended inside the timeline band and the crop drag
    # only ended while Shift was down, which could leave either stuck on.
    if event == cv2.EVENT_LBUTTONUP:
        self.mouse_dragging = False
        if self.crop_selecting:
            if self.crop_start_point and self.crop_preview_rect:
                # Convert screen coordinates to video coordinates
                self.set_crop_from_screen_coords(self.crop_preview_rect)
            self.crop_selecting = False
            self.crop_start_point = None
            self.crop_preview_rect = None
        return

    # Timeline interaction: events inside the band are consumed here
    if self.timeline_rect:
        bar_x_start, bar_y, bar_width, bar_height = self.timeline_rect
        bar_x_end = bar_x_start + bar_width

        if bar_y <= y <= bar_y + bar_height + 10:
            if bar_x_start <= x <= bar_x_end:
                if event == cv2.EVENT_LBUTTONDOWN:
                    self.mouse_dragging = True
                    self.seek_to_timeline_position(x, bar_x_start, bar_width)
                elif event == cv2.EVENT_MOUSEMOVE and self.mouse_dragging:
                    self.seek_to_timeline_position(x, bar_x_start, bar_width)
            return

    # Crop selection: Shift is required to start; the drag itself is then
    # tracked through crop_selecting until the button is released.
    if event == cv2.EVENT_LBUTTONDOWN and flags & cv2.EVENT_FLAG_SHIFTKEY:
        self.crop_selecting = True
        self.crop_start_point = (x, y)
        self.crop_preview_rect = None
    elif event == cv2.EVENT_MOUSEMOVE and self.crop_selecting and self.crop_start_point:
        anchor_x, anchor_y = self.crop_start_point
        self.crop_preview_rect = (
            min(anchor_x, x),
            min(anchor_y, y),
            abs(x - anchor_x),
            abs(y - anchor_y),
        )

    # Zoom centre (Ctrl + click)
    if flags & cv2.EVENT_FLAG_CTRLKEY and event == cv2.EVENT_LBUTTONDOWN:
        self.zoom_center = (x, y)

    # Zoom in/out (Ctrl + scroll); the sign of flags carries the direction
    if flags & cv2.EVENT_FLAG_CTRLKEY and event == cv2.EVENT_MOUSEWHEEL:
        if flags > 0:  # Scroll up
            self.zoom_factor = min(self.MAX_ZOOM, self.zoom_factor + self.ZOOM_INCREMENT)
        else:  # Scroll down
            self.zoom_factor = max(self.MIN_ZOOM, self.zoom_factor - self.ZOOM_INCREMENT)
|
|
|
|
def set_crop_from_screen_coords(self, screen_rect):
    """Translate a screen-space rectangle into a crop on the source video.

    The rectangle is mapped back through the fit-to-window scaling, the
    zoom factor and any existing crop offset. If the resulting region is
    larger than 10x10 pixels it becomes the new ``crop_rect`` and the
    previous crop (if any) is pushed onto ``crop_history``.

    Args:
        screen_rect: ``(x, y, width, height)`` in window coordinates.
    """
    x, y, w, h = screen_rect

    if self.current_display_frame is None:
        return

    original_height, original_width = self.current_display_frame.shape[:2]
    available_height = self.window_height - self.TIMELINE_HEIGHT

    # Reproduce what is currently shown (after crop/zoom) to learn its size.
    # apply_crop_and_zoom copies the frame itself, so no extra copy here.
    display_frame = self.apply_crop_and_zoom(self.current_display_frame)
    if display_frame is None:
        return

    display_height, display_width = display_frame.shape[:2]

    # Fit-to-window scale used for display (shrink only)
    scale = min(self.window_width / display_width, available_height / display_height)
    if scale < 1.0:
        final_display_width = int(display_width * scale)
        final_display_height = int(display_height * scale)
    else:
        final_display_width = display_width
        final_display_height = display_height
        scale = 1.0

    start_x = (self.window_width - final_display_width) // 2
    start_y = (available_height - final_display_height) // 2

    # Screen -> display-frame coordinates, expressed as edges so that both
    # sides of the selection can be clamped. Clamping only the origin (as
    # before) kept the full width/height and shifted the far edge whenever
    # the drag started outside the displayed frame.
    left = (x - start_x) / scale
    top = (y - start_y) / scale
    right = left + w / scale
    bottom = top + h / scale

    left = max(0, min(left, display_width))
    right = max(0, min(right, display_width))
    top = max(0, min(top, display_height))
    bottom = max(0, min(bottom, display_height))

    display_x, display_y = left, top
    display_w, display_h = right - left, bottom - top

    # Invert apply_crop_and_zoom: it crops first and zooms second, so undo
    # the zoom first...
    if self.zoom_factor != 1.0:
        display_x = display_x / self.zoom_factor
        display_y = display_y / self.zoom_factor
        display_w = display_w / self.zoom_factor
        display_h = display_h / self.zoom_factor

    # ...then undo the crop by adding its offset
    original_x = display_x
    original_y = display_y
    original_w = display_w
    original_h = display_h
    if self.crop_rect:
        original_x += self.crop_rect[0]
        original_y += self.crop_rect[1]

    # Clamp to the bounds of the original frame
    original_x = max(0, min(original_x, original_width))
    original_y = max(0, min(original_y, original_height))
    original_w = min(original_w, original_width - original_x)
    original_h = min(original_h, original_height - original_y)

    if original_w > 10 and original_h > 10:  # Minimum size check
        # Remember the current crop so it can be undone
        if self.crop_rect:
            self.crop_history.append(self.crop_rect)
        self.crop_rect = (original_x, original_y, original_w, original_h)
|
|
|
|
def seek_to_timeline_position(self, mouse_x, bar_x_start, bar_width):
    """Seek to the frame that corresponds to ``mouse_x`` on the timeline bar.

    The horizontal position is converted to a fraction of the bar width,
    clamped to ``[0, 1]``, and scaled to a frame index.
    """
    fraction = (mouse_x - bar_x_start) / bar_width
    fraction = max(0, min(1, fraction))
    self.seek_to_frame(int(fraction * (self.total_frames - 1)))
|
|
|
|
def undo_crop(self):
    """Restore the most recently saved crop, or clear it if none is saved."""
    history = self.crop_history
    self.crop_rect = history.pop() if history else None
|
|
|
|
def render_video(self, output_path: str):
    """Render the selected frame range, with crop and zoom applied, to MP4.

    Frames are read on one background thread, processed on a second, and
    written by the calling thread. Both hand-offs are FIFO queues with a
    single producer and a single consumer, so frames reach the writer in
    source order.

    Changes from the previous implementation:
      * the capture is positioned once and then read sequentially instead
        of seeking before every frame;
      * frames are written in arrival order, so a frame dropped during
        processing no longer stalls every later frame;
      * the writer is released even if writing raises;
      * an output with no frames is reported as a failure.

    Args:
        output_path: Destination file; ``.mp4`` is appended when missing.

    Returns:
        True if at least one frame was written, False when the cut range is
        invalid, the writer cannot be opened, or nothing could be written.
    """
    if not output_path.endswith('.mp4'):
        output_path += '.mp4'

    print(f"Rendering video to {output_path}...")
    start_time = time.time()

    # Determine frame range (inclusive on both ends)
    start_frame = self.cut_start_frame if self.cut_start_frame is not None else 0
    end_frame = self.cut_end_frame if self.cut_end_frame is not None else self.total_frames - 1

    if start_frame >= end_frame:
        print("Invalid cut range!")
        return False

    # Output dimensions: crop size (or full frame) scaled by the zoom factor
    if self.crop_rect:
        base_width, base_height = self.crop_rect[2], self.crop_rect[3]
    else:
        base_width, base_height = self.frame_width, self.frame_height
    output_width = int(base_width * self.zoom_factor)
    output_height = int(base_height * self.zoom_factor)

    # Use mp4v codec (most compatible with MP4)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, self.fps, (output_width, output_height))

    if not out.isOpened():
        out.release()
        print("Error: Could not open video writer!")
        return False

    total_output_frames = end_frame - start_frame + 1
    frames_written = 0
    last_progress_update = 0

    # Bounded queues keep memory use limited while the stages overlap
    batch_size = min(50, max(10, total_output_frames // 20))  # Adaptive batch size
    frame_queue = Queue(maxsize=batch_size * 2)
    processed_queue = Queue(maxsize=batch_size * 2)

    def frame_reader():
        """Background thread: read the frame range sequentially."""
        try:
            # Seek once; subsequent read() calls advance frame by frame.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            for _ in range(total_output_frames):
                ret, frame = self.cap.read()
                if not ret:
                    break
                frame_queue.put(frame)
        finally:
            frame_queue.put(None)  # Signal end of frames

    def frame_processor():
        """Background thread: crop/zoom each frame for output."""
        try:
            while True:
                frame = frame_queue.get()
                if frame is None:  # End signal
                    break
                processed = self._process_frame_for_render(frame, output_width, output_height)
                # Frames that fail processing are skipped, not waited for.
                if processed is not None:
                    processed_queue.put(processed)
        finally:
            processed_queue.put(None)  # Signal end of processing

    reader_thread = threading.Thread(target=frame_reader, daemon=True)
    processor_thread = threading.Thread(target=frame_processor, daemon=True)

    try:
        reader_thread.start()
        processor_thread.start()

        # Main thread writes frames as they arrive (already in order)
        while True:
            processed = processed_queue.get()
            if processed is None:  # End signal
                break

            out.write(processed)
            frames_written += 1

            # Progress update (throttled to avoid too frequent updates)
            current_time = time.time()
            if current_time - last_progress_update > 0.5:
                progress = frames_written / total_output_frames * 100
                elapsed = current_time - start_time
                eta = (elapsed / frames_written) * (total_output_frames - frames_written)
                fps_rate = frames_written / elapsed if elapsed > 0 else 0.0
                print(f"Progress: {progress:.1f}% | {frames_written}/{total_output_frames} | "
                      f"FPS: {fps_rate:.1f} | ETA: {eta:.1f}s\r", end="")
                last_progress_update = current_time

        # Both sentinels have been seen, so the workers are finishing up
        reader_thread.join()
        processor_thread.join()
    finally:
        out.release()

    total_time = time.time() - start_time
    avg_fps = frames_written / total_time if total_time > 0 else 0

    if frames_written == 0:
        print("\nError: No frames could be rendered!")
        return False

    if frames_written < total_output_frames:
        print(f"\nWarning: only {frames_written} of {total_output_frames} frames were rendered")

    print(f"\nVideo rendered successfully to {output_path}")
    print(f"Rendered {frames_written} frames in {total_time:.2f}s (avg {avg_fps:.1f} FPS)")
    return True
|
|
|
|
def _process_frame_for_render(self, frame, output_width: int, output_height: int):
    """Crop ``frame`` and bring it to the output size for rendering.

    Returns:
        The processed frame, or None when the clamped crop rectangle is
        empty or any exception occurs (the error is printed).
    """
    try:
        if self.crop_rect:
            left, top, crop_w, crop_h = (int(value) for value in self.crop_rect)

            # Keep the crop rectangle inside the frame
            source_h, source_w = frame.shape[:2]
            left = max(0, min(left, source_w - 1))
            top = max(0, min(top, source_h - 1))
            crop_w = min(crop_w, source_w - left)
            crop_h = min(crop_h, source_h - top)

            if crop_w <= 0 or crop_h <= 0:
                return None
            frame = frame[top:top + crop_h, left:left + crop_w]

        # One resize straight to the output size covers both cases: a zoom
        # is active, or the (cropped) frame simply differs from the target.
        target_size = (output_width, output_height)
        current_size = (frame.shape[1], frame.shape[0])
        if self.zoom_factor != 1.0 or current_size != target_size:
            frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_LINEAR)

        return frame

    except Exception as e:
        print(f"Error processing frame: {e}")
        return None
|
|
|
|
def run(self):
    """Run the interactive editor until the user quits.

    Prints the key bindings, creates the window and mouse callback, then
    loops: draw the current frame, wait for a key, dispatch it, and advance
    playback when playing. The capture and all windows are released on exit.
    """
    help_lines = [
        "Video Editor Controls:",
        " Space: Play/Pause",
        " A/D: Seek backward/forward",
        " W/S: Increase/Decrease speed",
        " Shift+Click+Drag: Select crop area",
        " U: Undo crop",
        " C: Clear crop",
        " Ctrl+Scroll: Zoom in/out",
        " 1: Set cut start point",
        " 2: Set cut end point",
    ]
    has_playlist = len(self.video_files) > 1
    if has_playlist:
        help_lines += [" N: Next video", " n: Previous video"]
    help_lines += [" Enter: Render video", " Q/ESC: Quit", ""]
    for line in help_lines:
        print(line)

    window_name = "Video Editor"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(window_name, self.window_width, self.window_height)
    cv2.setMouseCallback(window_name, self.mouse_callback)

    self.load_current_frame()

    while True:
        self.display_current_frame()

        # Poll faster while playing so the playback speed is honoured
        wait_ms = self.calculate_frame_delay() if self.is_playing else 30
        key = cv2.waitKey(wait_ms) & 0xFF

        if key in (ord('q'), 27):  # q or ESC
            break

        if key == ord(' '):
            self.is_playing = not self.is_playing
        elif key == ord('a'):
            self.seek_video(-1)
        elif key == ord('d'):
            self.seek_video(1)
        elif key == ord('w'):
            faster = self.playback_speed + self.SPEED_INCREMENT
            self.playback_speed = min(self.MAX_PLAYBACK_SPEED, faster)
        elif key == ord('s'):
            slower = self.playback_speed - self.SPEED_INCREMENT
            self.playback_speed = max(self.MIN_PLAYBACK_SPEED, slower)
        elif key == ord('u'):
            self.undo_crop()
        elif key == ord('c'):
            # Keep the cleared crop available for undo
            if self.crop_rect:
                self.crop_history.append(self.crop_rect)
                self.crop_rect = None
        elif key == ord('1'):
            self.cut_start_frame = self.current_frame
            print(f"Set cut start at frame {self.current_frame}")
        elif key == ord('2'):
            self.cut_end_frame = self.current_frame
            print(f"Set cut end at frame {self.current_frame}")
        elif key == ord('n'):
            if len(self.video_files) > 1:
                self.previous_video()
        elif key == ord('N'):
            if len(self.video_files) > 1:
                self.next_video()
        elif key == 13:  # Enter
            destination = self.video_path.parent / f"{self.video_path.stem}_edited.mp4"
            self.render_video(str(destination))

        # Auto advance frame when playing
        if self.is_playing:
            self.advance_frame()

    self.cap.release()
    cv2.destroyAllWindows()
|
|
|
|
|
|
def main():
    """Command-line entry point: parse the path argument and run the editor.

    Exits with status 1 when the path does not exist or when constructing or
    running the editor raises.
    """
    cli = argparse.ArgumentParser(description="Fast Video Editor - Crop, Zoom, and Cut videos")
    cli.add_argument("video", help="Path to video file or directory containing videos")
    target = cli.parse_args().video

    if not os.path.exists(target):
        print(f"Error: {target} does not exist")
        sys.exit(1)

    try:
        VideoEditor(target).run()
    except Exception as e:
        # Top-level boundary: report any failure and exit non-zero
        print(f"Error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Start the command-line editor only when this file is executed as a script.
if __name__ == "__main__":
    main()
|