80 lines
1.7 KiB
Go
80 lines
1.7 KiB
Go
package cyutils
|
|
|
|
import (
|
|
"net/http"
|
|
"sync"
|
|
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
// WithWorkers calls fn once for every element of arr, spreading the calls
// over a pool of goroutines, and returns only after every call has finished.
//
// workers is the requested pool size. Values below 1 are treated as 1 so that
// items are never silently skipped (and a negative value no longer panics),
// and the pool is never larger than len(arr) because extra goroutines would
// have nothing to do.
//
// fn receives the zero-based index of the goroutine that is running it and
// the item to process. Calls happen concurrently from different goroutines
// and in no particular order, so fn must be safe for concurrent use.
func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) {
	if len(arr) == 0 {
		return
	}
	if workers < 1 {
		workers = 1
	}
	if workers > len(arr) {
		workers = len(arr)
	}

	// The queue is filled and closed up front, so each worker can simply
	// range over it and exits on its own once the queue is drained.
	queue := make(chan T, len(arr))
	for _, item := range arr {
		queue <- item
	}
	close(queue)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(worker int) {
			defer wg.Done()
			for item := range queue {
				fn(worker, item)
			}
		}(i)
	}
	wg.Wait()
}
|
|
// Batched splits arr into consecutive chunks of at most batchSize elements
// and calls fn once per chunk, in order. The last chunk may be shorter. fn is
// not called when arr is empty.
//
// A batchSize below 1 cannot advance through the slice (it used to loop
// forever for 0 and panic for negative values), so it is treated as "no
// batching": fn is called once with all of arr.
//
// Each batch is a view into arr, not a copy. Its capacity is limited to its
// length so that an append inside fn allocates a new array instead of
// overwriting the elements of the following batch.
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
	if batchSize < 1 {
		batchSize = len(arr)
	}
	for start := 0; start < len(arr); start += batchSize {
		end := min(start+batchSize, len(arr))
		fn(arr[start:end:end])
	}
}
|
|
|
|
type RateLimitedTransport struct {
|
|
base http.RoundTripper // Underlying transport
|
|
limiter *rate.Limiter // Rate limiter
|
|
}
|
|
|
|
// RoundTrip enforces rate limiting before executing the request
|
|
func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
ctx := req.Context()
|
|
if err := t.limiter.Wait(ctx); err != nil {
|
|
return nil, err // Handle context cancellation/timeout
|
|
}
|
|
return t.base.RoundTrip(req)
|
|
}
|
|
func LimitedHttp(rps float64, burst int) *http.Client {
|
|
baseTransport := &http.Transport{
|
|
MaxIdleConns: 100,
|
|
MaxIdleConnsPerHost: 10,
|
|
}
|
|
|
|
// Initialize rate limiter
|
|
limiter := rate.NewLimiter(rate.Limit(rps), burst)
|
|
|
|
// Wrap transport with rate limiting
|
|
transport := &RateLimitedTransport{
|
|
base: baseTransport,
|
|
limiter: limiter,
|
|
}
|
|
|
|
// Return configured client
|
|
return &http.Client{
|
|
Transport: transport,
|
|
}
|
|
}
|