package main

import (
	"flag"
	"image"
	"image/jpeg"
	"image/png"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"

	logger "git.site.quack-lab.dev/dave/cylogger"
	"github.com/bmatcuk/doublestar/v4"
	"github.com/corona10/goimagehash"
)

type Config struct {
	Threshold       int
	Workers         int
	NotifyInterval  int
	NoGroupByFolder bool
}

type HashResult struct {
	file string
	hash *goimagehash.ExtImageHash
}

type GroupResult struct {
	mainFile string
	group    []string
}

func main() {
	// Register flags first; dereferencing the flag pointers before
	// flag.Parse() would bake in the defaults and ignore user input.
	threshold := flag.Int("thresh", 10, "Threshold for distance")
	workers := flag.Int("workers", 100, "Number of workers")
	notifyInterval := flag.Int("notify", 1000, "Notify interval")
	noGroupByFolder := flag.Bool("nogroup", false, "Don't group by folder")
	flag.Parse()
	logger.InitFlag()

	cfg := Config{
		Threshold:       *threshold,
		Workers:         *workers,
		NotifyInterval:  *notifyInterval,
		NoGroupByFolder: *noGroupByFolder,
	}

	logger.Info("Starting")
	logger.Info("Threshold: %v", cfg.Threshold)
	logger.Info("Patterns: %d", len(flag.Args()))
	logger.Info("Workers: %v", cfg.Workers)
	logger.Info("Notify interval: %v", cfg.NotifyInterval)
	logger.Info("Group by folder: %v", !cfg.NoGroupByFolder)

	// Collect files matching the doublestar glob patterns.
	files := make([]string, 0)
	for _, pattern := range flag.Args() {
		base, pattern := doublestar.SplitPattern(pattern)
		logger.Debug("Globbing %q from %q", pattern, base)
		matches, err := doublestar.Glob(os.DirFS(base), pattern)
		if err != nil {
			logger.Error("Failed to glob pattern: %v", err)
			continue
		}
		logger.Debug("Glob %q in %q got %d matches", pattern, base, len(matches))
		for _, match := range matches {
			match = filepath.Join(base, match)
			logger.Trace("Adding %q", match)
			files = append(files, match)
		}
	}
	logger.Info("Patterns expanded to %d files", len(files))

	if !cfg.NoGroupByFolder {
		// Group files by folder and deduplicate each folder independently.
		folderMap := make(map[string][]string)
		for _, file := range files {
			dir := filepath.Dir(file)
			folderMap[dir] = append(folderMap[dir], file)
		}
		logger.Info("Processing %d folders", len(folderMap))
		for dir, group := range folderMap {
			logger.Info("Processing folder: %s (%d files)", dir, len(group))
			processFiles(group, cfg)
		}
	} else {
		// Process all files together.
		processFiles(files, cfg)
	}
	logger.Info("Done")
}

func processFiles(files []string, cfg Config) {
	// Phase 1: hash every image concurrently.
	hashChan := make(chan string, len(files))
	hashResults := make(chan HashResult, len(files))
	var hashWg sync.WaitGroup
	var hashed uint64 // progress counter shared by all hash workers

	for i := 0; i < cfg.Workers; i++ {
		hashWg.Add(1)
		go hashWorker(hashChan, hashResults, &hashWg, &hashed, cfg.NotifyInterval, len(files))
	}

	for _, file := range files {
		hashChan <- file
	}
	close(hashChan)

	// Close the results channel once all workers finish, then drain it.
	allHashes := make(map[string]*goimagehash.ExtImageHash)
	go func() {
		hashWg.Wait()
		close(hashResults)
	}()
	for result := range hashResults {
		allHashes[result.file] = result.hash
	}

	// Phase 2: group perceptually similar images.
	groupChan := make(chan HashResult, len(allHashes))
	groupResults := make(chan GroupResult, len(allHashes))
	var groupWg sync.WaitGroup
	var compared uint64 // progress counter shared by all group workers

	// The claimed map must be shared across workers (and lock-protected)
	// so that two workers cannot place the same file into different groups.
	var claimedMu sync.Mutex
	claimed := make(map[string]bool)

	for i := 0; i < cfg.Workers; i++ {
		groupWg.Add(1)
		go groupWorker(groupChan, groupResults, &groupWg, cfg.Threshold, &compared, cfg.NotifyInterval, len(allHashes), allHashes, &claimedMu, claimed)
	}

	for file, hash := range allHashes {
		groupChan <- HashResult{file: file, hash: hash}
	}
	close(groupChan)

	go func() {
		groupWg.Wait()
		close(groupResults)
	}()

	// Phase 3: within each group, keep the largest file and delete the rest.
	deletedFiles := make(map[string]bool)
	for result := range groupResults {
		allFiles := append([]string{result.mainFile}, result.group...)
		var largestFile string
		var largestSize int64
		for _, f := range allFiles {
			if deletedFiles[f] {
				continue
			}
			info, err := os.Stat(f)
			if err != nil {
				logger.Error("Failed to get file info for %s: %v", f, err)
				continue
			}
			if info.Size() > largestSize {
				largestSize = info.Size()
				largestFile = f
			}
		}
		if largestFile == "" {
			// Every file in the group failed to stat; delete nothing.
			continue
		}
		for _, f := range allFiles {
			if f != largestFile && !deletedFiles[f] {
				logger.Info("Removing duplicate: %s (keeping %s)", f, largestFile)
				if err := os.Remove(f); err != nil {
					logger.Error("Failed to remove file %s: %v", f, err)
				} else {
					deletedFiles[f] = true
				}
			}
		}
	}
}

// notifyProgress bumps the shared counter and logs every notifyInterval items.
func notifyProgress(processed *uint64, interval, total int, verb string) {
	n := atomic.AddUint64(processed, 1)
	if interval > 0 && n%uint64(interval) == 0 {
		logger.Info("%s %d/%d files", verb, n, total)
	}
}

func hashWorker(in <-chan string, out chan<- HashResult, wg *sync.WaitGroup, processed *uint64, notifyInterval, totalFiles int) {
	defer wg.Done()
	for file := range in {
		if hash, ok := hashFile(file); ok {
			out <- HashResult{file: file, hash: hash}
		}
		notifyProgress(processed, notifyInterval, totalFiles, "Processed")
	}
}

// hashFile decodes a JPEG or PNG file and returns its perceptual hash.
// It reports ok=false for non-image files and for any open/decode/hash failure.
func hashFile(file string) (hash *goimagehash.ExtImageHash, ok bool) {
	log := logger.Default.WithPrefix(file)
	ext := strings.ToLower(filepath.Ext(file))
	if ext != ".jpg" && ext != ".jpeg" && ext != ".png" {
		log.Debug("Skipping non-image file: %s", file)
		return nil, false
	}

	imgfile, err := os.Open(file)
	if err != nil {
		log.Error("Failed to open file: %v", err)
		return nil, false
	}
	defer imgfile.Close()

	var img image.Image
	if ext == ".png" {
		img, err = png.Decode(imgfile)
	} else {
		img, err = jpeg.Decode(imgfile)
	}
	if err != nil {
		log.Error("Failed to decode image: %v", err)
		return nil, false
	}

	hash, err = goimagehash.ExtPerceptionHash(img, 8, 8)
	if err != nil {
		log.Error("Failed to calculate hash: %v", err)
		return nil, false
	}
	log.Debug("Hashed: %v", hash)
	return hash, true
}

func groupWorker(in <-chan HashResult, out chan<- GroupResult, wg *sync.WaitGroup, thresh int, processed *uint64, notifyInterval, totalFiles int, allHashes map[string]*goimagehash.ExtImageHash, claimedMu *sync.Mutex, claimed map[string]bool) {
	defer wg.Done()
	for result := range in {
		filea, hasha := result.file, result.hash

		claimedMu.Lock()
		alreadyClaimed := claimed[filea]
		claimedMu.Unlock()
		if alreadyClaimed {
			notifyProgress(processed, notifyInterval, totalFiles, "Compared")
			continue
		}

		var group []string
		for fileb, hashb := range allHashes {
			if filea == fileb {
				continue
			}
			distance, err := hasha.Distance(hashb)
			if err != nil {
				logger.Error("Failed to calculate distance: %v", err)
				continue
			}
			logger.Debug("Distance between %v and %v: %v", filea, fileb, distance)
			if distance <= thresh {
				// Claim fileb under the lock so no other worker also groups it.
				claimedMu.Lock()
				if !claimed[fileb] {
					claimed[fileb] = true
					group = append(group, fileb)
				}
				claimedMu.Unlock()
			}
		}

		if len(group) > 0 {
			claimedMu.Lock()
			claimed[filea] = true
			claimedMu.Unlock()
			out <- GroupResult{mainFile: filea, group: group}
		}
		notifyProgress(processed, notifyInterval, totalFiles, "Compared")
	}
}