Files
cyutils/main.go
2025-10-16 15:25:28 +02:00

220 lines
6.2 KiB
Go

// Package cyutils provides utility functions for concurrent processing, HTTP requests with caching, and rate limiting.
package cyutils
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sync"
"golang.org/x/time/rate"
)
// WithWorkers processes items concurrently using a pool of worker goroutines.
// It accepts either a slice or a map and calls fn exactly once for every element.
// For a slice the key passed to fn is the element index (so K must be int);
// for a map it is the map key. WithWorkers blocks until every element has been
// processed.
//
// Parameters:
// - workers: Number of concurrent workers to spawn; values below 1 are treated
//   as 1, and no more workers than there are items are started
// - items: Collection of items to process (slice or map)
// - fn: Function to apply to each item, receives (workerID, key, value)
//
// Worker IDs are in the range [0, workers). fn may run concurrently on several
// goroutines, so it must synchronize access to any shared state itself.
// WithWorkers panics, before calling fn, if items is a non-empty slice and
// fn's key type K cannot hold an int index.
//
// Example:
//
//	WithWorkers(3, []string{"a", "b", "c"}, func(worker, index int, value string) {
//		fmt.Printf("Worker %d processing %s\n", worker, value)
//	})
func WithWorkers[K comparable, V any, C []V | map[K]V](
	workers int,
	items C,
	fn func(worker int, key K, value V),
) {
	type job struct {
		key   K
		value V
	}

	n := len(items)
	if n == 0 {
		return
	}
	// A non-positive count would process nothing (0) or make WaitGroup.Add
	// panic (negative); workers beyond n would never receive an item.
	workers = max(1, min(workers, n))

	// The channel can hold every item, so it is filled up front in the caller's
	// goroutine (where a key-type panic is recoverable) and closed before any
	// worker starts.
	jobs := make(chan job, n)
	switch v := any(items).(type) {
	case []V:
		for i, it := range v {
			key, ok := any(i).(K)
			if !ok {
				panic("cyutils.WithWorkers: fn must take an int key when items is a slice")
			}
			jobs <- job{key: key, value: it}
		}
	case map[K]V:
		for key, it := range v {
			jobs <- job{key: key, value: it}
		}
	}
	close(jobs)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func(worker int) {
			defer wg.Done()
			for j := range jobs {
				fn(worker, j.key, j.value)
			}
		}(w)
	}
	wg.Wait()
}
// Batched processes arr in consecutive batches of at most batchSize elements,
// calling fn once per batch, in order. The final batch may be shorter.
//
// Each batch is a sub-slice of arr (no copying) whose capacity is capped at its
// length, so appending to a batch inside fn allocates a new array instead of
// overwriting elements of later batches. Assigning to batch[i] still modifies arr.
//
// Parameters:
// - arr: Array to process in batches; an empty or nil slice results in no calls
// - batchSize: Maximum number of items per batch; must be at least 1
// - fn: Function to call for each batch
//
// Batched panics if arr is non-empty and batchSize is less than 1.
//
// Example:
//
//	Batched([]int{1, 2, 3, 4, 5}, 2, func(batch []int) {
//		fmt.Printf("Processing batch: %v\n", batch)
//	})
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
	if len(arr) == 0 {
		return
	}
	if batchSize < 1 {
		// Without this check a zero size loops forever and a negative one
		// fails with an opaque slice-bounds panic.
		panic(fmt.Sprintf("cyutils.Batched: batchSize must be at least 1, got %d", batchSize))
	}
	for start := 0; start < len(arr); {
		// Adding the remaining length (not start+batchSize) cannot overflow.
		end := start + min(batchSize, len(arr)-start)
		// The full slice expression caps capacity so fn cannot clobber the
		// next batch through append.
		fn(arr[start:end:end])
		start = end
	}
}
// RateLimitedTransport is an http.RoundTripper that waits on a rate limiter
// before handing each request to an underlying transport.
//
// The zero value is usable: a nil base falls back to http.DefaultTransport and
// a nil limiter disables rate limiting. LimitedHTTP builds a client with both
// configured.
type RateLimitedTransport struct {
	base    http.RoundTripper // Underlying transport; nil means http.DefaultTransport
	limiter *rate.Limiter     // Rate limiter; nil means no limiting
}

// Compile-time check that RateLimitedTransport satisfies http.RoundTripper.
var _ http.RoundTripper = (*RateLimitedTransport)(nil)

// RoundTrip implements http.RoundTripper. It blocks until the limiter grants
// permission, then delegates to the underlying transport.
//
// If waiting fails (the request context is cancelled, its deadline would pass
// before a token is available, or the limiter's burst is too small), the
// limiter's error is returned. The request body is closed in that case, as the
// http.RoundTripper contract requires on every path.
func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if t.limiter != nil {
		if err := t.limiter.Wait(req.Context()); err != nil {
			if req.Body != nil {
				// Best effort: the request is abandoned, so a close error is irrelevant.
				_ = req.Body.Close()
			}
			return nil, err
		}
	}
	base := t.base
	if base == nil {
		base = http.DefaultTransport
	}
	return base.RoundTrip(req)
}
// LimitedHTTP creates an HTTP client with rate limiting capabilities.
// The client will respect the specified requests per second (rps) and burst limits.
//
// Parameters:
// - rps: Maximum requests per second
// - burst: Maximum burst capacity for rate limiting
//
// Returns an HTTP client configured with rate limiting and connection pooling.
func LimitedHTTP(rps float64, burst int) *http.Client {
baseTransport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
}
// Initialize rate limiter
limiter := rate.NewLimiter(rate.Limit(rps), burst)
// Wrap transport with rate limiting
transport := &RateLimitedTransport{
base: baseTransport,
limiter: limiter,
}
// Return configured client
return &http.Client{
Transport: transport,
}
}
// client is the HTTP client used by RequestCached; it starts as http.DefaultClient.
var client = http.DefaultClient

// WithClient sets the HTTP client used by the caching functions for all
// subsequent requests. Passing nil restores http.DefaultClient.
//
// WithClient performs an unsynchronized write to package state. Call it during
// initialization, before any cached requests are in flight.
//
// Parameters:
// - nclient: The HTTP client to use for future requests, or nil for the default
func WithClient(nclient *http.Client) {
	if nclient == nil {
		// A nil client would make every later client.Do call panic.
		nclient = http.DefaultClient
	}
	client = nclient
}
// RequestCached performs an HTTP request with file-based caching.
// If filename already exists, its contents are used and no request is made.
// Otherwise req is sent with the package HTTP client (see WithClient). The body
// of a successful (2xx) response is then written to filename and returned.
// When T is []byte the raw bytes are returned; any other T is decoded from JSON.
//
// Responses with a non-2xx status are returned as errors and are not cached.
// Neither is a body that fails to decode into T. A transient failure is
// therefore retried on the next call instead of being served from the cache
// forever. The cache entry is written to a temporary file in the same
// directory and renamed into place, so an interrupted write never leaves a
// truncated cache file behind.
//
// Parameters:
// - req: HTTP request to execute when there is no cache file yet
// - filename: Path to cache file; missing parent directories are created
//
// Returns the cached or fetched data of type T. It returns an error if reading
// the cache, the request, decoding, or writing the cache fails.
//
// Example:
//
//	req, _ := http.NewRequest("GET", "https://api.example.com/data", nil)
//	result, err := RequestCached[MyStruct](req, "cache/data.json")
func RequestCached[T any](req *http.Request, filename string) (T, error) {
	var zero T
	data, err := os.ReadFile(filename)
	if err == nil {
		return decodeCached[T](data)
	}
	if !os.IsNotExist(err) {
		return zero, fmt.Errorf("failed to read cache: %w", err)
	}
	body, err := fetchBody(req)
	if err != nil {
		return zero, err
	}
	// Decode before caching so a body that cannot become a T is never persisted.
	out, err := decodeCached[T](body)
	if err != nil {
		return zero, err
	}
	if err := writeCacheFile(filename, body); err != nil {
		return zero, err
	}
	return out, nil
}

// decodeCached turns cache bytes into T: []byte is returned as-is, any other type is JSON-decoded.
func decodeCached[T any](data []byte) (T, error) {
	var out T
	if _, ok := any(out).([]byte); ok {
		return any(data).(T), nil
	}
	if err := json.Unmarshal(data, &out); err != nil {
		// Unmarshal may have partially filled out; hand back a clean zero value.
		var zero T
		return zero, fmt.Errorf("failed to unmarshal cached response: %w", err)
	}
	return out, nil
}

// fetchBody sends req with the package client and returns the full body of a 2xx response.
func fetchBody(req *http.Request) ([]byte, error) {
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("HTTP request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Best effort: drain a bounded amount so the connection can be reused.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		return nil, fmt.Errorf("HTTP request failed: unexpected status %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	return body, nil
}

// writeCacheFile stores data at filename via a temp file in the same directory followed by a rename.
func writeCacheFile(filename string, data []byte) (err error) {
	dir := filepath.Dir(filename)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return fmt.Errorf("failed to create cache dir %s: %w", dir, err)
	}
	tmp, err := os.CreateTemp(dir, "."+filepath.Base(filename)+".*.tmp")
	if err != nil {
		return fmt.Errorf("failed to write cache %s: %w", filename, err)
	}
	defer func() {
		if err != nil {
			// Best effort cleanup: never leave a stray temp file behind on failure.
			_ = tmp.Close()
			_ = os.Remove(tmp.Name())
		}
	}()
	if _, err := tmp.Write(data); err != nil {
		return fmt.Errorf("failed to write cache %s: %w", filename, err)
	}
	// CreateTemp uses 0600; keep the world-readable mode the cache always had.
	if err := tmp.Chmod(0o644); err != nil {
		return fmt.Errorf("failed to write cache %s: %w", filename, err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("failed to write cache %s: %w", filename, err)
	}
	if err := os.Rename(tmp.Name(), filename); err != nil {
		return fmt.Errorf("failed to write cache %s: %w", filename, err)
	}
	return nil
}
// RequestCachedBytes loads the cache file at filename, or performs req when it
// is missing, and returns the body as raw bytes with no JSON decoding.
// It is shorthand for RequestCached[[]byte].
//
// Parameters:
// - req: HTTP request to execute when the cache file does not exist yet
// - filename: Path to cache file
//
// Returns the cached or fetched bytes, or the error reported by RequestCached.
func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) {
	raw, err := RequestCached[[]byte](req, filename)
	return raw, err
}