From fff7e62cd19e2119e1245b72bca63c74f22f54b5 Mon Sep 17 00:00:00 2001 From: PhatPhuckDave Date: Sat, 8 Mar 2025 10:53:18 +0100 Subject: [PATCH] Implement walkdir --- main.go | 141 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 128 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 199b142..53ddf19 100644 --- a/main.go +++ b/main.go @@ -2,33 +2,148 @@ package main import ( "fmt" - "io" "log" "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" ) var Error *log.Logger var Warning *log.Logger + func init() { log.SetFlags(log.Lmicroseconds | log.Lshortfile) - logFile, err := os.Create("main.log") - if err != nil { - log.Printf("Error creating log file: %v", err) - os.Exit(1) - } - logger := io.MultiWriter(os.Stdout, logFile) - log.SetOutput(logger) + log.SetOutput(os.Stdout) - Error = log.New(io.MultiWriter(logFile, os.Stderr, os.Stdout), + Error = log.New(os.Stderr, fmt.Sprintf("%sERROR:%s ", "\033[0;101m", "\033[0m"), log.Lmicroseconds|log.Lshortfile) - Warning = log.New(io.MultiWriter(logFile, os.Stdout), + Warning = log.New(os.Stdout, fmt.Sprintf("%sWarning:%s ", "\033[0;93m", "\033[0m"), log.Lmicroseconds|log.Lshortfile) } func main() { - log.Println("Hello, World!") - Warning.Println("Hello, World!") - Error.Println("Hello, World!") + files := make(chan string, 10000) + status := make(chan error) + + go GetSyncFilesRecursively(".", files, status) + + for file := range files { + log.Println(file) + } +} + +func GetSyncFilesRecursively(input string, output chan string, status chan error) { + defer close(output) + defer close(status) + + var filesProcessed int32 + var foldersProcessed int32 + var activeWorkers int32 + + progressTicker := time.NewTicker(200 * time.Millisecond) + defer progressTicker.Stop() + + done := make(chan struct{}) + defer close(done) + + directories := make(chan string, 100000) + workerPool := make(chan struct{}, 4000) + directories <- input + + go func() { + for { + select { + case <-progressTicker.C: + dirCount := len(directories) + workers := atomic.LoadInt32(&activeWorkers) + fmt.Printf("\rFiles processed: %8d; Folders processed: %8d; Active workers: %8d; Directory queue: %8d", + atomic.LoadInt32(&filesProcessed), + atomic.LoadInt32(&foldersProcessed), + workers, + dirCount) + case <-done: + // Final progress update + fmt.Printf("\nFiles processed: %8d; Folders processed: %8d; Completed successfully\n", + atomic.LoadInt32(&filesProcessed), + atomic.LoadInt32(&foldersProcessed)) + return + } + } + }() + + allDone := make(chan struct{}) + + go func() { + var wg sync.WaitGroup + + go func() { + for { + if atomic.LoadInt32(&activeWorkers) == 0 && len(directories) == 0 { + time.Sleep(10 * time.Millisecond) + if atomic.LoadInt32(&activeWorkers) == 0 && len(directories) == 0 { + close(allDone) + return + } + } + time.Sleep(50 * time.Millisecond) + } + }() + + for { + select { + case directory, ok := <-directories: + if !ok { + wg.Wait() + return + } + + atomic.AddInt32(&activeWorkers, 1) + + go func(dir string) { + workerPool <- struct{}{} + + atomic.AddInt32(&foldersProcessed, 1) + processDirectory(dir, directories, output, &filesProcessed) + + <-workerPool + atomic.AddInt32(&activeWorkers, -1) + }(directory) + } + } + }() + + <-allDone + + log.Printf("Files processed: %d; Folders processed: %d", + atomic.LoadInt32(&filesProcessed), + atomic.LoadInt32(&foldersProcessed)) +} + +func processDirectory(directory string, directories chan<- string, output chan<- string, filesProcessed *int32) { + files, err := os.ReadDir(directory) + if err != nil { + log.Printf("Error reading directory %s: %+v", directory, err) + return + } + + for _, file := range files { + if file.IsDir() { + directories <- filepath.Join(directory, file.Name()) + } else { + output <- filepath.Join(directory, file.Name()) + atomic.AddInt32(filesProcessed, 1) + } + } +} + +func NormalizePath(input, workdir string) string { + input = filepath.Clean(input) + input = filepath.ToSlash(input) + input = strings.ReplaceAll(input, "\"", "") + return input }