package main import ( "fmt" "log" "os" "path/filepath" "strings" "sync" "sync/atomic" "time" ) func IsSymlink(path string) (bool, error) { fileInfo, err := os.Lstat(path) if err != nil { return false, err } // os.ModeSymlink is a bitmask that identifies the symlink mode. // If the file mode & os.ModeSymlink is non-zero, the file is a symlink. return fileInfo.Mode()&os.ModeSymlink != 0, nil } func FileExists(path string) bool { _, err := os.Lstat(path) return err == nil } func NormalizePath(input, workdir string) string { input = filepath.Clean(input) input = filepath.ToSlash(input) input = strings.ReplaceAll(input, "\"", "") if !filepath.IsAbs(input) { log.Printf("Input '%s' is not absolute, prepending work dir '%s'", input, workdir) var err error input = filepath.Join(workdir, input) input, err = filepath.Abs(input) if err != nil { log.Printf("Failed to get absolute path for %s%s%s: %s%+v%s", SourceColor, input, DefaultColor, ErrorColor, err, DefaultColor) return input } } input = filepath.Clean(input) input = filepath.ToSlash(input) return input } func AreSame(lhs string, rhs string) bool { lhsinfo, err := os.Stat(lhs) if err != nil { return false } rhsinfo, err := os.Stat(rhs) if err != nil { return false } return os.SameFile(lhsinfo, rhsinfo) } func ConvertHome(input string) (string, error) { if strings.HasPrefix(input, "~/") { homedir, err := os.UserHomeDir() if err != nil { return input, fmt.Errorf("unable to convert ~ to user directory with error %+v", err) } return strings.Replace(input, "~", homedir, 1), nil } return input, nil } func GetSyncFilesRecursively(input string, output chan string, status chan error) { defer close(output) defer close(status) var filesProcessed int32 var foldersProcessed int32 var activeWorkers int32 progressTicker := time.NewTicker(200 * time.Millisecond) defer progressTicker.Stop() done := make(chan struct{}) defer close(done) directories := make(chan string, 100000) workerPool := make(chan struct{}, 4000) directories <- input go func() { for { select { case <-progressTicker.C: dirCount := len(directories) workers := atomic.LoadInt32(&activeWorkers) fmt.Printf("\rFiles processed: %d; Folders processed: %d; Active workers: %d; Directory queue: %d", atomic.LoadInt32(&filesProcessed), atomic.LoadInt32(&foldersProcessed), workers, dirCount) case <-done: // Final progress update fmt.Printf("\rFiles processed: %d; Folders processed: %d; Completed successfully\n", atomic.LoadInt32(&filesProcessed), atomic.LoadInt32(&foldersProcessed)) return } } }() allDone := make(chan struct{}) go func() { var wg sync.WaitGroup go func() { for { if atomic.LoadInt32(&activeWorkers) == 0 && len(directories) == 0 { time.Sleep(10 * time.Millisecond) if atomic.LoadInt32(&activeWorkers) == 0 && len(directories) == 0 { close(allDone) return } } time.Sleep(50 * time.Millisecond) } }() for { select { case directory, ok := <-directories: if !ok { wg.Wait() return } atomic.AddInt32(&activeWorkers, 1) go func(dir string) { workerPool <- struct{}{} atomic.AddInt32(&foldersProcessed, 1) processDirectory(dir, directories, output, &filesProcessed) <-workerPool atomic.AddInt32(&activeWorkers, -1) }(directory) } } }() <-allDone log.Printf("Files processed: %d; Folders processed: %d", atomic.LoadInt32(&filesProcessed), atomic.LoadInt32(&foldersProcessed)) } func processDirectory(directory string, directories chan<- string, output chan<- string, filesProcessed *int32) { files, err := os.ReadDir(directory) if err != nil { log.Printf("Error reading directory %s: %+v", directory, err) return } for _, file := range files { if file.IsDir() { directories <- filepath.Join(directory, file.Name()) } else { if FileRegex.MatchString(file.Name()) || IsYAMLSyncFile(file.Name()) { output <- filepath.Join(directory, file.Name()) } atomic.AddInt32(filesProcessed, 1) } } } func IsYAMLSyncFile(filename string) bool { return filename == "sync.yaml" || filename == "sync.yml" }