// Package cyutils provides utility functions for concurrent processing, HTTP requests with caching, and rate limiting. package cyutils import ( "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" "reflect" "sync" "github.com/bits-and-blooms/bloom/v3" "golang.org/x/time/rate" ) // WithWorkers processes items concurrently using a specified number of worker goroutines. // It accepts either a slice or map of items and applies the provided function to each item. // The function receives the worker ID, key, and value for each item. // // Parameters: // - workers: Number of concurrent workers to spawn // - items: Collection of items to process (slice or map) // - fn: Function to apply to each item, receives (workerID, key, value) // // Example: // // WithWorkers(3, []string{"a", "b", "c"}, func(worker, key, value string) { // fmt.Printf("Worker %d processing %s\n", worker, value) // }) func WithWorkers[K comparable, V any, C []V | map[K]V]( workers int, items C, fn func(worker int, key K, value V), ) { type KV struct { // A bit of type weaving... Key any Value V } n := len(items) ingestChan := make(chan KV, n) go func() { switch v := any(items).(type) { case []V: for i, it := range v { ingestChan <- KV{Key: i, Value: it} } case map[K]V: for key, it := range v { ingestChan <- KV{Key: key, Value: it} } } close(ingestChan) }() var wg sync.WaitGroup wg.Add(workers) for i := 0; i < workers; i++ { go func(worker int) { defer wg.Done() for v := range ingestChan { fn(worker, v.Key.(K), v.Value) } }(i) } wg.Wait() } // Batched processes an array in batches of the specified size. // The provided function is called for each batch with the batch slice. 
// // Parameters: // - arr: Array to process in batches // - batchSize: Maximum number of items per batch // - fn: Function to call for each batch // // Example: // // Batched([]int{1, 2, 3, 4, 5}, 2, func(batch []int) { // fmt.Printf("Processing batch: %v\n", batch) // }) func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { for i := 0; i < len(arr); i += batchSize { start := i end := min(i+batchSize, len(arr)) batch := arr[start:end] fn(batch) } } // RateLimitedTransport wraps an HTTP transport with rate limiting capabilities. // It enforces request rate limits before executing HTTP requests. type RateLimitedTransport struct { base http.RoundTripper // Underlying transport limiter *rate.Limiter // Rate limiter } // RoundTrip enforces rate limiting before executing the HTTP request. // It waits for rate limiter permission before delegating to the underlying transport. // Returns an error if the context is cancelled or times out while waiting. func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() if err := t.limiter.Wait(ctx); err != nil { return nil, err // Handle context cancellation/timeout } return t.base.RoundTrip(req) } // LimitedHTTP creates an HTTP client with rate limiting capabilities. // The client will respect the specified requests per second (rps) and burst limits. // // Parameters: // - rps: Maximum requests per second // - burst: Maximum burst capacity for rate limiting // // Returns an HTTP client configured with rate limiting and connection pooling. 
func LimitedHTTP(rps float64, burst int) *http.Client { baseTransport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 10, } // Initialize rate limiter limiter := rate.NewLimiter(rate.Limit(rps), burst) // Wrap transport with rate limiting transport := &RateLimitedTransport{ base: baseTransport, limiter: limiter, } // Return configured client return &http.Client{ Transport: transport, } } var client = http.DefaultClient // WithClient sets the default HTTP client used by caching functions. // This allows customization of the HTTP client for all subsequent cached requests. // // Parameters: // - nclient: The HTTP client to use for future requests func WithClient(nclient *http.Client) { client = nclient } // RequestCached performs an HTTP request with file-based caching. // If the cache file exists, it returns the cached data. Otherwise, it makes the HTTP request, // caches the response, and returns the result. For non-[]byte types, it attempts JSON unmarshaling. // // Parameters: // - req: HTTP request to execute // - filename: Path to cache file // // Returns the cached or fetched data of type T, or an error if the request fails. // // Example: // // var result MyStruct // req, _ := http.NewRequest("GET", "https://api.example.com/data", nil) // err := RequestCached(req, "cache/data.json", &result) func RequestCached[T any](req *http.Request, filename string) (T, error) { var zero T data, err := os.ReadFile(filename) if err != nil { if !os.IsNotExist(err) { return zero, fmt.Errorf("failed to read cache: %w", err) } resp, httpErr := client.Do(req) if httpErr != nil { return zero, fmt.Errorf("HTTP request failed: %w", httpErr) } defer resp.Body.Close() body, readErr := io.ReadAll(resp.Body) if readErr != nil { return zero, fmt.Errorf("failed to read response body: %w", readErr) } dir := filepath.Dir(filename) if dir != "." 
&& dir != "" { if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { return zero, fmt.Errorf("failed to create cache dir %s: %w", dir, mkErr) } } if writeErr := os.WriteFile(filename, body, 0o644); writeErr != nil { return zero, fmt.Errorf("failed to write cache %s: %w", filename, writeErr) } data = body } if _, ok := any(zero).([]byte); ok { return any(data).(T), nil } var out T if err := json.Unmarshal(data, &out); err != nil { return zero, fmt.Errorf("failed to unmarshal cached response: %w", err) } return out, nil } // RequestCachedBytes is a convenience function for RequestCached that returns raw bytes. // It's equivalent to calling RequestCached[[]byte] but provides a cleaner API for byte data. // // Parameters: // - req: HTTP request to execute // - filename: Path to cache file // // Returns the cached or fetched data as []byte, or an error if the request fails. func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) { return RequestCached[[]byte](req, filename) } // Memoized creates a memoized version of the provided function using reflection. // The returned function caches results based on the string representation of arguments, // returning cached results for repeated calls with the same arguments. // // Parameters: // - fn: The function to memoize (must be a function type) // // Returns a memoized version of the function with the same signature. // // Example: // // func expensive(a string, b int) (string, error) { // fmt.Println("Computing...") // return fmt.Sprintf("Result: %s, %d", a, b), nil // } // // memoized := Memoized(expensive).(func(string, int) (string, error)) // result, _ := memoized("hello", 42) // Computes // result, _ = memoized("hello", 42) // Returns cached result // // Note: The cache key is generated using fmt.Sprintf("%v", args), which may not // uniquely identify all argument combinations. This function is not thread-safe. 
func Memoized(fn interface{}) interface{} {
	fnVal := reflect.ValueOf(fn)
	fnType := fnVal.Type()

	// reflect.MakeFunc hands a variadic function its trailing arguments as a
	// single slice. Call would try to pack that slice again and panic, so
	// variadic functions must be invoked through CallSlice.
	invoke := fnVal.Call
	if fnType.IsVariadic() {
		invoke = fnVal.CallSlice
	}

	cache := make(map[string][]reflect.Value)
	return reflect.MakeFunc(fnType, func(args []reflect.Value) []reflect.Value {
		key := memoKey(args)
		if cached, ok := cache[key]; ok {
			return cached
		}
		results := invoke(args)
		cache[key] = results
		return results
	}).Interface()
}

// memoKey renders call arguments as a cache key, one "type=value;" segment
// per argument.
//
// The concrete values are formatted rather than the reflect.Value wrappers:
// inside a slice, fmt prints a reflect.Value through its String method, which
// yields "<int Value>" for every non-string and would make calls that differ
// only in such arguments share one entry. The dynamic type is included so
// that interface-typed parameters holding, say, int32(1) and int64(1) stay
// distinct. Values that format identically (for example two pointers to
// equal structs) still map to the same key.
func memoKey(args []reflect.Value) string {
	var key []byte
	for _, arg := range args {
		v := arg.Interface()
		key = fmt.Appendf(key, "%T=%#v;", v, v)
	}
	return string(key)
}

// MemoizedBloom creates a memoized version of the provided function using a Bloom filter
// to avoid caching "one hit wonders". Results are only cached on the second invocation
// with the same arguments, reducing memory usage for functions with many unique calls.
//
// The function uses a Bloom filter to track which arguments have been seen:
//   - First call: Executes function, adds to Bloom filter, returns result (not cached)
//   - Second call: Executes function, caches result, returns result
//   - Subsequent calls: Returns cached result
//
// Because a Bloom filter can report false positives, a result is occasionally
// cached on its first call.
//
// Parameters:
//   - fn: The function to memoize (must be a function type)
//   - capacity: Expected number of unique argument combinations (0 selects the default of 10000)
//   - falsePositiveRate: Desired false positive rate for Bloom filter (0 selects the default of 0.01)
//
// Returns a memoized version of the function with the same signature.
//
// Example:
//
//	func expensive(a string, b int) (string, error) {
//		fmt.Println("Computing...")
//		return fmt.Sprintf("Result: %s, %d", a, b), nil
//	}
//
//	memoized := MemoizedBloom(expensive, 10000, 0.01).(func(string, int) (string, error))
//	result, _ := memoized("hello", 42) // Computes, adds to bloom filter
//	result, _ = memoized("hello", 42)  // Computes again, caches result
//	result, _ = memoized("hello", 42)  // Returns cached result
//
// Note: The cache key is a formatted rendering of the arguments, which may not
// uniquely identify all argument combinations. This function is thread-safe.
func MemoizedBloom(fn interface{}, capacity uint, falsePositiveRate float64) interface{} { fnVal := reflect.ValueOf(fn) fnType := fnVal.Type() if capacity == 0 { capacity = 10000 } if falsePositiveRate == 0 { falsePositiveRate = 0.01 } bf := bloom.NewWithEstimates(capacity, falsePositiveRate) cache := make(map[string][]reflect.Value) var mu sync.Mutex return reflect.MakeFunc(fnType, func(args []reflect.Value) []reflect.Value { key := fmt.Sprintf("%v", args) keyBytes := []byte(key) mu.Lock() defer mu.Unlock() if cached, ok := cache[key]; ok { return cached } results := fnVal.Call(args) if bf.Test(keyBytes) { cache[key] = results } else { bf.Add(keyBytes) } return results }).Interface() }