From cdeaf1de787951b01252845545d91dde2d4f8461 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Tue, 3 Jun 2025 16:10:46 +0200 Subject: [PATCH] Initial commit --- go.mod | 3 +++ main.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 go.mod create mode 100644 main.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..20eac22 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.site.quack-lab.dev/dave/cyutils + +go 1.23.6 diff --git a/main.go b/main.go new file mode 100644 index 0000000..099d7c6 --- /dev/null +++ b/main.go @@ -0,0 +1,42 @@ +package cyutils + +import ( + "sync" +) + +func WithWorkers[T any](workers int, arr []T, fn func(worker int, item T)) { + workersChan := make(chan struct{}, workers) + ingestChan := make(chan T, len(arr)) + + for _, item := range arr { + ingestChan <- item + } + close(ingestChan) + + wg := sync.WaitGroup{} + for i := 0; i < workers; i++ { + workersChan <- struct{}{} + wg.Add(1) + go func(worker int) { + defer wg.Done() + defer func() { <-workersChan }() + for { + item, ok := <-ingestChan + // We're done processing totally + if !ok { + return + } + fn(worker, item) + } + }(i) + } + wg.Wait() +} +func Batched[T any](arr []T, batchSize int, fn func(batch []T)) { + for i := 0; i < len(arr); i += batchSize { + start := i + end := min(i+batchSize, len(arr)) + batch := arr[start:end] + fn(batch) + } +}