Try refactoring everything into separate files
croppa/capture.py  (new file, 68 lines)
@@ -0,0 +1,68 @@
import cv2
from collections import OrderedDict


class Cv2BufferedCap:
    """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly"""

    def __init__(self, video_path, backend=None, cache_size=10000):
        self.video_path = video_path
        self.cap = cv2.VideoCapture(str(video_path), backend)
        if not self.cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        # Video properties
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Current position tracking
        self.current_frame = 0

        # Frame cache (LRU)
        self.cache_size = cache_size
        self.frame_cache = OrderedDict()

    def get_frame(self, frame_number):
        """Get frame at specific index - always accurate"""
        # Clamp frame number to valid range
        frame_number = max(0, min(frame_number, self.total_frames - 1))

        # Check cache first
        if frame_number in self.frame_cache:
            self.frame_cache.move_to_end(frame_number)
            return self.frame_cache[frame_number]

        # Optimize for sequential reading (next frame)
        if frame_number == self.current_frame + 1:
            ret, frame = self.cap.read()
        else:
            # Seek for non-sequential access
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = self.cap.read()

        if ret:
            self.current_frame = frame_number
            # Store in cache, evict least recently used if cache is full
            if len(self.frame_cache) >= self.cache_size:
                self.frame_cache.popitem(last=False)
            self.frame_cache[frame_number] = frame
            self.frame_cache.move_to_end(frame_number)
            return frame
        else:
            raise ValueError(f"Failed to read frame {frame_number}")

    def advance_frame(self, frames=1):
        """Advance by specified number of frames"""
        new_frame = self.current_frame + frames
        return self.get_frame(new_frame)

    def release(self):
        """Release the video capture"""
        if self.cap:
            self.cap.release()

    def isOpened(self):
        """Check if capture is opened"""
        return self.cap and self.cap.isOpened()
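
[Review note, not part of the commit] A minimal usage sketch of the new Cv2BufferedCap; "clip.mp4" and the cache size are made-up examples. The OrderedDict doubles as the LRU: move_to_end on every hit, popitem(last=False) evicts the oldest entry.

    import cv2
    from croppa.capture import Cv2BufferedCap

    cap = Cv2BufferedCap("clip.mp4", backend=cv2.CAP_ANY, cache_size=500)  # hypothetical path
    frame = cap.get_frame(0)      # seek + decode, then cached
    frame = cap.advance_frame(1)  # sequential read, no seek needed
    frame = cap.get_frame(0)      # served from the LRU cache, no decode
    cap.release()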
croppa/main.py  (697 lines changed)
@@ -16,698 +16,12 @@ from collections import OrderedDict
 from datetime import datetime
 from PIL import Image

-def load_image_utf8(image_path):
-    """Load image with UTF-8 path support using PIL, then convert to OpenCV format"""
-    try:
-        # Use PIL to load image with UTF-8 support
-        pil_image = Image.open(image_path)
-        # Convert PIL image to OpenCV format (BGR)
-        cv_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
-        return cv_image
-    except Exception as e:
-        raise ValueError(f"Could not load image file: {image_path} - {e}")
+from croppa.utils import load_image_utf8, get_active_window_title
+from croppa.tracking import FeatureTracker
+from croppa.capture import Cv2BufferedCap
+from croppa.project_view import ProjectView

[~680 further deleted lines collapsed here: the inline FeatureTracker, Cv2BufferedCap, get_active_window_title, and ProjectView definitions were removed from main.py; their content is identical to the new croppa/tracking.py, croppa/capture.py, croppa/utils.py, and croppa/project_view.py files shown in this commit]


 class VideoEditor:
     # Configuration constants
     TARGET_FPS = 80  # Target FPS for speed calculations
@@ -1947,7 +1261,6 @@ class VideoEditor:

         # Calculate display scaling (how much the frame is scaled to fit on screen)
         available_height = self.window_height - (0 if self.is_image_mode else self.TIMELINE_HEIGHT)
-        scale_x = frame_width / self.window_width  # This is wrong - need to calculate actual display scale

         # Let's use a simpler approach - just proportionally map screen coords to frame coords
         # This assumes the frame is centered and scaled to fit
@@ -3147,7 +2460,7 @@ class VideoEditor:
         # Draw progress percentage on the left
         percentage_text = f"{self.progress_bar_progress * 100:.1f}%"
         text_color = tuple(int(255 * fade_alpha) for _ in range(3))
-        cv2.putText(
+        cv2.putText(
             frame,
             percentage_text,
             (bar_x + 12, bar_y + 22),
croppa/project_view.py  (new file, 351 lines)
@@ -0,0 +1,351 @@
import cv2
import json
import numpy as np
from pathlib import Path


class ProjectView:
    """Project view that displays videos in current directory with progress bars"""

    # Project view configuration
    THUMBNAIL_SIZE = (200, 150)  # Width, Height
    THUMBNAIL_MARGIN = 20
    PROGRESS_BAR_HEIGHT = 8
    TEXT_HEIGHT = 30

    # Colors
    BG_COLOR = (40, 40, 40)
    THUMBNAIL_BG_COLOR = (60, 60, 60)
    PROGRESS_BG_COLOR = (80, 80, 80)
    PROGRESS_FILL_COLOR = (0, 120, 255)
    TEXT_COLOR = (255, 255, 255)
    SELECTED_COLOR = (255, 165, 0)

    def __init__(self, directory: Path, video_editor):
        self.directory = directory
        self.video_editor = video_editor
        self.video_files = []
        self.thumbnails = {}
        self.progress_data = {}
        self.selected_index = 0
        self.scroll_offset = 0
        self.items_per_row = 2  # Default to 2 items per row
        self.window_width = 1920  # Increased to accommodate 1080p videos
        self.window_height = 1200

        self._load_video_files()
        self._load_progress_data()

    def _calculate_thumbnail_size(self, window_width: int) -> tuple:
        """Calculate thumbnail size based on items per row and window width"""
        available_width = window_width - self.THUMBNAIL_MARGIN
        item_width = (available_width - (self.items_per_row - 1) * self.THUMBNAIL_MARGIN) // self.items_per_row
        thumbnail_width = max(50, item_width)  # Minimum 50px width
        thumbnail_height = int(thumbnail_width * self.THUMBNAIL_SIZE[1] / self.THUMBNAIL_SIZE[0])  # Maintain aspect ratio
        return (thumbnail_width, thumbnail_height)

    def _load_video_files(self):
        """Load all video files from directory"""
        self.video_files = []
        for file_path in self.directory.iterdir():
            if (file_path.is_file() and
                    file_path.suffix.lower() in self.video_editor.VIDEO_EXTENSIONS):
                self.video_files.append(file_path)
        self.video_files.sort(key=lambda x: x.name)

    def _load_progress_data(self):
        """Load progress data from JSON state files"""
        self.progress_data = {}
        for video_path in self.video_files:
            state_file = video_path.with_suffix('.json')
            if state_file.exists():
                try:
                    with open(state_file, 'r') as f:
                        state = json.load(f)
                    current_frame = state.get('current_frame', 0)

                    # Get total frames from video
                    cap = cv2.VideoCapture(str(video_path))
                    if cap.isOpened():
                        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                        cap.release()

                        if total_frames > 0:
                            progress = current_frame / (total_frames - 1)
                            self.progress_data[video_path] = {
                                'current_frame': current_frame,
                                'total_frames': total_frames,
                                'progress': progress
                            }
                except Exception as e:
                    print(f"Error loading progress for {video_path.name}: {e}")

    def refresh_progress_data(self):
        """Refresh progress data from JSON files (call when editor state changes)"""
        self._load_progress_data()

    def get_progress_for_video(self, video_path: Path) -> float:
        """Get progress (0.0 to 1.0) for a video"""
        if video_path in self.progress_data:
            return self.progress_data[video_path]['progress']
        return 0.0

    def get_thumbnail_for_video(self, video_path: Path, size: tuple = None) -> np.ndarray:
        """Get thumbnail for a video, generating it if needed"""
        if size is None:
            size = self.THUMBNAIL_SIZE

        # Cache the original thumbnail by video path only (not size)
        if video_path in self.thumbnails:
            original_thumbnail = self.thumbnails[video_path]
            # Resize the cached thumbnail to the requested size
            return cv2.resize(original_thumbnail, size)

        # Generate original thumbnail on demand (only once per video)
        try:
            cap = cv2.VideoCapture(str(video_path))
            if cap.isOpened():
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if total_frames > 0:
                    middle_frame = total_frames // 2
                    cap.set(cv2.CAP_PROP_POS_FRAMES, middle_frame)
                    ret, frame = cap.read()
                    if ret:
                        # Store original thumbnail at original size
                        original_thumbnail = cv2.resize(frame, self.THUMBNAIL_SIZE)
                        self.thumbnails[video_path] = original_thumbnail
                        cap.release()
                        # Return resized version
                        return cv2.resize(original_thumbnail, size)
                cap.release()
        except Exception as e:
            print(f"Error generating thumbnail for {video_path.name}: {e}")

        # Return a placeholder if thumbnail generation failed
        placeholder = np.full((size[1], size[0], 3),
                              self.THUMBNAIL_BG_COLOR, dtype=np.uint8)
        return placeholder

    def draw(self) -> np.ndarray:
        """Draw the project view"""
        # Get actual window size dynamically
        try:
            # Try to get the actual window size from OpenCV
            window_rect = cv2.getWindowImageRect("Project View")
            if window_rect[2] > 0 and window_rect[3] > 0:  # width and height > 0
                actual_width = window_rect[2]
                actual_height = window_rect[3]
            else:
                # Fallback to default size
                actual_width = self.window_width
                actual_height = self.window_height
        except:
            # Fallback to default size
            actual_width = self.window_width
            actual_height = self.window_height

        canvas = np.full((actual_height, actual_width, 3), self.BG_COLOR, dtype=np.uint8)

        if not self.video_files:
            # No videos message
            text = "No videos found in directory"
            font = cv2.FONT_HERSHEY_SIMPLEX
            text_size = cv2.getTextSize(text, font, 1.0, 2)[0]
            text_x = (actual_width - text_size[0]) // 2
            text_y = (actual_height - text_size[1]) // 2
            cv2.putText(canvas, text, (text_x, text_y), font, 1.0, self.TEXT_COLOR, 2)
            return canvas

        # Calculate layout - use fixed items_per_row and calculate thumbnail size to fit
        items_per_row = min(self.items_per_row, len(self.video_files))  # Don't exceed number of videos

        # Calculate thumbnail size to fit the desired number of items per row
        thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(actual_width)

        # Calculate item height dynamically based on thumbnail size
        item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN

        item_width = (actual_width - (items_per_row + 1) * self.THUMBNAIL_MARGIN) // items_per_row

        # Draw videos in grid
        for i, video_path in enumerate(self.video_files):
            row = i // items_per_row
            col = i % items_per_row

            # Skip if scrolled out of view
            if row < self.scroll_offset:
                continue
            if row > self.scroll_offset + (actual_height // item_height):
                break

            # Calculate position
            x = self.THUMBNAIL_MARGIN + col * (item_width + self.THUMBNAIL_MARGIN)
            y = self.THUMBNAIL_MARGIN + (row - self.scroll_offset) * item_height

            # Draw thumbnail background
            cv2.rectangle(canvas,
                          (x, y),
                          (x + thumbnail_width, y + thumbnail_height),
                          self.THUMBNAIL_BG_COLOR, -1)

            # Draw selection highlight
            if i == self.selected_index:
                cv2.rectangle(canvas,
                              (x - 2, y - 2),
                              (x + thumbnail_width + 2, y + thumbnail_height + 2),
                              self.SELECTED_COLOR, 3)

            # Draw thumbnail
            thumbnail = self.get_thumbnail_for_video(video_path, (thumbnail_width, thumbnail_height))
            # Thumbnail is already the correct size, no need to resize
            resized_thumbnail = thumbnail

            # Ensure thumbnail doesn't exceed canvas bounds
            end_y = min(y + thumbnail_height, actual_height)
            end_x = min(x + thumbnail_width, actual_width)
            thumb_height = end_y - y
            thumb_width = end_x - x

            if thumb_height > 0 and thumb_width > 0:
                # Resize thumbnail to fit within bounds if necessary
                if thumb_height != thumbnail_height or thumb_width != thumbnail_width:
                    resized_thumbnail = cv2.resize(thumbnail, (thumb_width, thumb_height))

                canvas[y:end_y, x:end_x] = resized_thumbnail

            # Draw progress bar
            progress_y = y + thumbnail_height + 5
            progress_width = thumbnail_width
            progress = self.get_progress_for_video(video_path)

            # Progress background
            cv2.rectangle(canvas,
                          (x, progress_y),
                          (x + progress_width, progress_y + self.PROGRESS_BAR_HEIGHT),
                          self.PROGRESS_BG_COLOR, -1)

            # Progress fill
            if progress > 0:
                fill_width = int(progress_width * progress)
                cv2.rectangle(canvas,
                              (x, progress_y),
                              (x + fill_width, progress_y + self.PROGRESS_BAR_HEIGHT),
                              self.PROGRESS_FILL_COLOR, -1)

            # Draw filename
            filename = video_path.name
            # Truncate if too long
            if len(filename) > 25:
                filename = filename[:22] + "..."

            text_y = progress_y + self.PROGRESS_BAR_HEIGHT + 20
            cv2.putText(canvas, filename, (x, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, self.TEXT_COLOR, 2)

            # Draw progress percentage
            if video_path in self.progress_data:
                progress_text = f"{progress * 100:.0f}%"
                text_size = cv2.getTextSize(progress_text, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)[0]
                progress_text_x = x + progress_width - text_size[0]
                cv2.putText(canvas, progress_text, (progress_text_x, text_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_COLOR, 1)

        # Draw instructions
        instructions = [
            "Project View - Videos in current directory",
            "WASD: Navigate | E: Open video | Q: Fewer items per row | Y: More items per row | q: Quit | ESC: Back to editor",
            f"Showing {len(self.video_files)} videos | {items_per_row} per row | Thumbnail: {thumbnail_width}x{thumbnail_height}"
        ]

        for i, instruction in enumerate(instructions):
            y_pos = actual_height - 60 + i * 20
            cv2.putText(canvas, instruction, (10, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, self.TEXT_COLOR, 1)

        return canvas

    def handle_key(self, key: int) -> str:
        """Handle keyboard input, returns action taken"""
        if key == 27:  # ESC
            return "back_to_editor"
        elif key == ord('q'):  # lowercase q - Quit
            return "quit"
        elif key == ord('e') or key == ord('E'):  # E - Open video
            if self.video_files and 0 <= self.selected_index < len(self.video_files):
                return f"open_video:{self.video_files[self.selected_index]}"
        elif key == ord('w') or key == ord('W'):  # W - Up
            current_items_per_row = min(self.items_per_row, len(self.video_files))
            if self.selected_index >= current_items_per_row:
                self.selected_index -= current_items_per_row
            else:
                self.selected_index = 0
            self._update_scroll()
        elif key == ord('s') or key == ord('S'):  # S - Down
            current_items_per_row = min(self.items_per_row, len(self.video_files))
            if self.selected_index + current_items_per_row < len(self.video_files):
                self.selected_index += current_items_per_row
            else:
                self.selected_index = len(self.video_files) - 1
            self._update_scroll()
        elif key == ord('a') or key == ord('A'):  # A - Left
            if self.selected_index > 0:
                self.selected_index -= 1
                self._update_scroll()
        elif key == ord('d') or key == ord('D'):  # D - Right
            if self.selected_index < len(self.video_files) - 1:
                self.selected_index += 1
                self._update_scroll()
        elif key == ord('Q'):  # uppercase Q - Fewer items per row (larger thumbnails)
            if self.items_per_row > 1:
                self.items_per_row -= 1
                print(f"Items per row: {self.items_per_row}")
        elif key == ord('y') or key == ord('Y'):  # Y - More items per row (smaller thumbnails)
            self.items_per_row += 1
            print(f"Items per row: {self.items_per_row}")

        return "none"

    def _update_scroll(self):
        """Update scroll offset based on selected item"""
        if not self.video_files:
            return

        # Use fixed items per row
        items_per_row = min(self.items_per_row, len(self.video_files))

        # Get window dimensions for calculations
        try:
            window_rect = cv2.getWindowImageRect("Project View")
            if window_rect[2] > 0 and window_rect[3] > 0:
                window_width = window_rect[2]
                window_height = window_rect[3]
            else:
                window_width = self.window_width
                window_height = self.window_height
        except:
            window_width = self.window_width
            window_height = self.window_height

        # Calculate thumbnail size and item height dynamically
        thumbnail_width, thumbnail_height = self._calculate_thumbnail_size(window_width)
        item_height = thumbnail_height + self.PROGRESS_BAR_HEIGHT + self.TEXT_HEIGHT + self.THUMBNAIL_MARGIN

        selected_row = self.selected_index // items_per_row
        visible_rows = max(1, window_height // item_height)

        # Calculate how many rows we can actually show
        total_rows = (len(self.video_files) + items_per_row - 1) // items_per_row

        # If we can show all rows, no scrolling needed
        if total_rows <= visible_rows:
            self.scroll_offset = 0
            return

        # Update scroll to keep selected item visible
        if selected_row < self.scroll_offset:
            self.scroll_offset = selected_row
        elif selected_row >= self.scroll_offset + visible_rows:
            self.scroll_offset = selected_row - visible_rows + 1

        # Ensure scroll offset doesn't go negative or beyond available content
        self.scroll_offset = max(0, min(self.scroll_offset, total_rows - visible_rows))
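
[Review note, not part of the commit] A rough sketch of the loop that would drive the new ProjectView. Only VideoEditor's VIDEO_EXTENSIONS attribute is read here, so a stub stands in for the editor; the stub and its extension set are assumptions, not values from this commit:

    import cv2
    from pathlib import Path
    from croppa.project_view import ProjectView

    class _EditorStub:  # stand-in: ProjectView only reads VIDEO_EXTENSIONS from the editor
        VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov"}  # assumed values

    view = ProjectView(Path("."), _EditorStub())
    cv2.namedWindow("Project View", cv2.WINDOW_NORMAL)  # draw() looks up this exact window name
    while True:
        cv2.imshow("Project View", view.draw())
        action = view.handle_key(cv2.waitKey(50) & 0xFF)
        if action in ("quit", "back_to_editor"):
            break
        if action.startswith("open_video:"):
            print("Would open:", action.split(":", 1)[1])
    cv2.destroyWindow("Project View")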
croppa/tracking.py  (new file, 248 lines)
@@ -0,0 +1,248 @@
import cv2
import numpy as np
from typing import Dict, Any


class FeatureTracker:
    """Semi-automatic feature tracking with SIFT/SURF/ORB support and full state serialization"""

    def __init__(self):
        # Feature detection parameters
        self.detector_type = 'SIFT'  # 'SIFT', 'SURF', 'ORB'
        self.max_features = 1000
        self.match_threshold = 0.7

        # Tracking state
        self.features = {}  # {frame_number: {'keypoints': [...], 'descriptors': [...], 'positions': [...]}}
        self.tracking_enabled = False
        self.auto_tracking = False

        # Initialize detectors
        self._init_detectors()

    def _init_detectors(self):
        """Initialize feature detectors based on type"""
        try:
            if self.detector_type == 'SIFT':
                self.detector = cv2.SIFT_create(nfeatures=self.max_features)
            elif self.detector_type == 'SURF':
                # SURF requires opencv-contrib-python, fallback to SIFT
                print("Warning: SURF requires opencv-contrib-python package. Using SIFT instead.")
                self.detector = cv2.SIFT_create(nfeatures=self.max_features)
                self.detector_type = 'SIFT'
            elif self.detector_type == 'ORB':
                self.detector = cv2.ORB_create(nfeatures=self.max_features)
            else:
                raise ValueError(f"Unknown detector type: {self.detector_type}")
        except Exception as e:
            print(f"Warning: Could not initialize {self.detector_type} detector: {e}")
            # Fallback to ORB
            self.detector_type = 'ORB'
            self.detector = cv2.ORB_create(nfeatures=self.max_features)

    def set_detector_type(self, detector_type: str):
        """Change detector type and reinitialize"""
        if detector_type in ['SIFT', 'SURF', 'ORB']:
            self.detector_type = detector_type
            self._init_detectors()
            print(f"Switched to {detector_type} detector")
        else:
            print(f"Invalid detector type: {detector_type}")

    def extract_features(self, frame: np.ndarray, frame_number: int, coord_mapper=None) -> bool:
        """Extract features from a frame and store them"""
        try:
            # Convert to grayscale if needed
            if len(frame.shape) == 3:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            else:
                gray = frame

            # Extract keypoints and descriptors
            keypoints, descriptors = self.detector.detectAndCompute(gray, None)

            if keypoints is None or descriptors is None:
                return False

            # Map coordinates back to original frame space if mapper provided
            if coord_mapper:
                mapped_positions = []
                for kp in keypoints:
                    orig_x, orig_y = coord_mapper(kp.pt[0], kp.pt[1])
                    mapped_positions.append((int(orig_x), int(orig_y)))
            else:
                mapped_positions = [(int(kp.pt[0]), int(kp.pt[1])) for kp in keypoints]

            # Store features
            self.features[frame_number] = {
                'keypoints': keypoints,
                'descriptors': descriptors,
                'positions': mapped_positions
            }

            print(f"Extracted {len(keypoints)} features from frame {frame_number}")
            return True

        except Exception as e:
            print(f"Error extracting features from frame {frame_number}: {e}")
            return False

    def extract_features_from_region(self, frame: np.ndarray, frame_number: int, coord_mapper=None) -> bool:
        """Extract features from a frame and ADD them to existing features"""
        try:
            # Convert to grayscale if needed
            if len(frame.shape) == 3:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            else:
                gray = frame

            # Extract keypoints and descriptors
            keypoints, descriptors = self.detector.detectAndCompute(gray, None)

            if keypoints is None or descriptors is None:
                return False

            # Map coordinates back to original frame space if mapper provided
            if coord_mapper:
                mapped_positions = []
                for kp in keypoints:
                    orig_x, orig_y = coord_mapper(kp.pt[0], kp.pt[1])
                    mapped_positions.append((int(orig_x), int(orig_y)))
            else:
                mapped_positions = [(int(kp.pt[0]), int(kp.pt[1])) for kp in keypoints]

            # Add to existing features or create new entry
            if frame_number in self.features:
                # Check if descriptor dimensions match
                existing_features = self.features[frame_number]
                if existing_features['descriptors'].shape[1] != descriptors.shape[1]:
                    print(f"Warning: Descriptor dimension mismatch ({existing_features['descriptors'].shape[1]} vs {descriptors.shape[1]}). Cannot concatenate. Replacing features.")
                    # Replace instead of concatenate when dimensions don't match
                    existing_features['keypoints'] = keypoints
                    existing_features['descriptors'] = descriptors
                    existing_features['positions'] = mapped_positions
                else:
                    # Append to existing features
                    existing_features['keypoints'] = np.concatenate([existing_features['keypoints'], keypoints])
                    existing_features['descriptors'] = np.concatenate([existing_features['descriptors'], descriptors])
                    existing_features['positions'].extend(mapped_positions)
                    print(f"Added {len(keypoints)} features to frame {frame_number} (total: {len(existing_features['positions'])})")
            else:
                # Create new features entry
                self.features[frame_number] = {
                    'keypoints': keypoints,
                    'descriptors': descriptors,
                    'positions': mapped_positions
                }
                print(f"Extracted {len(keypoints)} features from frame {frame_number}")

            return True

        except Exception as e:
            print(f"Error extracting features from frame {frame_number}: {e}")
            return False

    def track_features_optical_flow(self, prev_frame, curr_frame, prev_points):
        """Track features using Lucas-Kanade optical flow"""
        try:
            # Convert to grayscale if needed
            if len(prev_frame.shape) == 3:
                prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
            else:
                prev_gray = prev_frame

            if len(curr_frame.shape) == 3:
                curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
            else:
                curr_gray = curr_frame

            # Parameters for Lucas-Kanade optical flow
            lk_params = dict(winSize=(15, 15),
                             maxLevel=2,
                             criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

            # Calculate optical flow
            new_points, status, _ = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, prev_points, None, **lk_params)

            # Filter out bad tracks
            good_new = new_points[status == 1]
            good_old = prev_points[status == 1]

            return good_new, good_old, status

        except Exception as e:
            print(f"Error in optical flow tracking: {e}")
            return None, None, None

    def clear_features(self):
        """Clear all stored features"""
        self.features.clear()
        print("All features cleared")

    def get_feature_count(self, frame_number: int) -> int:
        """Get number of features for a frame"""
        if frame_number in self.features:
            return len(self.features[frame_number]['positions'])
        return 0

    def serialize_features(self) -> Dict[str, Any]:
        """Serialize features for state saving"""
        serialized = {}

        for frame_num, frame_data in self.features.items():
            frame_key = str(frame_num)
            serialized[frame_key] = {
                'positions': frame_data['positions'],
                'keypoints': None,  # Keypoints are not serialized (too large)
                'descriptors': None  # Descriptors are not serialized (too large)
            }

        return serialized

    def deserialize_features(self, serialized_data: Dict[str, Any]):
        """Deserialize features from state loading"""
        self.features.clear()

        for frame_key, frame_data in serialized_data.items():
            frame_num = int(frame_key)
            self.features[frame_num] = {
                'positions': frame_data['positions'],
                'keypoints': None,
                'descriptors': None
            }

        print(f"Deserialized features for {len(self.features)} frames")

    def get_state_dict(self) -> Dict[str, Any]:
        """Get complete state for serialization"""
        return {
            'detector_type': self.detector_type,
            'max_features': self.max_features,
            'match_threshold': self.match_threshold,
            'tracking_enabled': self.tracking_enabled,
            'auto_tracking': self.auto_tracking,
            'features': self.serialize_features()
        }

    def load_state_dict(self, state_dict: Dict[str, Any]):
        """Load complete state from serialization"""
        if 'detector_type' in state_dict:
            self.detector_type = state_dict['detector_type']
            self._init_detectors()

        if 'max_features' in state_dict:
            self.max_features = state_dict['max_features']

        if 'match_threshold' in state_dict:
            self.match_threshold = state_dict['match_threshold']

        if 'tracking_enabled' in state_dict:
            self.tracking_enabled = state_dict['tracking_enabled']

        if 'auto_tracking' in state_dict:
            self.auto_tracking = state_dict['auto_tracking']

        if 'features' in state_dict:
            self.deserialize_features(state_dict['features'])

        print("Feature tracker state loaded")
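
[Review note, not part of the commit] A minimal sketch of the FeatureTracker API as extracted above; the synthetic frame is only for illustration and may yield few SIFT keypoints:

    import json
    import numpy as np
    from croppa.tracking import FeatureTracker

    tracker = FeatureTracker()  # defaults to SIFT
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)  # synthetic BGR frame
    tracker.extract_features(frame, frame_number=0)
    print(tracker.get_feature_count(0))

    # serialize_features() intentionally drops keypoints/descriptors,
    # so only pixel positions survive a JSON round trip.
    restored = FeatureTracker()
    restored.load_state_dict(json.loads(json.dumps(tracker.get_state_dict())))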
croppa/utils.py  (new file, 34 lines)
@@ -0,0 +1,34 @@
import cv2
import ctypes
import numpy as np
from PIL import Image


def load_image_utf8(image_path):
    """Load image with UTF-8 path support using PIL, then convert to OpenCV format"""
    try:
        # Use PIL to load image with UTF-8 support
        pil_image = Image.open(image_path)
        # Convert PIL image to OpenCV format (BGR)
        cv_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        return cv_image
    except Exception as e:
        raise ValueError(f"Could not load image file: {image_path} - {e}")


def get_active_window_title():
    """Get the title of the currently active window"""
    try:
        # Get handle to foreground window
        hwnd = ctypes.windll.user32.GetForegroundWindow()

        # Get window title length
        length = ctypes.windll.user32.GetWindowTextLengthW(hwnd)

        # Create buffer and get window title
        buffer = ctypes.create_unicode_buffer(length + 1)
        ctypes.windll.user32.GetWindowTextW(hwnd, buffer, length + 1)

        return buffer.value
    except:
        return ""
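
[Review note, not part of the commit] Quick usage of the two helpers; "photo.png" is a placeholder path, and get_active_window_title is Windows-only (ctypes.windll), returning "" elsewhere:

    from croppa.utils import load_image_utf8, get_active_window_title

    img = load_image_utf8("photo.png")  # placeholder path; returns a BGR ndarray
    print(img.shape, get_active_window_title())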