Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
c8d3e5f9ca | |||
96bd7a110d |
57
main.go
57
main.go
@@ -12,34 +12,47 @@ import (
|
|||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) {
|
func WithWorkers[K comparable, V any, C []V | map[K]V](
|
||||||
workersChan := make(chan struct{}, workers)
|
workers int,
|
||||||
ingestChan := make(chan T, len(arr))
|
items C,
|
||||||
|
fn func(worker int, key K, value V),
|
||||||
for _, item := range arr {
|
) {
|
||||||
ingestChan <- item
|
type KV struct {
|
||||||
|
// A bit of type weaving...
|
||||||
|
Key any
|
||||||
|
Value V
|
||||||
}
|
}
|
||||||
close(ingestChan)
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
n := len(items)
|
||||||
|
ingestChan := make(chan KV, n)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
switch v := any(items).(type) {
|
||||||
|
case []V:
|
||||||
|
for i, it := range v {
|
||||||
|
ingestChan <- KV{Key: i, Value: it}
|
||||||
|
}
|
||||||
|
case map[K]V:
|
||||||
|
for key, it := range v {
|
||||||
|
ingestChan <- KV{Key: key, Value: it}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ingestChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(workers)
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
workersChan <- struct{}{}
|
|
||||||
wg.Add(1)
|
|
||||||
go func(worker int) {
|
go func(worker int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer func() { <-workersChan }()
|
for v := range ingestChan {
|
||||||
for {
|
fn(worker, v.Key.(K), v.Value)
|
||||||
item, ok := <-ingestChan
|
|
||||||
// We're done processing totally
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fn(worker, item)
|
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
|
func Batched[T any](arr []T, batchSize int, fn func(batch []T)) {
|
||||||
for i := 0; i < len(arr); i += batchSize {
|
for i := 0; i < len(arr); i += batchSize {
|
||||||
start := i
|
start := i
|
||||||
@@ -83,6 +96,12 @@ func LimitedHttp(rps float64, burst int) *http.Client {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var client = http.DefaultClient
|
||||||
|
|
||||||
|
func WithClient(nclient *http.Client) {
|
||||||
|
client = nclient
|
||||||
|
}
|
||||||
|
|
||||||
func RequestCached[T any](req *http.Request, filename string) (T, error) {
|
func RequestCached[T any](req *http.Request, filename string) (T, error) {
|
||||||
var zero T
|
var zero T
|
||||||
|
|
||||||
@@ -92,7 +111,7 @@ func RequestCached[T any](req *http.Request, filename string) (T, error) {
|
|||||||
return zero, fmt.Errorf("failed to read cache: %w", err)
|
return zero, fmt.Errorf("failed to read cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, httpErr := http.DefaultClient.Do(req)
|
resp, httpErr := client.Do(req)
|
||||||
if httpErr != nil {
|
if httpErr != nil {
|
||||||
return zero, fmt.Errorf("HTTP request failed: %w", httpErr)
|
return zero, fmt.Errorf("HTTP request failed: %w", httpErr)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user