from collections import OrderedDict

import cv2


class Cv2BufferedCap:
    """Buffered wrapper around ``cv2.VideoCapture`` with an LRU frame cache.

    Frames are fetched by index through :meth:`get_frame`. Decoded frames are
    kept in a bounded least-recently-used cache so that revisiting a frame
    does not require another seek and decode.

    The object can be used as a context manager; the capture is released on
    exit.
    """

    def __init__(self, video_path, backend=None, max_cache_frames=3000):
        """Open *video_path* and read its basic properties.

        Args:
            video_path: Path of the video; converted with ``str()`` before
                being handed to OpenCV.
            backend: Optional OpenCV capture API preference (a ``cv2.CAP_*``
                value). When ``None`` OpenCV chooses the backend itself.
            max_cache_frames: Maximum number of frames kept in the cache.
                Each cached frame is a full copy, so memory use grows with
                this value times the frame size.

        Raises:
            ValueError: If the video cannot be opened.
        """
        self.video_path = video_path

        # Only forward ``backend`` when the caller supplied one, so that None
        # is never handed to OpenCV's integer ``apiPreference`` parameter.
        if backend is None:
            self.cap = cv2.VideoCapture(str(video_path))
        else:
            self.cap = cv2.VideoCapture(str(video_path), backend)

        if not self.cap.isOpened():
            # Do not leave a half-initialised capture handle behind.
            self.cap.release()
            raise ValueError(f"Could not open video: {video_path}")

        # Video properties
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Frame cache: frame index -> frame copy, ordered from least to most
        # recently used.
        self.frame_cache = OrderedDict()
        self.MAX_CACHE_FRAMES = max_cache_frames

        # Index of the frame most recently returned by get_frame().
        self.current_frame = 0

    def __enter__(self):
        """Return the wrapper itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the capture when leaving the ``with`` block."""
        self.release()

    @property
    def cache_access_order(self):
        """Cached frame indices, least recently used first (read-only copy)."""
        return list(self.frame_cache)

    def _manage_cache(self):
        """Evict least-recently-used frames until the cache fits its limit."""
        # The emptiness guard keeps a zero or negative limit from popping
        # from an empty cache.
        while self.frame_cache and len(self.frame_cache) > self.MAX_CACHE_FRAMES:
            self.frame_cache.popitem(last=False)

    def _add_to_cache(self, frame_number, frame):
        """Store a copy of *frame* under *frame_number* as most recently used."""
        self.frame_cache[frame_number] = frame.copy()
        # Re-assigning an existing key keeps its old position, so move it
        # to the end explicitly.
        self.frame_cache.move_to_end(frame_number)
        self._manage_cache()

    def _get_from_cache(self, frame_number):
        """Return a copy of the cached frame, or ``None`` when it is absent.

        A hit marks the frame as most recently used.
        """
        cached = self.frame_cache.get(frame_number)
        if cached is None:
            return None
        self.frame_cache.move_to_end(frame_number)
        # Hand out a copy so callers cannot modify the cached frame.
        return cached.copy()

    def get_frame(self, frame_number):
        """Return the frame at index *frame_number*.

        The index is clamped to be non-negative and, when the frame count is
        known, to at most ``total_frames - 1``. On a cache miss the capture
        is positioned explicitly before reading.

        Args:
            frame_number: Zero-based index of the requested frame.

        Returns:
            The frame, either a copy from the cache or the freshly read one.

        Raises:
            ValueError: If the capture fails to read the frame.
        """
        frame_number = max(0, frame_number)
        # Apply the upper bound only for a positive frame count; with a
        # reported count of 0 or less the bound would force every request
        # to frame 0. NOTE(review): presumably such counts mean the backend
        # could not determine the length - confirm for the sources in use.
        if self.total_frames > 0:
            frame_number = min(frame_number, self.total_frames - 1)

        # Check cache first
        cached_frame = self._get_from_cache(frame_number)
        if cached_frame is not None:
            self.current_frame = frame_number
            return cached_frame

        # Not in cache: seek to the frame and read it.
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = self.cap.read()
        if not ret:
            raise ValueError(f"Failed to read frame {frame_number}")

        self._add_to_cache(frame_number, frame)
        self.current_frame = frame_number
        return frame

    def advance_frame(self, frames=1):
        """Move *frames* frames relative to the current one and return it.

        Args:
            frames: Offset added to ``current_frame``; may be negative.

        Returns:
            The frame at the new position, as returned by :meth:`get_frame`.
        """
        return self.get_frame(self.current_frame + frames)

    def release(self):
        """Release the underlying video capture."""
        if self.cap is not None:
            self.cap.release()

    def isOpened(self):
        """Return ``True`` when the underlying capture exists and is open."""
        return self.cap is not None and bool(self.cap.isOpened())