From 7946eb43c50b0529aa9204a4df689417611de36a Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Fri, 23 May 2025 13:33:19 +0200 Subject: [PATCH] Refactor image processing into separate hash and group workers for improved concurrency and clarity --- main.go | 288 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 165 insertions(+), 123 deletions(-) diff --git a/main.go b/main.go index 8d27867..21a558d 100644 --- a/main.go +++ b/main.go @@ -15,17 +15,27 @@ import ( "github.com/corona10/goimagehash" ) +type HashResult struct { + file string + hash *goimagehash.ExtImageHash +} + +type GroupResult struct { + mainFile string + group []string +} + func main() { thresh := flag.Int("thresh", 10, "Threshold for distance") workers := flag.Int("workers", 100, "Number of workers") notifyInterval := flag.Int("notify", 1000, "Notify interval") flag.Parse() logger.InitFlag() - hashes := &sync.Map{} logger.Info("Starting") logger.Info("Threshold: %v", *thresh) logger.Info("Patterns: %d", len(flag.Args())) + // Collect files files := make([]string, 0) for _, pattern := range flag.Args() { base, pattern := doublestar.SplitPattern(pattern) @@ -44,139 +54,63 @@ func main() { } logger.Info("Patterns expanded to %d files", len(files)) - workerChan := make(chan struct{}, *workers) - wg := sync.WaitGroup{} + // Phase 1: Hashing + hashChan := make(chan string, len(files)) + hashResults := make(chan HashResult, len(files)) + var hashWg sync.WaitGroup - var processedFiles uint64 - for _, file := range files { - workerChan <- struct{}{} - wg.Add(1) - go func(file string) { - defer wg.Done() - defer func() { <-workerChan }() - log := logger.Default.WithPrefix(file) - ext := filepath.Ext(file) - if ext != ".jpg" && ext != ".jpeg" && ext != ".png" { - log.Debug("Skipping non-image file: %s", file) - atomic.AddUint64(&processedFiles, 1) - if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files)) - } - return - } - - imgfile, err := os.Open(file) - if err != nil { - log.Error("Failed to open file: %v", err) - atomic.AddUint64(&processedFiles, 1) - if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files)) - } - return - } - defer imgfile.Close() - - isPng := ext == ".png" - var img image.Image - if isPng { - img, err = png.Decode(imgfile) - } else { - img, err = jpeg.Decode(imgfile) - } - if err != nil { - log.Error("Failed to decode image: %v", err) - atomic.AddUint64(&processedFiles, 1) - if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files)) - } - return - } - - hash, err := goimagehash.ExtPerceptionHash(img, 8, 8) - if err != nil { - log.Error("Failed to calculate hash: %v", err) - atomic.AddUint64(&processedFiles, 1) - if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files)) - } - return - } - log.Debug("Hashed: %v", hash) - hashes.Store(file, hash) - atomic.AddUint64(&processedFiles, 1) - if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files)) - } - }(file) + // Start hash workers + for i := 0; i < *workers; i++ { + hashWg.Add(1) + go hashWorker(hashChan, hashResults, &hashWg, *notifyInterval, len(files)) } - groupedImages := make(map[string][]string) - wg.Wait() + // Send files to hash workers + for _, file := range files { + hashChan <- file + } + close(hashChan) - processed := &sync.Map{} + // Collect hash results + allHashes := make(map[string]*goimagehash.ExtImageHash) + go func() { + hashWg.Wait() + close(hashResults) + }() - // Add progress counter for comparison stage - var comparedFiles uint64 - totalFiles := len(files) + for result := range hashResults { + allHashes[result.file] = result.hash + } - hashes.Range(func(key, value interface{}) bool { - workerChan <- struct{}{} - wg.Add(1) - go func(key, value interface{}) { - defer wg.Done() - defer func() { <-workerChan }() - filea := key.(string) - hasha := value.(*goimagehash.ExtImageHash) + // Phase 2: Grouping + groupChan := make(chan HashResult, len(allHashes)) + groupResults := make(chan GroupResult, len(allHashes)) + var groupWg sync.WaitGroup - if _, ok := processed.Load(filea); ok { - atomic.AddUint64(&comparedFiles, 1) - if atomic.LoadUint64(&comparedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Compared %d/%d files", atomic.LoadUint64(&comparedFiles), totalFiles) - } - return - } + // Start group workers + for i := 0; i < *workers; i++ { + groupWg.Add(1) + go groupWorker(groupChan, groupResults, &groupWg, *thresh, *notifyInterval, len(allHashes), allHashes) + } - var group []string - hashes.Range(func(key, value interface{}) bool { - fileb := key.(string) - hashb := value.(*goimagehash.ExtImageHash) - if filea == fileb { - return true - } - distance, err := hasha.Distance(hashb) - if err != nil { - logger.Error("Failed to calculate distance: %v", err) - return true - } - logger.Debug("Distance between %v and %v: %v", filea, fileb, distance) - if distance <= *thresh { - group = append(group, fileb) - processed.Store(fileb, true) - } - return true - }) + // Send hash results to group workers + for file, hash := range allHashes { + groupChan <- HashResult{file: file, hash: hash} + } + close(groupChan) - if len(group) > 0 { - groupedImages[filea] = group - processed.Store(filea, true) - } - atomic.AddUint64(&comparedFiles, 1) - if atomic.LoadUint64(&comparedFiles)%uint64(*notifyInterval) == 0 { - logger.Info("Compared %d/%d files", atomic.LoadUint64(&comparedFiles), totalFiles) - } - }(key, value) - return true - }) - wg.Wait() + // Collect group results + go func() { + groupWg.Wait() + close(groupResults) + }() - // Deduplicate by keeping the largest file in each group - for file, group := range groupedImages { - // Add the main file to the group for size comparison - allFiles := append([]string{file}, group...) - - // Find the largest file + // Process results and remove duplicates + for result := range groupResults { + allFiles := append([]string{result.mainFile}, result.group...) var largestFile string var largestSize int64 + for _, f := range allFiles { info, err := os.Stat(f) if err != nil { @@ -189,7 +123,6 @@ func main() { } } - // Remove all files except the largest one for _, f := range allFiles { if f != largestFile { logger.Info("Removing duplicate: %s (keeping %s)", f, largestFile) @@ -199,5 +132,114 @@ func main() { } } } + logger.Info("Done") } + +func hashWorker(in <-chan string, out chan<- HashResult, wg *sync.WaitGroup, notifyInterval int, totalFiles int) { + defer wg.Done() + var processed uint64 + + for file := range in { + log := logger.Default.WithPrefix(file) + ext := filepath.Ext(file) + if ext != ".jpg" && ext != ".jpeg" && ext != ".png" { + log.Debug("Skipping non-image file: %s", file) + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + continue + } + + imgfile, err := os.Open(file) + if err != nil { + log.Error("Failed to open file: %v", err) + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + continue + } + + isPng := ext == ".png" + var img image.Image + if isPng { + img, err = png.Decode(imgfile) + } else { + img, err = jpeg.Decode(imgfile) + } + imgfile.Close() + + if err != nil { + log.Error("Failed to decode image: %v", err) + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + continue + } + + hash, err := goimagehash.ExtPerceptionHash(img, 8, 8) + if err != nil { + log.Error("Failed to calculate hash: %v", err) + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + continue + } + + log.Debug("Hashed: %v", hash) + out <- HashResult{file: file, hash: hash} + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + } +} + +func groupWorker(in <-chan HashResult, out chan<- GroupResult, wg *sync.WaitGroup, thresh int, notifyInterval int, totalFiles int, allHashes map[string]*goimagehash.ExtImageHash) { + defer wg.Done() + var processed uint64 + processedFiles := make(map[string]bool) + + for result := range in { + filea := result.file + hasha := result.hash + + if processedFiles[filea] { + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Compared %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + continue + } + + var group []string + for fileb, hashb := range allHashes { + if filea == fileb { + continue + } + distance, err := hasha.Distance(hashb) + if err != nil { + logger.Error("Failed to calculate distance: %v", err) + continue + } + logger.Debug("Distance between %v and %v: %v", filea, fileb, distance) + if distance <= thresh { + group = append(group, fileb) + processedFiles[fileb] = true + } + } + + if len(group) > 0 { + out <- GroupResult{mainFile: filea, group: group} + processedFiles[filea] = true + } + atomic.AddUint64(&processed, 1) + if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 { + logger.Info("Compared %d/%d files", atomic.LoadUint64(&processed), totalFiles) + } + } +}