Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
c8d3e5f9ca | |||
96bd7a110d | |||
7b94462388 | |||
b2c5d8f41c | |||
4d2c1622d3 | |||
82d1c5a8c8 | |||
c07fb20a8a |
2
go.mod
2
go.mod
@@ -1,3 +1,5 @@
|
||||
module git.site.quack-lab.dev/dave/cyutils
|
||||
|
||||
go 1.23.6
|
||||
|
||||
require golang.org/x/time v0.12.0
|
||||
|
2
go.sum
Normal file
2
go.sum
Normal file
@@ -0,0 +1,2 @@
|
||||
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
|
||||
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
|
140
main.go
140
main.go
@@ -1,37 +1,58 @@
|
||||
package cyutils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) {
|
||||
workersChan := make(chan struct{}, workers)
|
||||
ingestChan := make(chan T, len(arr))
|
||||
func WithWorkers[K comparable, V any, C []V | map[K]V](
|
||||
workers int,
|
||||
items C,
|
||||
fn func(worker int, key K, value V),
|
||||
) {
|
||||
type KV struct {
|
||||
// A bit of type weaving...
|
||||
Key any
|
||||
Value V
|
||||
}
|
||||
|
||||
for _, item := range arr {
|
||||
ingestChan <- item
|
||||
n := len(items)
|
||||
ingestChan := make(chan KV, n)
|
||||
|
||||
go func() {
|
||||
switch v := any(items).(type) {
|
||||
case []V:
|
||||
for i, it := range v {
|
||||
ingestChan <- KV{Key: i, Value: it}
|
||||
}
|
||||
case map[K]V:
|
||||
for key, it := range v {
|
||||
ingestChan <- KV{Key: key, Value: it}
|
||||
}
|
||||
}
|
||||
close(ingestChan)
|
||||
}()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(workers)
|
||||
for i := 0; i < workers; i++ {
|
||||
workersChan <- struct{}{}
|
||||
wg.Add(1)
|
||||
go func(worker int) {
|
||||
defer wg.Done()
|
||||
defer func() { <-workersChan }()
|
||||
for {
|
||||
item, ok := <-ingestChan
|
||||
// We're done processing totally
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
fn(worker, item)
|
||||
for v := range ingestChan {
|
||||
fn(worker, v.Key.(K), v.Value)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
|
||||
for i := 0; i < len(arr); i += batchSize {
|
||||
start := i
|
||||
@@ -40,3 +61,90 @@ func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
|
||||
fn(batch)
|
||||
}
|
||||
}
|
||||
|
||||
type RateLimitedTransport struct {
|
||||
base http.RoundTripper // Underlying transport
|
||||
limiter *rate.Limiter // Rate limiter
|
||||
}
|
||||
|
||||
// RoundTrip enforces rate limiting before executing the request
|
||||
func (t *RateLimitedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
if err := t.limiter.Wait(ctx); err != nil {
|
||||
return nil, err // Handle context cancellation/timeout
|
||||
}
|
||||
return t.base.RoundTrip(req)
|
||||
}
|
||||
func LimitedHttp(rps float64, burst int) *http.Client {
|
||||
baseTransport := &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
}
|
||||
|
||||
// Initialize rate limiter
|
||||
limiter := rate.NewLimiter(rate.Limit(rps), burst)
|
||||
|
||||
// Wrap transport with rate limiting
|
||||
transport := &RateLimitedTransport{
|
||||
base: baseTransport,
|
||||
limiter: limiter,
|
||||
}
|
||||
|
||||
// Return configured client
|
||||
return &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
}
|
||||
|
||||
var client = http.DefaultClient
|
||||
|
||||
func WithClient(nclient *http.Client) {
|
||||
client = nclient
|
||||
}
|
||||
|
||||
func RequestCached[T any](req *http.Request, filename string) (T, error) {
|
||||
var zero T
|
||||
|
||||
data, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return zero, fmt.Errorf("failed to read cache: %w", err)
|
||||
}
|
||||
|
||||
resp, httpErr := client.Do(req)
|
||||
if httpErr != nil {
|
||||
return zero, fmt.Errorf("HTTP request failed: %w", httpErr)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, readErr := io.ReadAll(resp.Body)
|
||||
if readErr != nil {
|
||||
return zero, fmt.Errorf("failed to read response body: %w", readErr)
|
||||
}
|
||||
|
||||
dir := filepath.Dir(filename)
|
||||
if dir != "." && dir != "" {
|
||||
if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
|
||||
return zero, fmt.Errorf("failed to create cache dir %s: %w", dir, mkErr)
|
||||
}
|
||||
}
|
||||
if writeErr := os.WriteFile(filename, body, 0o644); writeErr != nil {
|
||||
return zero, fmt.Errorf("failed to write cache %s: %w", filename, writeErr)
|
||||
}
|
||||
data = body
|
||||
}
|
||||
|
||||
if _, ok := any(zero).([]byte); ok {
|
||||
return any(data).(T), nil
|
||||
}
|
||||
|
||||
var out T
|
||||
if err := json.Unmarshal(data, &out); err != nil {
|
||||
return zero, fmt.Errorf("failed to unmarshal cached response: %w", err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) {
|
||||
return RequestCached[[]byte](req, filename)
|
||||
}
|
||||
|
Reference in New Issue
Block a user