4 Commits

104
main.go
View File

@@ -1,41 +1,58 @@
package cyutils package cyutils
import ( import (
"encoding/json"
"fmt"
"io"
"net/http" "net/http"
"os"
"path/filepath"
"sync" "sync"
"time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) { func WithWorkers[K comparable, V any, C []V | map[K]V](
workersChan := make(chan struct{}, workers) workers int,
ingestChan := make(chan T, len(arr)) items C,
fn func(worker int, key K, value V),
) {
type KV struct {
// A bit of type weaving...
Key any
Value V
}
for _, item := range arr { n := len(items)
ingestChan <- item ingestChan := make(chan KV, n)
go func() {
switch v := any(items).(type) {
case []V:
for i, it := range v {
ingestChan <- KV{Key: i, Value: it}
}
case map[K]V:
for key, it := range v {
ingestChan <- KV{Key: key, Value: it}
}
} }
close(ingestChan) close(ingestChan)
}()
wg := sync.WaitGroup{} var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
workersChan <- struct{}{}
wg.Add(1)
go func(worker int) { go func(worker int) {
defer wg.Done() defer wg.Done()
defer func() { <-workersChan }() for v := range ingestChan {
for { fn(worker, v.Key.(K), v.Value)
item, ok := <-ingestChan
// We're done processing totally
if !ok {
return
}
fn(worker, item)
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
} }
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
for i := 0; i < len(arr); i += batchSize { for i := 0; i < len(arr); i += batchSize {
start := i start := i
@@ -78,3 +95,56 @@ func LimitedHttp(rps float64, burst int) *http.Client {
Transport: transport, Transport: transport,
} }
} }
var client = http.DefaultClient
func WithClient(nclient *http.Client) {
client = nclient
}
func RequestCached[T any](req *http.Request, filename string) (T, error) {
var zero T
data, err := os.ReadFile(filename)
if err != nil {
if !os.IsNotExist(err) {
return zero, fmt.Errorf("failed to read cache: %w", err)
}
resp, httpErr := client.Do(req)
if httpErr != nil {
return zero, fmt.Errorf("HTTP request failed: %w", httpErr)
}
defer resp.Body.Close()
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
return zero, fmt.Errorf("failed to read response body: %w", readErr)
}
dir := filepath.Dir(filename)
if dir != "." && dir != "" {
if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
return zero, fmt.Errorf("failed to create cache dir %s: %w", dir, mkErr)
}
}
if writeErr := os.WriteFile(filename, body, 0o644); writeErr != nil {
return zero, fmt.Errorf("failed to write cache %s: %w", filename, writeErr)
}
data = body
}
if _, ok := any(zero).([]byte); ok {
return any(data).(T), nil
}
var out T
if err := json.Unmarshal(data, &out); err != nil {
return zero, fmt.Errorf("failed to unmarshal cached response: %w", err)
}
return out, nil
}
func RequestCachedBytes(req *http.Request, filename string) ([]byte, error) {
return RequestCached[[]byte](req, filename)
}