diff --git a/main.go b/main.go index fadf173..90aa3aa 100644 --- a/main.go +++ b/main.go @@ -12,34 +12,47 @@ import ( "golang.org/x/time/rate" ) -func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) { - workersChan := make(chan struct{}, workers) - ingestChan := make(chan T, len(arr)) - - for _, item := range arr { - ingestChan <- item +func WithWorkers[K comparable, V any, C []V | map[K]V]( + workers int, + items C, + fn func(worker int, key K, value V), +) { + type KV struct { + // A bit of type weaving... + Key any + Value V } - close(ingestChan) - wg := sync.WaitGroup{} + n := len(items) + ingestChan := make(chan KV, n) + + go func() { + switch v := any(items).(type) { + case []V: + for i, it := range v { + ingestChan <- KV{Key: i, Value: it} + } + case map[K]V: + for key, it := range v { + ingestChan <- KV{Key: key, Value: it} + } + } + close(ingestChan) + }() + + var wg sync.WaitGroup + wg.Add(workers) for i := 0; i < workers; i++ { - workersChan <- struct{}{} - wg.Add(1) go func(worker int) { defer wg.Done() - defer func() { <-workersChan }() - for { - item, ok := <-ingestChan - // We're done processing totally - if !ok { - return - } - fn(worker, item) + for v := range ingestChan { + fn(worker, v.Key.(K), v.Value) } }(i) } wg.Wait() } + func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { for i := 0; i < len(arr); i += batchSize { start := i