Refactor image processing into separate hash and group workers for improved concurrency and clarity
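
The previous implementation interleaved hashing and comparison in ad-hoc goroutines behind a semaphore channel and a shared sync.Map. This change splits the work into two explicit phases: hashWorker goroutines decode images and emit HashResult values over a channel, and groupWorker goroutines cluster files whose perceptual-hash distance is within the threshold, after which main keeps only the largest file in each group.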

2025-05-23 13:33:19 +02:00
parent 552d0d4057
commit 7946eb43c5

main.go | 286 lines changed
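
For readers skimming the diff, the shape of the change is the standard fan-out/fan-in channel pipeline. The following is a minimal, illustrative sketch of that pattern, not the committed code: the result type and the doubling "work" are toy stand-ins for the real hashing and grouping stages.

package main

import (
	"fmt"
	"sync"
)

// result is a toy stand-in for the commit's HashResult/GroupResult values.
type result struct {
	in  int
	out int
}

// worker drains jobs until the channel is closed, the same way hashWorker
// and groupWorker drain their input channels.
func worker(jobs <-chan int, results chan<- result, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		results <- result{in: j, out: j * 2} // placeholder work
	}
}

func main() {
	jobs := make(chan int, 8)
	results := make(chan result, 8)
	var wg sync.WaitGroup

	// Fan out: a fixed pool of workers (the commit starts *workers of them).
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}

	// Feed all jobs, then close so the workers' range loops terminate.
	for j := 1; j <= 8; j++ {
		jobs <- j
	}
	close(jobs)

	// Fan in: close results only after every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()
	for r := range results {
		fmt.Println(r.in, "->", r.out)
	}
}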

@@ -15,17 +15,27 @@ import (
 	"github.com/corona10/goimagehash"
 )
 
+type HashResult struct {
+	file string
+	hash *goimagehash.ExtImageHash
+}
+
+type GroupResult struct {
+	mainFile string
+	group    []string
+}
+
 func main() {
 	thresh := flag.Int("thresh", 10, "Threshold for distance")
 	workers := flag.Int("workers", 100, "Number of workers")
 	notifyInterval := flag.Int("notify", 1000, "Notify interval")
 	flag.Parse()
 	logger.InitFlag()
-	hashes := &sync.Map{}
 	logger.Info("Starting")
 	logger.Info("Threshold: %v", *thresh)
 	logger.Info("Patterns: %d", len(flag.Args()))
 
+	// Collect files
 	files := make([]string, 0)
 	for _, pattern := range flag.Args() {
 		base, pattern := doublestar.SplitPattern(pattern)
@@ -44,139 +54,63 @@ func main() {
 	}
 	logger.Info("Patterns expanded to %d files", len(files))
 
-	workerChan := make(chan struct{}, *workers)
-	wg := sync.WaitGroup{}
-	var processedFiles uint64
+	// Phase 1: Hashing
+	hashChan := make(chan string, len(files))
+	hashResults := make(chan HashResult, len(files))
+	var hashWg sync.WaitGroup
+
+	// Start hash workers
+	for i := 0; i < *workers; i++ {
+		hashWg.Add(1)
+		go hashWorker(hashChan, hashResults, &hashWg, *notifyInterval, len(files))
+	}
+
+	// Send files to hash workers
 	for _, file := range files {
-		workerChan <- struct{}{}
-		wg.Add(1)
-		go func(file string) {
-			defer wg.Done()
-			defer func() { <-workerChan }()
-			log := logger.Default.WithPrefix(file)
-			ext := filepath.Ext(file)
-			if ext != ".jpg" && ext != ".jpeg" && ext != ".png" {
-				log.Debug("Skipping non-image file: %s", file)
-				atomic.AddUint64(&processedFiles, 1)
-				if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 {
-					logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files))
-				}
-				return
-			}
-			imgfile, err := os.Open(file)
-			if err != nil {
-				log.Error("Failed to open file: %v", err)
-				atomic.AddUint64(&processedFiles, 1)
-				if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 {
-					logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files))
-				}
-				return
-			}
-			defer imgfile.Close()
-			isPng := ext == ".png"
-			var img image.Image
-			if isPng {
-				img, err = png.Decode(imgfile)
-			} else {
-				img, err = jpeg.Decode(imgfile)
-			}
-			if err != nil {
-				log.Error("Failed to decode image: %v", err)
-				atomic.AddUint64(&processedFiles, 1)
-				if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 {
-					logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files))
-				}
-				return
-			}
-			hash, err := goimagehash.ExtPerceptionHash(img, 8, 8)
-			if err != nil {
-				log.Error("Failed to calculate hash: %v", err)
-				atomic.AddUint64(&processedFiles, 1)
-				if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 {
-					logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files))
-				}
-				return
-			}
-			log.Debug("Hashed: %v", hash)
-			hashes.Store(file, hash)
-			atomic.AddUint64(&processedFiles, 1)
-			if atomic.LoadUint64(&processedFiles)%uint64(*notifyInterval) == 0 {
-				logger.Info("Processed %d/%d files", atomic.LoadUint64(&processedFiles), len(files))
-			}
-		}(file)
+		hashChan <- file
 	}
+	close(hashChan)
 
-	groupedImages := make(map[string][]string)
-	wg.Wait()
+	// Collect hash results
+	allHashes := make(map[string]*goimagehash.ExtImageHash)
+	go func() {
+		hashWg.Wait()
+		close(hashResults)
+	}()
+	for result := range hashResults {
+		allHashes[result.file] = result.hash
+	}
 
-	processed := &sync.Map{}
-
-	// Add progress counter for comparison stage
-	var comparedFiles uint64
-	totalFiles := len(files)
-	hashes.Range(func(key, value interface{}) bool {
-		workerChan <- struct{}{}
-		wg.Add(1)
-		go func(key, value interface{}) {
-			defer wg.Done()
-			defer func() { <-workerChan }()
-			filea := key.(string)
-			hasha := value.(*goimagehash.ExtImageHash)
-			if _, ok := processed.Load(filea); ok {
-				atomic.AddUint64(&comparedFiles, 1)
-				if atomic.LoadUint64(&comparedFiles)%uint64(*notifyInterval) == 0 {
-					logger.Info("Compared %d/%d files", atomic.LoadUint64(&comparedFiles), totalFiles)
-				}
-				return
-			}
-			var group []string
-			hashes.Range(func(key, value interface{}) bool {
-				fileb := key.(string)
-				hashb := value.(*goimagehash.ExtImageHash)
-				if filea == fileb {
-					return true
-				}
-				distance, err := hasha.Distance(hashb)
-				if err != nil {
-					logger.Error("Failed to calculate distance: %v", err)
-					return true
-				}
-				logger.Debug("Distance between %v and %v: %v", filea, fileb, distance)
-				if distance <= *thresh {
-					group = append(group, fileb)
-					processed.Store(fileb, true)
-				}
-				return true
-			})
-			if len(group) > 0 {
-				groupedImages[filea] = group
-				processed.Store(filea, true)
-			}
-			atomic.AddUint64(&comparedFiles, 1)
-			if atomic.LoadUint64(&comparedFiles)%uint64(*notifyInterval) == 0 {
-				logger.Info("Compared %d/%d files", atomic.LoadUint64(&comparedFiles), totalFiles)
-			}
-		}(key, value)
-		return true
-	})
-	wg.Wait()
+	// Phase 2: Grouping
+	groupChan := make(chan HashResult, len(allHashes))
+	groupResults := make(chan GroupResult, len(allHashes))
+	var groupWg sync.WaitGroup
 
-	// Deduplicate by keeping the largest file in each group
-	for file, group := range groupedImages {
-		// Add the main file to the group for size comparison
-		allFiles := append([]string{file}, group...)
-		// Find the largest file
+	// Start group workers
+	for i := 0; i < *workers; i++ {
+		groupWg.Add(1)
+		go groupWorker(groupChan, groupResults, &groupWg, *thresh, *notifyInterval, len(allHashes), allHashes)
+	}
+
+	// Send hash results to group workers
+	for file, hash := range allHashes {
+		groupChan <- HashResult{file: file, hash: hash}
+	}
+	close(groupChan)
+
+	// Collect group results
+	go func() {
+		groupWg.Wait()
+		close(groupResults)
+	}()
+
+	// Process results and remove duplicates
+	for result := range groupResults {
+		allFiles := append([]string{result.mainFile}, result.group...)
 		var largestFile string
 		var largestSize int64
 		for _, f := range allFiles {
 			info, err := os.Stat(f)
 			if err != nil {
@@ -189,7 +123,6 @@ func main() {
 			}
 		}
 
-		// Remove all files except the largest one
 		for _, f := range allFiles {
 			if f != largestFile {
 				logger.Info("Removing duplicate: %s (keeping %s)", f, largestFile)
@@ -199,5 +132,114 @@ func main() {
 			}
 		}
 	}
-	}
 	logger.Info("Done")
 }
+
+func hashWorker(in <-chan string, out chan<- HashResult, wg *sync.WaitGroup, notifyInterval int, totalFiles int) {
+	defer wg.Done()
+	var processed uint64
+	for file := range in {
+		log := logger.Default.WithPrefix(file)
+		ext := filepath.Ext(file)
+		if ext != ".jpg" && ext != ".jpeg" && ext != ".png" {
+			log.Debug("Skipping non-image file: %s", file)
+			atomic.AddUint64(&processed, 1)
+			if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+				logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+			}
+			continue
+		}
+		imgfile, err := os.Open(file)
+		if err != nil {
+			log.Error("Failed to open file: %v", err)
+			atomic.AddUint64(&processed, 1)
+			if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+				logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+			}
+			continue
+		}
+		isPng := ext == ".png"
+		var img image.Image
+		if isPng {
+			img, err = png.Decode(imgfile)
+		} else {
+			img, err = jpeg.Decode(imgfile)
+		}
+		imgfile.Close()
+		if err != nil {
+			log.Error("Failed to decode image: %v", err)
+			atomic.AddUint64(&processed, 1)
+			if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+				logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+			}
+			continue
+		}
+		hash, err := goimagehash.ExtPerceptionHash(img, 8, 8)
+		if err != nil {
+			log.Error("Failed to calculate hash: %v", err)
+			atomic.AddUint64(&processed, 1)
+			if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+				logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+			}
+			continue
+		}
+		log.Debug("Hashed: %v", hash)
+		out <- HashResult{file: file, hash: hash}
+		atomic.AddUint64(&processed, 1)
+		if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+			logger.Info("Processed %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+		}
+	}
+}
+
+func groupWorker(in <-chan HashResult, out chan<- GroupResult, wg *sync.WaitGroup, thresh int, notifyInterval int, totalFiles int, allHashes map[string]*goimagehash.ExtImageHash) {
+	defer wg.Done()
+	var processed uint64
+	processedFiles := make(map[string]bool)
+	for result := range in {
+		filea := result.file
+		hasha := result.hash
+		if processedFiles[filea] {
+			atomic.AddUint64(&processed, 1)
+			if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+				logger.Info("Compared %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+			}
+			continue
+		}
+		var group []string
+		for fileb, hashb := range allHashes {
+			if filea == fileb {
+				continue
+			}
+			distance, err := hasha.Distance(hashb)
+			if err != nil {
+				logger.Error("Failed to calculate distance: %v", err)
+				continue
+			}
+			logger.Debug("Distance between %v and %v: %v", filea, fileb, distance)
+			if distance <= thresh {
+				group = append(group, fileb)
+				processedFiles[fileb] = true
+			}
+		}
+		if len(group) > 0 {
+			out <- GroupResult{mainFile: filea, group: group}
+			processedFiles[filea] = true
+		}
+		atomic.AddUint64(&processed, 1)
+		if atomic.LoadUint64(&processed)%uint64(notifyInterval) == 0 {
+			logger.Info("Compared %d/%d files", atomic.LoadUint64(&processed), totalFiles)
+		}
+	}
+}
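
For context on the library calls the workers rely on: goimagehash.ExtPerceptionHash returns an *ExtImageHash, and its Distance method reports the Hamming distance between two hashes, which the group workers compare against -thresh. Below is a minimal standalone sketch of that usage; the file paths are placeholders, and error handling is reduced to log.Fatal for brevity.

package main

import (
	"fmt"
	"image/jpeg"
	"log"
	"os"

	"github.com/corona10/goimagehash"
)

// hashFile decodes a JPEG and computes its 8x8 extended perceptual hash,
// mirroring the call made in hashWorker above.
func hashFile(path string) (*goimagehash.ExtImageHash, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	img, err := jpeg.Decode(f)
	if err != nil {
		return nil, err
	}
	return goimagehash.ExtPerceptionHash(img, 8, 8)
}

func main() {
	// Placeholder paths; substitute two real JPEG files.
	a, err := hashFile("a.jpg")
	if err != nil {
		log.Fatal(err)
	}
	b, err := hashFile("b.jpg")
	if err != nil {
		log.Fatal(err)
	}
	d, err := a.Distance(b)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("distance:", d) // d <= thresh counts as a duplicate above
}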