Refactor everything to use nsq instead of hosting a http server
The point of this is (hopefully) some sort of resiliency I do not want to lose any messages ever And I want to be able to kill this process whenever it is misbehaving Hopefully this achieves that goal
This commit is contained in:
@@ -5,12 +5,17 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
var Error *log.Logger
|
||||
var Warning *log.Logger
|
||||
|
||||
func init() {
|
||||
log.SetFlags(log.Lmicroseconds | log.Lshortfile)
|
||||
logFile, err := os.Create("main.log")
|
||||
@@ -30,50 +35,56 @@ func init() {
|
||||
}
|
||||
|
||||
const DOWNLOAD_WORKERS = 4
|
||||
var downloadQueue = make(chan *DownloadTask, 100)
|
||||
|
||||
func enableCORS(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||||
type DLHandler struct{}
|
||||
|
||||
if r.Method == http.MethodOptions {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func handleDownload(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
|
||||
return
|
||||
func (*DLHandler) HandleMessage(message *nsq.Message) error {
|
||||
log.Printf("Received message '%s' with %d attempts", message.Body, message.Attempts)
|
||||
data := DownloadRequest{}
|
||||
err := json.Unmarshal(message.Body, &data)
|
||||
if err != nil {
|
||||
Error.Printf("Error unmarshalling message: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var req DownloadRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
log.Printf("Error parsing JSON: %v", err)
|
||||
http.Error(w, "Error parsing JSON", http.StatusBadRequest)
|
||||
return
|
||||
log.Printf("Downloading %s", data.Link)
|
||||
err = Download(data.Link)
|
||||
if err != nil {
|
||||
Error.Printf("Error downloading %s: %v", data.Link, err)
|
||||
return err
|
||||
}
|
||||
|
||||
downloadQueue <- &DownloadTask{Url: req.Link}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
message.Finish()
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
config := nsq.NewConfig()
|
||||
config.MaxAttempts = 5
|
||||
config.MaxInFlight = DOWNLOAD_WORKERS
|
||||
config.MsgTimeout = 10 * time.Second
|
||||
|
||||
consumer, err := nsq.NewConsumer("ytdqueue", "dl", config)
|
||||
if err != nil {
|
||||
Error.Printf("Error creating consumer: %v", err)
|
||||
return
|
||||
}
|
||||
for i := 0; i < DOWNLOAD_WORKERS; i++ {
|
||||
worker := &DownloadWorker{id: i, input: downloadQueue}
|
||||
go worker.Run()
|
||||
consumer.AddHandler(&DLHandler{})
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/download", enableCORS(http.HandlerFunc(handleDownload)))
|
||||
log.Println("Server starting on :5000")
|
||||
err := http.ListenAndServe(":5000", mux)
|
||||
err = consumer.ConnectToNSQD("nsq.site.quack-lab.dev:41505")
|
||||
if err != nil {
|
||||
log.Println("Error starting server:", err)
|
||||
Error.Printf("Error connecting to nsqlookupd: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
<-sigChan
|
||||
log.Println("Received signal to terminate. Initiating graceful shutdown...")
|
||||
|
||||
consumer.Stop()
|
||||
<-consumer.StopChan
|
||||
|
||||
log.Println("Graceful shutdown completed.")
|
||||
}
|
||||
|
Reference in New Issue
Block a user