Rework all readers to be async

This commit is contained in:
2024-07-01 12:52:04 +02:00
parent 1788e1f1e8
commit c6611877df

116
main.go
View File

@@ -41,17 +41,30 @@ func main() {
log.Printf("Recurse: %s", *recurse) log.Printf("Recurse: %s", *recurse)
log.Printf("File: %s", *file) log.Printf("File: %s", *file)
var instructions []LinkInstruction instructions := make(chan LinkInstruction, 1000)
status := make(chan error)
if *recurse != "" { if *recurse != "" {
instructions, _ = ReadFromFilesRecursively(*recurse) ReadFromFilesRecursively(*recurse, instructions, status)
} else if *file != "" { } else if *file != "" {
instructions, _ = ReadFromFile(*file) ReadFromFile(*file, instructions, status, true)
} else if len(os.Args) > 1 { } else if len(os.Args) > 1 {
instructions, _ = ReadFromArgs() ReadFromArgs(instructions, status)
} else { } else {
instructions, _ = ReadFromStdin() ReadFromStdin(instructions, status)
} }
go func() {
for {
err, ok := <-status
if !ok {
break
}
if err != nil {
log.Println(err)
}
}
}()
if len(instructions) == 0 { if len(instructions) == 0 {
log.Printf("No input provided") log.Printf("No input provided")
log.Printf("Provide input as: ") log.Printf("Provide input as: ")
@@ -62,7 +75,12 @@ func main() {
os.Exit(1) os.Exit(1)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, instruction := range instructions { for {
instruction, ok := <-instructions
if !ok {
log.Printf("No more instructions to process")
break
}
log.Printf("Processing: %s", instruction.String()) log.Printf("Processing: %s", instruction.String())
status := make(chan error) status := make(chan error)
go instruction.RunAsync(status) go instruction.RunAsync(status)
@@ -74,46 +92,61 @@ func main() {
wg.Done() wg.Done()
} }
wg.Wait() wg.Wait()
log.Println("All done")
} }
func ReadFromFilesRecursively(input string) ([]LinkInstruction, error) { func ReadFromFilesRecursively(input string, output chan LinkInstruction, status chan error) {
defer close(output)
defer close(status)
input = NormalizePath(input) input = NormalizePath(input)
log.Printf("Reading input from files recursively starting in %s%s%s", PathColor, input, DefaultColor) log.Printf("Reading input from files recursively starting in %s%s%s", PathColor, input, DefaultColor)
var instructions []LinkInstruction
files, err := GetSyncFilesRecursively(input) files, err := GetSyncFilesRecursively(input)
if err != nil { if err != nil {
log.Fatalf("Failed to get sync files recursively: %s%+v%s", ErrorColor, err, DefaultColor) log.Printf("Failed to get sync files recursively: %s%+v%s", ErrorColor, err, DefaultColor)
status <- err
return
} }
var wg sync.WaitGroup
for _, file := range files { for _, file := range files {
file = NormalizePath(file) fileCopy := file
wg.Add(1)
go func() {
defer wg.Done()
fileCopy = NormalizePath(fileCopy)
// This "has" to be done because instructions are resolved in relation to cwd // This "has" to be done because instructions are resolved in relation to cwd
fileDir := DirRegex.FindStringSubmatch(file) fileDir := DirRegex.FindStringSubmatch(fileCopy)
if fileDir == nil { if fileDir == nil {
log.Printf("Failed to extract directory from %s%s%s", SourceColor, file, DefaultColor) log.Printf("Failed to extract directory from %s%s%s", SourceColor, fileCopy, DefaultColor)
continue return
} }
log.Printf("Changing directory to %s%s%s (for %s%s%s)", PathColor, fileDir[1], DefaultColor, PathColor, file, DefaultColor) log.Printf("Changing directory to %s%s%s (for %s%s%s)", PathColor, fileDir[1], DefaultColor, PathColor, fileCopy, DefaultColor)
err := os.Chdir(fileDir[1]) err := os.Chdir(fileDir[1])
if err != nil { if err != nil {
log.Printf("Failed to change directory to %s%s%s: %s%+v%s", SourceColor, fileDir[1], DefaultColor, ErrorColor, err, DefaultColor) log.Printf("Failed to change directory to %s%s%s: %s%+v%s", SourceColor, fileDir[1], DefaultColor, ErrorColor, err, DefaultColor)
continue return
} }
fileInstructions, _ := ReadFromFile(file) ReadFromFile(fileCopy, output, status, false)
instructions = append(instructions, fileInstructions...) }()
} }
return instructions, nil wg.Wait()
} }
func ReadFromFile(input string) ([]LinkInstruction, error) { func ReadFromFile(input string, output chan LinkInstruction, status chan error, doclose bool) {
if doclose {
defer close(output)
defer close(status)
}
input = NormalizePath(input) input = NormalizePath(input)
log.Printf("Reading input from file: %s%s%s", PathColor, input, DefaultColor) log.Printf("Reading input from file: %s%s%s", PathColor, input, DefaultColor)
var instructions []LinkInstruction
file, err := os.Open(input) file, err := os.Open(input)
if err != nil { if err != nil {
log.Fatalf("Failed to open file %s%s%s: %s%+v%s", SourceColor, input, DefaultColor, ErrorColor, err, DefaultColor) log.Fatalf("Failed to open file %s%s%s: %s%+v%s", SourceColor, input, DefaultColor, ErrorColor, err, DefaultColor)
return
} }
defer file.Close() defer file.Close()
@@ -125,32 +158,34 @@ func ReadFromFile(input string) ([]LinkInstruction, error) {
log.Printf("Error parsing line: %s'%s'%s, error: %s%+v%s", SourceColor, line, DefaultColor, ErrorColor, err, DefaultColor) log.Printf("Error parsing line: %s'%s'%s, error: %s%+v%s", SourceColor, line, DefaultColor, ErrorColor, err, DefaultColor)
continue continue
} }
instructions = append(instructions, instruction) output <- instruction
} }
log.Printf("Read %d instructions from file", len(instructions))
return instructions, nil
} }
func ReadFromArgs() ([]LinkInstruction, error) { func ReadFromArgs(output chan LinkInstruction, status chan error) {
defer close(output)
defer close(status)
log.Printf("Reading input from args") log.Printf("Reading input from args")
var instructions []LinkInstruction
for _, arg := range os.Args[1:] { for _, arg := range os.Args[1:] {
instruction, err := ParseInstruction(arg) instruction, err := ParseInstruction(arg)
if err != nil { if err != nil {
log.Printf("Error parsing arg: %s'%s'%s, error: %s%+v%s", SourceColor, arg, DefaultColor, ErrorColor, err, DefaultColor) log.Printf("Error parsing arg: %s'%s'%s, error: %s%+v%s", SourceColor, arg, DefaultColor, ErrorColor, err, DefaultColor)
continue continue
} }
instructions = append(instructions, instruction) output <- instruction
} }
log.Printf("Read %d instructions from args", len(instructions))
return instructions, nil
} }
func ReadFromStdin() ([]LinkInstruction, error) { func ReadFromStdin(output chan LinkInstruction, status chan error) {
defer close(output)
defer close(status)
log.Printf("Reading input from stdin") log.Printf("Reading input from stdin")
var instructions []LinkInstruction
info, err := os.Stdin.Stat() info, err := os.Stdin.Stat()
if err != nil { if err != nil {
log.Fatalf("Failed to stat stdin: %s%+v%s", ErrorColor, err, DefaultColor) log.Fatalf("Failed to stat stdin: %s%+v%s", ErrorColor, err, DefaultColor)
status <- err
return
} }
if info.Mode()&os.ModeNamedPipe != 0 { if info.Mode()&os.ModeNamedPipe != 0 {
scanner := bufio.NewScanner(os.Stdin) scanner := bufio.NewScanner(os.Stdin)
@@ -161,13 +196,12 @@ func ReadFromStdin() ([]LinkInstruction, error) {
log.Printf("Error parsing line: %s'%s'%s, error: %s%+v%s", SourceColor, line, DefaultColor, ErrorColor, err, DefaultColor) log.Printf("Error parsing line: %s'%s'%s, error: %s%+v%s", SourceColor, line, DefaultColor, ErrorColor, err, DefaultColor)
continue continue
} }
instructions = append(instructions, instruction) output <- instruction
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
log.Fatalf("Error reading from stdin: %s%+v%s", ErrorColor, err, DefaultColor) log.Fatalf("Error reading from stdin: %s%+v%s", ErrorColor, err, DefaultColor)
status <- err
return
} }
} }
log.Printf("Read %d instructions from stdin", len(instructions))
return instructions, nil
} }