Files
cyutils/main.go

83 lines
1.8 KiB
Go

package cyutils
import (
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
)
// WithWorkers calls fn once for every element of arr, spreading the calls over
// a fixed pool of goroutines, and returns only after every call has finished.
//
// workers is the number of goroutines to start. Values below 1 are treated as
// 1, so that a zero count does not silently skip every item and a negative
// count does not panic. fn receives the zero-based index of the goroutine that
// is running it together with the item to process. When workers is greater
// than 1, fn is invoked concurrently and the order in which items are handled
// is unspecified, so fn must be safe for concurrent use.
func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) {
	if workers < 1 {
		workers = 1
	}

	// Queue every item up front. The buffer holds the whole slice, so these
	// sends never block, and closing the channel lets each worker's range loop
	// end once the queue has been drained.
	queue := make(chan T, len(arr))
	for _, item := range arr {
		queue <- item
	}
	close(queue)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(worker int) {
			defer wg.Done()
			for item := range queue {
				fn(worker, item)
			}
		}(i)
	}
	wg.Wait()
}
// Batched calls fn sequentially on consecutive chunks of arr, each holding at
// most batchSize elements; only the last chunk may be shorter. fn is not called
// when arr is empty.
//
// A batchSize below 1 is treated as 1: a zero step would never advance the
// loop and a negative step would produce an out-of-range slice.
//
// Every batch is a view into arr rather than a copy, so element writes made by
// fn are visible in arr. The capacity of each batch is capped at its length,
// so an append inside fn reallocates instead of overwriting the elements that
// belong to the following batch.
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
	if batchSize < 1 {
		batchSize = 1
	}
	for start := 0; start < len(arr); start += batchSize {
		end := min(start+batchSize, len(arr))
		fn(arr[start:end:end])
	}
}
// RateLimitedTransport is an http.RoundTripper that waits on a rate limiter
// before handing each request to an underlying transport. Both fields are
// unexported and RoundTrip dereferences them without a nil check, so the zero
// value is not usable; within this file values are built by LimitedHttp.
type RateLimitedTransport struct {
base http.RoundTripper // Transport that performs the request once the limiter admits it
limiter *rate.Limiter // Limiter that RoundTrip waits on before every request
}
// RoundTrip waits until the rate limiter admits the request and then forwards
// it to the underlying transport, returning that transport's response and
// error unchanged.
//
// The wait is bound to the request's context. If the limiter returns an error
// (for example because the context was canceled or its deadline would be
// exceeded), that error is returned as is, the request is never sent, and the
// request body is closed.
func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if err := t.limiter.Wait(req.Context()); err != nil {
		// http.RoundTripper requires the request body to be closed even when
		// RoundTrip fails; the base transport never sees this request, so the
		// close has to happen here.
		if req.Body != nil {
			// The limiter error is the one worth reporting; a failure to
			// close an unsent body adds nothing for the caller.
			_ = req.Body.Close()
		}
		return nil, err
	}
	return t.base.RoundTrip(req)
}
func LimitedHttp(rps float64, burst int, timeout time.Duration) *http.Client {
baseTransport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 10 * time.Second,
}
// Initialize rate limiter
limiter := rate.NewLimiter(rate.Limit(rps), burst)
// Wrap transport with rate limiting
transport := &RateLimitedTransport{
base: baseTransport,
limiter: limiter,
}
// Return configured client
return &http.Client{
Transport: transport,
Timeout: timeout,
}
}