import cv2 from collections import OrderedDict class Cv2BufferedCap: """Buffered wrapper around cv2.VideoCapture that handles frame loading, seeking, and caching correctly""" def __init__(self, video_path, backend=None, cache_size=10000): self.video_path = video_path self.cap = cv2.VideoCapture(str(video_path), backend) if not self.cap.isOpened(): raise ValueError(f"Could not open video: {video_path}") # Video properties self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.fps = self.cap.get(cv2.CAP_PROP_FPS) self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Current position tracking self.current_frame = 0 # Frame cache (LRU) self.cache_size = cache_size self.frame_cache = OrderedDict() def get_frame(self, frame_number): """Get frame at specific index - always accurate""" # Clamp frame number to valid range frame_number = max(0, min(frame_number, self.total_frames - 1)) # Check cache first if frame_number in self.frame_cache: self.frame_cache.move_to_end(frame_number) return self.frame_cache[frame_number] # Optimize for sequential reading (next frame) if frame_number == self.current_frame + 1: ret, frame = self.cap.read() else: # Seek for non-sequential access self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = self.cap.read() if ret: self.current_frame = frame_number # Store in cache, evict least recently used if cache is full if len(self.frame_cache) >= self.cache_size: self.frame_cache.popitem(last=False) self.frame_cache[frame_number] = frame self.frame_cache.move_to_end(frame_number) return frame else: raise ValueError(f"Failed to read frame {frame_number}") def advance_frame(self, frames=1): """Advance by specified number of frames""" new_frame = self.current_frame + frames return self.get_frame(new_frame) def release(self): """Release the video capture""" if self.cap: self.cap.release() def isOpened(self): """Check if capture is opened""" return self.cap and self.cap.isOpened()