Implement orchestrator to handle the alerting
This commit is contained in:
6
main.go
6
main.go
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
var webhook wh.Webhook
|
||||
var webhook *wh.ZulipWebhook
|
||||
|
||||
func main() {
|
||||
// Add flag for generating .env.example
|
||||
@@ -72,6 +72,10 @@ func main() {
|
||||
routeHandler := routes.NewRouteHandler(sso, webhook, cachedESI)
|
||||
routeHandler.SetupRoutes(r)
|
||||
|
||||
// Create and start orchestrator
|
||||
orchestrator := NewOrchestrator(cachedESI, sso, database, webhook)
|
||||
orchestrator.Start()
|
||||
|
||||
logger.Info("Starting web server on 0.0.0.0:%s", options.GlobalOptions.Port)
|
||||
go func() {
|
||||
if err := fasthttp.ListenAndServe("0.0.0.0:"+options.GlobalOptions.Port, r.Handler); err != nil {
|
||||
|
||||
221
orchestrator.go
Normal file
221
orchestrator.go
Normal file
@@ -0,0 +1,221 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go-eve-pi/esi"
|
||||
"go-eve-pi/options"
|
||||
"go-eve-pi/repositories"
|
||||
"go-eve-pi/routes"
|
||||
"go-eve-pi/types"
|
||||
wh "go-eve-pi/webhook"
|
||||
|
||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||
)
|
||||
|
||||
// Orchestrator manages periodic data refetching and monitoring
|
||||
type Orchestrator struct {
|
||||
esiClient esi.ESIInterface
|
||||
ssoClient routes.SSOInterface
|
||||
database repositories.DatabaseInterface
|
||||
webhook *wh.ZulipWebhook
|
||||
expiryTimers map[string]time.Time // Track extractor expiry times
|
||||
}
|
||||
|
||||
// NewOrchestrator creates a new orchestrator instance
|
||||
func NewOrchestrator(esiClient esi.ESIInterface, ssoClient routes.SSOInterface, database repositories.DatabaseInterface, webhook *wh.ZulipWebhook) *Orchestrator {
|
||||
return &Orchestrator{
|
||||
esiClient: esiClient,
|
||||
ssoClient: ssoClient,
|
||||
database: database,
|
||||
webhook: webhook,
|
||||
expiryTimers: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the orchestrator's periodic operations
|
||||
func (o *Orchestrator) Start() {
|
||||
logger.Info("Starting orchestrator with cache validity: %s", options.GlobalOptions.CacheValidity)
|
||||
|
||||
// Parse cache validity duration
|
||||
cacheValidity, err := time.ParseDuration(options.GlobalOptions.CacheValidity)
|
||||
if err != nil {
|
||||
logger.Error("Invalid cache validity duration %s: %v", options.GlobalOptions.CacheValidity, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse expiry warning and critical durations
|
||||
expiryWarning, err := time.ParseDuration(options.GlobalOptions.ExpiryWarning)
|
||||
if err != nil {
|
||||
logger.Error("Invalid expiry warning duration %s: %v", options.GlobalOptions.ExpiryWarning, err)
|
||||
return
|
||||
}
|
||||
|
||||
expiryCritical, err := time.ParseDuration(options.GlobalOptions.ExpiryCritical)
|
||||
if err != nil {
|
||||
logger.Error("Invalid expiry critical duration %s: %v", options.GlobalOptions.ExpiryCritical, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Run initial fetch
|
||||
o.refetchAllData()
|
||||
|
||||
// Set up periodic refetching
|
||||
ticker := time.NewTicker(cacheValidity)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
logger.Info("Cache validity period reached, refetching all data")
|
||||
o.refetchAllData()
|
||||
}
|
||||
}()
|
||||
|
||||
// Set up periodic expiry checking
|
||||
expiryTicker := time.NewTicker(1 * time.Hour) // Check every hour
|
||||
go func() {
|
||||
for range expiryTicker.C {
|
||||
logger.Debug("Checking extractor expiry timers")
|
||||
o.checkExpiryTimers(expiryWarning, expiryCritical)
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Info("Orchestrator started successfully")
|
||||
}
|
||||
|
||||
// refetchAllData fetches data for all characters and checks thresholds
|
||||
func (o *Orchestrator) refetchAllData() {
|
||||
logger.Info("Starting data refetch for all characters")
|
||||
|
||||
// Get all characters from database
|
||||
characters, err := o.database.Character().GetAllCharacters()
|
||||
if err != nil {
|
||||
logger.Error("Failed to get all characters: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Found %d characters to process", len(characters))
|
||||
|
||||
// Process each character
|
||||
for _, char := range characters {
|
||||
o.processCharacter(char)
|
||||
}
|
||||
|
||||
logger.Info("Completed data refetch for all characters")
|
||||
}
|
||||
|
||||
// processCharacter processes a single character's data
|
||||
func (o *Orchestrator) processCharacter(char types.Character) {
|
||||
logger.Debug("Processing character: %s", char.CharacterName)
|
||||
|
||||
// Get extractors for this character
|
||||
extractors, err := routes.GetExtractorsForCharacter(o.esiClient, int(char.ID), char.AccessToken)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to get extractors for character %s: %v", char.CharacterName, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Get storage for this character
|
||||
storage, err := routes.GetStorageForCharacter(o.esiClient, int(char.ID), char.AccessToken)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to get storage for character %s: %v", char.CharacterName, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Check storage thresholds
|
||||
o.checkStorageThresholds(char.CharacterName, storage)
|
||||
|
||||
// Update expiry timers for extractors
|
||||
o.updateExpiryTimers(char.CharacterName, extractors)
|
||||
|
||||
logger.Debug("Completed processing character: %s", char.CharacterName)
|
||||
}
|
||||
|
||||
// checkStorageThresholds checks storage utilization against configured thresholds
|
||||
func (o *Orchestrator) checkStorageThresholds(characterName string, storage []routes.StorageInfo) {
|
||||
warningThreshold := options.GlobalOptions.StorageWarning
|
||||
criticalThreshold := options.GlobalOptions.StorageCritical
|
||||
|
||||
for _, s := range storage {
|
||||
if s.Utilization >= criticalThreshold {
|
||||
message := routes.FormatStorageAlert(s, true)
|
||||
o.sendWebhook(characterName, message)
|
||||
} else if s.Utilization >= warningThreshold {
|
||||
message := routes.FormatStorageAlert(s, false)
|
||||
o.sendWebhook(characterName, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// updateExpiryTimers updates the in-memory expiry timers for extractors
|
||||
func (o *Orchestrator) updateExpiryTimers(characterName string, extractors []routes.ExtractorInfo) {
|
||||
for _, extractor := range extractors {
|
||||
if extractor.ExpiryDate == "N/A" {
|
||||
continue
|
||||
}
|
||||
|
||||
expiryTime, err := time.Parse("2006-01-02 15:04:05", extractor.ExpiryDate)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to parse expiry date %s for character %s: %v", extractor.ExpiryDate, characterName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Store the earliest expiry time for this character
|
||||
key := fmt.Sprintf("%s_%s", characterName, extractor.PlanetName)
|
||||
if existingTime, exists := o.expiryTimers[key]; !exists || expiryTime.Before(existingTime) {
|
||||
o.expiryTimers[key] = expiryTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkExpiryTimers checks all expiry timers and sends alerts if needed
|
||||
func (o *Orchestrator) checkExpiryTimers(warningDuration, criticalDuration time.Duration) {
|
||||
now := time.Now()
|
||||
|
||||
for key, expiryTime := range o.expiryTimers {
|
||||
timeUntilExpiry := expiryTime.Sub(now)
|
||||
|
||||
if timeUntilExpiry <= criticalDuration {
|
||||
// Critical: extractor expired or about to expire
|
||||
characterName, planetName := o.parseExpiryKey(key)
|
||||
message := fmt.Sprintf("🚨 CRITICAL: Extractor expired on %s! (expired: %s)",
|
||||
planetName, expiryTime.Format("2006-01-02 15:04:05"))
|
||||
o.sendWebhook(characterName, message)
|
||||
} else if timeUntilExpiry <= warningDuration {
|
||||
// Warning: extractor expiring soon
|
||||
characterName, planetName := o.parseExpiryKey(key)
|
||||
message := fmt.Sprintf("⏰ WARNING: Extractor expiring soon on %s! (expires: %s)",
|
||||
planetName, expiryTime.Format("2006-01-02 15:04:05"))
|
||||
o.sendWebhook(characterName, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// parseExpiryKey parses the expiry timer key to extract character and planet names
|
||||
func (o *Orchestrator) parseExpiryKey(key string) (characterName, planetName string) {
|
||||
// Key format: "characterName_planetName"
|
||||
// Find the last underscore to split properly
|
||||
for i := len(key) - 1; i >= 0; i-- {
|
||||
if key[i] == '_' {
|
||||
characterName = key[:i]
|
||||
planetName = key[i+1:]
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// sendWebhook sends a webhook notification
|
||||
func (o *Orchestrator) sendWebhook(characterName, message string) {
|
||||
if o.webhook == nil {
|
||||
logger.Warning("Webhook not configured, skipping notification: %s", message)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Sending webhook notification for character %s: %s", characterName, message)
|
||||
|
||||
// Send webhook with character name as topic and message as content
|
||||
err := o.webhook.Post("Eve PI", characterName, message)
|
||||
if err != nil {
|
||||
logger.Error("Failed to send webhook for character %s: %v", characterName, err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user