Make recursive much much faster
By virtue of parallelism
This commit is contained in:
		
							
								
								
									
										67
									
								
								util.go
									
									
									
									
									
								
							
							
						
						
									
										67
									
								
								util.go
									
									
									
									
									
								
							@@ -2,10 +2,12 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/fs"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -67,29 +69,62 @@ func GetSyncFilesRecursively(input string, output chan string, status chan error
 | 
			
		||||
	defer close(status)
 | 
			
		||||
 | 
			
		||||
	var filesProcessed int32
 | 
			
		||||
	var foldersProcessed int32
 | 
			
		||||
	progressTicker := time.NewTicker(200 * time.Millisecond)
 | 
			
		||||
	defer progressTicker.Stop()
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			fmt.Printf("\rFiles processed: %d; Folders processed: %d;", filesProcessed, foldersProcessed)
 | 
			
		||||
			<-progressTicker.C
 | 
			
		||||
			fmt.Printf("\rFiles processed: %d", filesProcessed)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	err := filepath.WalkDir(input, func(path string, file fs.DirEntry, err error) error {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	wg.Add(1)
 | 
			
		||||
	var initial sync.Once
 | 
			
		||||
	directories := make(chan string, 10000)
 | 
			
		||||
	workerPool := make(chan struct{}, 10000)
 | 
			
		||||
	directories <- input
 | 
			
		||||
 | 
			
		||||
		// Effectively only find files named "sync" (with no extension!!)
 | 
			
		||||
		if !file.IsDir() && DirRegex.MatchString(path) {
 | 
			
		||||
			output <- path
 | 
			
		||||
		}
 | 
			
		||||
		filesProcessed++
 | 
			
		||||
	log.Printf("%+v", len(workerPool))
 | 
			
		||||
	go func() {
 | 
			
		||||
		for directory := range directories {
 | 
			
		||||
			workerPool <- struct{}{}
 | 
			
		||||
			wg.Add(1)
 | 
			
		||||
			go func(directory string) {
 | 
			
		||||
				atomic.AddInt32(&foldersProcessed, 1)
 | 
			
		||||
				defer wg.Done()
 | 
			
		||||
				defer func() { <-workerPool }()
 | 
			
		||||
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		status <- err
 | 
			
		||||
	}
 | 
			
		||||
				// log.Printf("Reading directory %s", directory)
 | 
			
		||||
				files, err := os.ReadDir(directory)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					log.Printf("Error reading directory %s: %+v", directory, err)
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				for _, file := range files {
 | 
			
		||||
					// log.Printf("Processing file %s", file.Name())
 | 
			
		||||
					if file.IsDir() {
 | 
			
		||||
						directories <- filepath.Join(directory, file.Name())
 | 
			
		||||
					} else {
 | 
			
		||||
						// log.Println(file.Name(), DirRegex.MatchString(file.Name()))
 | 
			
		||||
						if FileRegex.MatchString(file.Name()) {
 | 
			
		||||
							log.Printf("Writing")
 | 
			
		||||
							output <- filepath.Join(directory, file.Name())
 | 
			
		||||
						}
 | 
			
		||||
						atomic.AddInt32(&filesProcessed, 1)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				// log.Printf("Done reading directory %s", directory)
 | 
			
		||||
			
 | 
			
		||||
				initial.Do(func() {
 | 
			
		||||
					wg.Done()
 | 
			
		||||
				})
 | 
			
		||||
			}(directory)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	log.Printf("Done")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user