package cyutils import ( "net/http" "sync" "time" "golang.org/x/time/rate" ) func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) { workersChan := make(chan struct{}, workers) ingestChan := make(chan T, len(arr)) for _, item := range arr { ingestChan <- item } close(ingestChan) wg := sync.WaitGroup{} for i := 0; i < workers; i++ { workersChan <- struct{}{} wg.Add(1) go func(worker int) { defer wg.Done() defer func() { <-workersChan }() for { item, ok := <-ingestChan // We're done processing totally if !ok { return } fn(worker, item) } }(i) } wg.Wait() } func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { for i := 0; i < len(arr); i += batchSize { start := i end := min(i+batchSize, len(arr)) batch := arr[start:end] fn(batch) } } type RateLimitedTransport struct { base http.RoundTripper // Underlying transport limiter *rate.Limiter // Rate limiter } // RoundTrip enforces rate limiting before executing the request func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() if err := t.limiter.Wait(ctx); err != nil { return nil, err // Handle context cancellation/timeout } return t.base.RoundTrip(req) } func LimitedHttp(rps float64, burst int, timeout time.Duration) *http.Client { baseTransport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 10, IdleConnTimeout: 10 * time.Second, } // Initialize rate limiter limiter := rate.NewLimiter(rate.Limit(rps), burst) // Wrap transport with rate limiting transport := &RateLimitedTransport{ base: baseTransport, limiter: limiter, } // Return configured client return &http.Client{ Transport: transport, Timeout: timeout, } }