package cyutils import ( "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" "sync" "golang.org/x/time/rate" ) func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) { workersChan := make(chan struct{}, workers) ingestChan := make(chan T, len(arr)) for _, item := range arr { ingestChan <- item } close(ingestChan) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { workersChan <- struct{}{} wg.Add(1) go func(worker int) { defer wg.Done() defer func() { <-workersChan }() for { item, ok := <-ingestChan // We're done processing totally if !ok { return } fn(worker, item) } }(i) } wg.Wait() } func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { for i := 0; i < len(arr); i += batchSize { start := i end := min(i+batchSize, len(arr)) batch := arr[start:end] fn(batch) } } type RateLimitedTransport struct { base http.RoundTripper // Underlying transport limiter *rate.Limiter // Rate limiter } // RoundTrip enforces rate limiting before executing the request func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() if err := t.limiter.Wait(ctx); err != nil { return nil, err // Handle context cancellation/timeout } return t.base.RoundTrip(req) } func LimitedHttp(rps float64, burst int) *http.Client { baseTransport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 10, } // Initialize rate limiter limiter := rate.NewLimiter(rate.Limit(rps), burst) // Wrap transport with rate limiting transport := &RateLimitedTransport{ base: baseTransport, limiter: limiter, } // Return configured client return &http.Client{ Transport: transport, } } func RequestCached[T any](req *http.Request, filename string) (T, error) { var zero T data, err := os.ReadFile(filename) if err != nil { if !os.IsNotExist(err) { return zero, fmt.Errorf("failed to read cache: %w", err) } resp, httpErr := http.DefaultClient.Do(req) if httpErr != nil { return zero, fmt.Errorf("HTTP request failed: %w", httpErr) } defer resp.Body.Close() body, readErr := io.ReadAll(resp.Body) if readErr != nil { return zero, fmt.Errorf("failed to read response body: %w", readErr) } dir := filepath.Dir(filename) if dir != "." && dir != "" { if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { return zero, fmt.Errorf("failed to create cache dir %s: %w", dir, mkErr) } } if writeErr := os.WriteFile(filename, body, 0o644); writeErr != nil { return zero, fmt.Errorf("failed to write cache %s: %w", filename, writeErr) } data = body } if _, ok := any(zero).([]byte); ok { return any(data).(T), nil } var out T if err := json.Unmarshal(data, &out); err != nil { return zero, fmt.Errorf("failed to unmarshal cached response: %w", err) } return out, nil } func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) { return RequestCached[[]byte](req, filename) }