// Package cyutils provides utility functions for concurrent processing, HTTP requests with caching, and rate limiting. package cyutils import ( "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" "sync" "golang.org/x/time/rate" ) // WithWorkers processes items concurrently using a specified number of worker goroutines. // It accepts either a slice or map of items and applies the provided function to each item. // The function receives the worker ID, key, and value for each item. // // Parameters: // - workers: Number of concurrent workers to spawn // - items: Collection of items to process (slice or map) // - fn: Function to apply to each item, receives (workerID, key, value) // // Example: // // WithWorkers(3, []string{"a", "b", "c"}, func(worker, key, value string) { // fmt.Printf("Worker %d processing %s\n", worker, value) // }) func WithWorkers[K comparable, V any, C []V | map[K]V]( workers int, items C, fn func(worker int, key K, value V), ) { type KV struct { // A bit of type weaving... Key any Value V } n := len(items) ingestChan := make(chan KV, n) go func() { switch v := any(items).(type) { case []V: for i, it := range v { ingestChan <- KV{Key: i, Value: it} } case map[K]V: for key, it := range v { ingestChan <- KV{Key: key, Value: it} } } close(ingestChan) }() var wg sync.WaitGroup wg.Add(workers) for i := 0; i < workers; i++ { go func(worker int) { defer wg.Done() for v := range ingestChan { fn(worker, v.Key.(K), v.Value) } }(i) } wg.Wait() } // Batched processes an array in batches of the specified size. // The provided function is called for each batch with the batch slice. // // Parameters: // - arr: Array to process in batches // - batchSize: Maximum number of items per batch // - fn: Function to call for each batch // // Example: // // Batched([]int{1, 2, 3, 4, 5}, 2, func(batch []int) { // fmt.Printf("Processing batch: %v\n", batch) // }) func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { for i := 0; i < len(arr); i += batchSize { start := i end := min(i+batchSize, len(arr)) batch := arr[start:end] fn(batch) } } // RateLimitedTransport wraps an HTTP transport with rate limiting capabilities. // It enforces request rate limits before executing HTTP requests. type RateLimitedTransport struct { base http.RoundTripper // Underlying transport limiter *rate.Limiter // Rate limiter } // RoundTrip enforces rate limiting before executing the HTTP request. // It waits for rate limiter permission before delegating to the underlying transport. // Returns an error if the context is cancelled or times out while waiting. func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() if err := t.limiter.Wait(ctx); err != nil { return nil, err // Handle context cancellation/timeout } return t.base.RoundTrip(req) } // LimitedHTTP creates an HTTP client with rate limiting capabilities. // The client will respect the specified requests per second (rps) and burst limits. // // Parameters: // - rps: Maximum requests per second // - burst: Maximum burst capacity for rate limiting // // Returns an HTTP client configured with rate limiting and connection pooling. func LimitedHTTP(rps float64, burst int) *http.Client { baseTransport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 10, } // Initialize rate limiter limiter := rate.NewLimiter(rate.Limit(rps), burst) // Wrap transport with rate limiting transport := &RateLimitedTransport{ base: baseTransport, limiter: limiter, } // Return configured client return &http.Client{ Transport: transport, } } var client = http.DefaultClient // WithClient sets the default HTTP client used by caching functions. // This allows customization of the HTTP client for all subsequent cached requests. // // Parameters: // - nclient: The HTTP client to use for future requests func WithClient(nclient *http.Client) { client = nclient } // RequestCached performs an HTTP request with file-based caching. // If the cache file exists, it returns the cached data. Otherwise, it makes the HTTP request, // caches the response, and returns the result. For non-[]byte types, it attempts JSON unmarshaling. // // Parameters: // - req: HTTP request to execute // - filename: Path to cache file // // Returns the cached or fetched data of type T, or an error if the request fails. // // Example: // // var result MyStruct // req, _ := http.NewRequest("GET", "https://api.example.com/data", nil) // err := RequestCached(req, "cache/data.json", &result) func RequestCached[T any](req *http.Request, filename string) (T, error) { var zero T data, err := os.ReadFile(filename) if err != nil { if !os.IsNotExist(err) { return zero, fmt.Errorf("failed to read cache: %w", err) } resp, httpErr := client.Do(req) if httpErr != nil { return zero, fmt.Errorf("HTTP request failed: %w", httpErr) } defer resp.Body.Close() body, readErr := io.ReadAll(resp.Body) if readErr != nil { return zero, fmt.Errorf("failed to read response body: %w", readErr) } dir := filepath.Dir(filename) if dir != "." && dir != "" { if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { return zero, fmt.Errorf("failed to create cache dir %s: %w", dir, mkErr) } } if writeErr := os.WriteFile(filename, body, 0o644); writeErr != nil { return zero, fmt.Errorf("failed to write cache %s: %w", filename, writeErr) } data = body } if _, ok := any(zero).([]byte); ok { return any(data).(T), nil } var out T if err := json.Unmarshal(data, &out); err != nil { return zero, fmt.Errorf("failed to unmarshal cached response: %w", err) } return out, nil } // RequestCachedBytes is a convenience function for RequestCached that returns raw bytes. // It's equivalent to calling RequestCached[[]byte] but provides a cleaner API for byte data. // // Parameters: // - req: HTTP request to execute // - filename: Path to cache file // // Returns the cached or fetched data as []byte, or an error if the request fails. func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) { return RequestCached[[]byte](req, filename) }