Deretard the orchestrator
This commit is contained in:
2
main.go
2
main.go
@@ -17,7 +17,7 @@ import (
|
|||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var webhook *wh.ZulipWebhook
|
var webhook wh.Webhook
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Add flag for generating .env.example
|
// Add flag for generating .env.example
|
||||||
|
|||||||
248
orchestrator.go
248
orchestrator.go
@@ -15,111 +15,111 @@ import (
|
|||||||
logger "git.site.quack-lab.dev/dave/cylogger"
|
logger "git.site.quack-lab.dev/dave/cylogger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Orchestrator manages periodic data refetching and monitoring
|
// Orchestrator manages timer-based monitoring
|
||||||
type Orchestrator struct {
|
type Orchestrator struct {
|
||||||
esiClient esi.ESIInterface
|
esiClient esi.ESIInterface
|
||||||
ssoClient routes.SSOInterface
|
ssoClient routes.SSOInterface
|
||||||
database repositories.DatabaseInterface
|
database repositories.DatabaseInterface
|
||||||
webhook *wh.ZulipWebhook
|
webhook wh.Webhook
|
||||||
expiryTimers map[string]time.Time // Track extractor expiry times
|
expiryAlert *time.Duration
|
||||||
|
expiryCritical *time.Duration
|
||||||
|
alertExpiryTimers map[string]*time.Timer // Track extractor expiry times
|
||||||
|
criticalExpiryTimers map[string]*time.Timer // Track critical extractor expiry times
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOrchestrator creates a new orchestrator instance
|
// NewOrchestrator creates a new orchestrator instance
|
||||||
func NewOrchestrator(esiClient esi.ESIInterface, ssoClient routes.SSOInterface, database repositories.DatabaseInterface, webhook *wh.ZulipWebhook) *Orchestrator {
|
func NewOrchestrator(esiClient esi.ESIInterface, ssoClient routes.SSOInterface, database repositories.DatabaseInterface, webhook wh.Webhook) *Orchestrator {
|
||||||
return &Orchestrator{
|
return &Orchestrator{
|
||||||
esiClient: esiClient,
|
esiClient: esiClient,
|
||||||
ssoClient: ssoClient,
|
ssoClient: ssoClient,
|
||||||
database: database,
|
database: database,
|
||||||
webhook: webhook,
|
webhook: webhook,
|
||||||
expiryTimers: make(map[string]time.Time),
|
alertExpiryTimers: make(map[string]*time.Timer),
|
||||||
|
criticalExpiryTimers: make(map[string]*time.Timer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the orchestrator's periodic operations
|
// Start begins the orchestrator's timer-based operations
|
||||||
func (o *Orchestrator) Start() {
|
func (o *Orchestrator) Start() {
|
||||||
logger.Info("Starting orchestrator with cache validity: %s", options.GlobalOptions.CacheValidity)
|
logger.Info("Starting orchestrator with cache validity: %s", options.GlobalOptions.CacheValidity)
|
||||||
|
|
||||||
// Parse cache validity duration
|
|
||||||
cacheValidity, err := time.ParseDuration(options.GlobalOptions.CacheValidity)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Invalid cache validity duration %s: %v", options.GlobalOptions.CacheValidity, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse expiry warning and critical durations
|
// Parse expiry warning and critical durations
|
||||||
expiryWarning, err := time.ParseDuration(options.GlobalOptions.ExpiryWarning)
|
expiryWarning, err := time.ParseDuration(options.GlobalOptions.ExpiryWarning)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Invalid expiry warning duration %s: %v", options.GlobalOptions.ExpiryWarning, err)
|
logger.Error("Invalid expiry warning duration %s: %v", options.GlobalOptions.ExpiryWarning, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
o.expiryAlert = &expiryWarning
|
||||||
|
|
||||||
expiryCritical, err := time.ParseDuration(options.GlobalOptions.ExpiryCritical)
|
expiryCritical, err := time.ParseDuration(options.GlobalOptions.ExpiryCritical)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Invalid expiry critical duration %s: %v", options.GlobalOptions.ExpiryCritical, err)
|
logger.Error("Invalid expiry critical duration %s: %v", options.GlobalOptions.ExpiryCritical, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
o.expiryCritical = &expiryCritical
|
||||||
|
|
||||||
// Run initial fetch
|
// Set up timers once
|
||||||
o.refetchAllData()
|
o.setupTimers()
|
||||||
|
|
||||||
// Set up periodic refetching
|
|
||||||
ticker := time.NewTicker(cacheValidity)
|
|
||||||
go func() {
|
|
||||||
for range ticker.C {
|
|
||||||
logger.Info("Cache validity period reached, refetching all data")
|
|
||||||
o.refetchAllData()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Set up periodic expiry checking
|
|
||||||
expiryTicker := time.NewTicker(1 * time.Hour) // Check every hour
|
|
||||||
go func() {
|
|
||||||
for range expiryTicker.C {
|
|
||||||
logger.Debug("Checking extractor expiry timers")
|
|
||||||
o.checkExpiryTimers(expiryWarning, expiryCritical)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
logger.Info("Orchestrator started successfully")
|
logger.Info("Orchestrator started successfully")
|
||||||
}
|
}
|
||||||
|
|
||||||
// refetchAllData fetches data for all characters and checks thresholds
|
// Stop stops all timers and cleans up resources
|
||||||
func (o *Orchestrator) refetchAllData() {
|
func (o *Orchestrator) Stop() {
|
||||||
logger.Info("Orchestrator.refetchAllData: Starting data refetch for all characters")
|
logger.Info("Stopping orchestrator")
|
||||||
|
|
||||||
|
// Stop all alert timers
|
||||||
|
for key, timer := range o.alertExpiryTimers {
|
||||||
|
timer.Stop()
|
||||||
|
delete(o.alertExpiryTimers, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop all critical timers
|
||||||
|
for key, timer := range o.criticalExpiryTimers {
|
||||||
|
timer.Stop()
|
||||||
|
delete(o.criticalExpiryTimers, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Orchestrator stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
// setupTimers sets up all timers based on current extractor data
|
||||||
|
func (o *Orchestrator) setupTimers() {
|
||||||
|
logger.Info("Setting up timers for all characters")
|
||||||
|
|
||||||
// Get all characters from database
|
// Get all characters from database
|
||||||
characters, err := o.database.Character().GetAllCharacters()
|
characters, err := o.database.Character().GetAllCharacters()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Orchestrator.refetchAllData: Failed to get all characters: %v", err)
|
logger.Error("Failed to get all characters: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Orchestrator.refetchAllData: Found %d characters to process", len(characters))
|
logger.Info("Found %d characters to process", len(characters))
|
||||||
|
|
||||||
if len(characters) == 0 {
|
if len(characters) == 0 {
|
||||||
logger.Info("Orchestrator.refetchAllData: No characters found in database")
|
logger.Info("No characters found in database")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process each character
|
// Process each character and set up timers
|
||||||
for i, char := range characters {
|
for i, char := range characters {
|
||||||
logger.Info("Orchestrator.refetchAllData: Processing character %d/%d: %s", i+1, len(characters), char.CharacterName)
|
logger.Info("Processing character %d/%d: %s", i+1, len(characters), char.CharacterName)
|
||||||
o.processCharacter(char)
|
o.setupCharacterTimers(char)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Orchestrator.refetchAllData: Completed data refetch for all characters")
|
logger.Info("Completed timer setup for all characters")
|
||||||
}
|
}
|
||||||
|
|
||||||
// processCharacter processes a single character's data
|
// setupCharacterTimers sets up timers for a single character's extractors
|
||||||
func (o *Orchestrator) processCharacter(char types.Character) {
|
func (o *Orchestrator) setupCharacterTimers(char types.Character) {
|
||||||
logger.Info("Orchestrator.processCharacter: Starting processing for character: %s (ID: %d)", char.CharacterName, char.ID)
|
logger.Info("Setting up timers for character: %s (ID: %d)", char.CharacterName, char.ID)
|
||||||
|
|
||||||
planets, err := o.esiClient.GetCharacterPlanets(context.Background(), int(char.ID), char.AccessToken)
|
planets, err := o.esiClient.GetCharacterPlanets(context.Background(), int(char.ID), char.AccessToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning("Orchestrator.processCharacter: Failed to get planets for character %s: %v", char.CharacterName, err)
|
logger.Warning("Failed to get planets for character %s: %v", char.CharacterName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Info("Orchestrator.processCharacter: Got %d planets for character %s", len(planets), char.CharacterName)
|
logger.Info("Got %d planets for character %s", len(planets), char.CharacterName)
|
||||||
|
|
||||||
planetIds := make([]int64, len(planets))
|
planetIds := make([]int64, len(planets))
|
||||||
for i, planet := range planets {
|
for i, planet := range planets {
|
||||||
@@ -127,106 +127,66 @@ func (o *Orchestrator) processCharacter(char types.Character) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get extractors for this character
|
// Get extractors for this character
|
||||||
logger.Info("Orchestrator.processCharacter: Getting extractors for character %s", char.CharacterName)
|
logger.Info("Getting extractors for character %s", char.CharacterName)
|
||||||
extractors, err := routes.GetExtractorsForCharacter(o.esiClient, int(char.ID), char.AccessToken, planetIds)
|
extractors, err := routes.GetExtractorsForCharacter(o.esiClient, int(char.ID), char.AccessToken, planetIds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warning("Orchestrator.processCharacter: Failed to get extractors for character %s: %v", char.CharacterName, err)
|
logger.Warning("Failed to get extractors for character %s: %v", char.CharacterName, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Info("Orchestrator.processCharacter: Got %d extractors for character %s", len(extractors), char.CharacterName)
|
logger.Info("Got %d extractors for character %s", len(extractors), char.CharacterName)
|
||||||
|
|
||||||
// Get storage for this character
|
// Set up timers for each extractor
|
||||||
// logger.Info("Orchestrator.processCharacter: Getting storage for character %s", char.CharacterName)
|
|
||||||
// storage, err := routes.GetStorageForCharacter(o.esiClient, int(char.ID), char.AccessToken)
|
|
||||||
// if err != nil {
|
|
||||||
// logger.Warning("Orchestrator.processCharacter: Failed to get storage for character %s: %v", char.CharacterName, err)
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// logger.Info("Orchestrator.processCharacter: Got %d storage facilities for character %s", len(storage), char.CharacterName)
|
|
||||||
|
|
||||||
// Check storage thresholds
|
|
||||||
// logger.Info("Orchestrator.processCharacter: Checking storage thresholds for character %s", char.CharacterName)
|
|
||||||
// o.checkStorageThresholds(char.CharacterName, storage)
|
|
||||||
|
|
||||||
// Update expiry timers for extractors
|
|
||||||
logger.Info("Orchestrator.processCharacter: Updating expiry timers for character %s", char.CharacterName)
|
|
||||||
o.updateExpiryTimers(char.CharacterName, extractors)
|
|
||||||
|
|
||||||
logger.Info("Orchestrator.processCharacter: Completed processing for character %s", char.CharacterName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// checkStorageThresholds checks storage utilization against configured thresholds
|
|
||||||
// func (o *Orchestrator) checkStorageThresholds(characterName string, storage []routes.StorageInfo) {
|
|
||||||
// warningThreshold := options.GlobalOptions.StorageWarning
|
|
||||||
// criticalThreshold := options.GlobalOptions.StorageCritical
|
|
||||||
|
|
||||||
// for _, s := range storage {
|
|
||||||
// if s.Utilization >= criticalThreshold {
|
|
||||||
// message := routes.FormatStorageAlert(s, true)
|
|
||||||
// o.sendWebhook(characterName, message)
|
|
||||||
// } else if s.Utilization >= warningThreshold {
|
|
||||||
// message := routes.FormatStorageAlert(s, false)
|
|
||||||
// o.sendWebhook(characterName, message)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// updateExpiryTimers updates the in-memory expiry timers for extractors
|
|
||||||
func (o *Orchestrator) updateExpiryTimers(characterName string, extractors []routes.ExtractorInfo) {
|
|
||||||
for _, extractor := range extractors {
|
for _, extractor := range extractors {
|
||||||
if extractor.ExpiryDate == "N/A" {
|
o.setupExtractorTimers(char.CharacterName, extractor)
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
expiryTime, err := time.Parse(time.RFC3339, extractor.ExpiryDate)
|
|
||||||
if err != nil {
|
|
||||||
logger.Warning("Failed to parse expiry date %s for character %s: %v", extractor.ExpiryDate, characterName, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store the earliest expiry time for this character
|
|
||||||
key := fmt.Sprintf("%s_%s", characterName, extractor.PlanetName)
|
|
||||||
if existingTime, exists := o.expiryTimers[key]; !exists || expiryTime.Before(existingTime) {
|
|
||||||
o.expiryTimers[key] = expiryTime
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Info("Completed timer setup for character %s", char.CharacterName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkExpiryTimers checks all expiry timers and sends alerts if needed
|
// setupExtractorTimers sets up timers for a single extractor
|
||||||
func (o *Orchestrator) checkExpiryTimers(warningDuration, criticalDuration time.Duration) {
|
func (o *Orchestrator) setupExtractorTimers(characterName string, extractor routes.ExtractorInfo) {
|
||||||
now := time.Now()
|
if extractor.ExpiryDate == "N/A" {
|
||||||
|
logger.Info("Extractor %d has no expiry date, skipping", extractor.ExtractorNumber)
|
||||||
for key, expiryTime := range o.expiryTimers {
|
return
|
||||||
timeUntilExpiry := expiryTime.Sub(now)
|
|
||||||
|
|
||||||
if timeUntilExpiry <= criticalDuration {
|
|
||||||
// Critical: extractor expired or about to expire
|
|
||||||
characterName, planetName := o.parseExpiryKey(key)
|
|
||||||
message := fmt.Sprintf("CRITICAL: Extractor expired on %s (expired: %s)",
|
|
||||||
planetName, expiryTime.Format("2006-01-02 15:04:05"))
|
|
||||||
o.sendWebhook(characterName, message)
|
|
||||||
} else if timeUntilExpiry <= warningDuration {
|
|
||||||
// Warning: extractor expiring soon
|
|
||||||
characterName, planetName := o.parseExpiryKey(key)
|
|
||||||
message := fmt.Sprintf("WARNING: Extractor expiring soon on %s (expires: %s)",
|
|
||||||
planetName, expiryTime.Format("2006-01-02 15:04:05"))
|
|
||||||
o.sendWebhook(characterName, message)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Info("Setting up timers for extractor %d with expiry date %s", extractor.ExtractorNumber, extractor.ExpiryDate)
|
||||||
|
expiryTime, err := time.Parse(time.RFC3339, extractor.ExpiryDate)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warning("Failed to parse expiry date %s: %v", extractor.ExpiryDate, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create timer key for this extractor
|
||||||
|
key := fmt.Sprintf("%s_%s_%d", characterName, extractor.PlanetName, extractor.ExtractorNumber)
|
||||||
|
|
||||||
|
// Set warning timer to fire at the exact warning time
|
||||||
|
warningAlertAt := expiryTime.Add(-*o.expiryAlert)
|
||||||
|
logger.Info("Setting warning timer for extractor %d to fire at %s", extractor.ExtractorNumber, warningAlertAt.Format(time.RFC3339))
|
||||||
|
o.alertExpiryTimers[key] = time.AfterFunc(time.Until(warningAlertAt), func() {
|
||||||
|
o.sendExpiryAlert(characterName, extractor, false)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Set critical timer to fire at the exact critical time
|
||||||
|
criticalAlertAt := expiryTime.Add(-*o.expiryCritical)
|
||||||
|
logger.Info("Setting critical timer for extractor %d to fire at %s", extractor.ExtractorNumber, criticalAlertAt.Format(time.RFC3339))
|
||||||
|
o.criticalExpiryTimers[key] = time.AfterFunc(time.Until(criticalAlertAt), func() {
|
||||||
|
o.sendExpiryAlert(characterName, extractor, true)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseExpiryKey parses the expiry timer key to extract character and planet names
|
// sendExpiryAlert sends an alert for extractor expiry
|
||||||
func (o *Orchestrator) parseExpiryKey(key string) (characterName, planetName string) {
|
func (o *Orchestrator) sendExpiryAlert(characterName string, extractor routes.ExtractorInfo, isCritical bool) {
|
||||||
// Key format: "characterName_planetName"
|
alertType := "WARNING"
|
||||||
// Find the last underscore to split properly
|
if isCritical {
|
||||||
for i := len(key) - 1; i >= 0; i-- {
|
alertType = "CRITICAL"
|
||||||
if key[i] == '_' {
|
|
||||||
characterName = key[:i]
|
|
||||||
planetName = key[i+1:]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return
|
|
||||||
|
message := fmt.Sprintf("%s: Extractor %d on %s expires at %s",
|
||||||
|
alertType, extractor.ExtractorNumber, extractor.PlanetName, extractor.ExpiryDate)
|
||||||
|
|
||||||
|
logger.Info("Sending %s alert for character %s: %s", alertType, characterName, message)
|
||||||
|
o.sendWebhook(characterName, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendWebhook sends a webhook notification
|
// sendWebhook sends a webhook notification
|
||||||
@@ -239,7 +199,7 @@ func (o *Orchestrator) sendWebhook(characterName, message string) {
|
|||||||
logger.Info("Sending webhook notification for character %s: %s", characterName, message)
|
logger.Info("Sending webhook notification for character %s: %s", characterName, message)
|
||||||
|
|
||||||
// Send webhook with character name as topic and message as content
|
// Send webhook with character name as topic and message as content
|
||||||
err := o.webhook.Post("Eve PI", characterName, message)
|
err := o.webhook.Post("EvePI", characterName, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to send webhook for character %s: %v", characterName, err)
|
logger.Error("Failed to send webhook for character %s: %v", characterName, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ type ZulipWebhook struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewZulipWebhook creates a new Zulip webhook client
|
// NewZulipWebhook creates a new Zulip webhook client
|
||||||
func NewZulipWebhook(url, email, token string) *ZulipWebhook {
|
func NewZulipWebhook(url, email, token string) Webhook {
|
||||||
logger.Info("Zulip webhook client initialized with email: %s", email)
|
logger.Info("Zulip webhook client initialized with email: %s", email)
|
||||||
|
|
||||||
// Parse HTTP timeout ONCE at initialization
|
// Parse HTTP timeout ONCE at initialization
|
||||||
@@ -74,3 +74,5 @@ func (z *ZulipWebhook) Post(channel, topic, message string) error {
|
|||||||
logger.Info("Zulip message sent successfully to channel: %s, topic: %s", channel, topic)
|
logger.Info("Zulip message sent successfully to channel: %s, topic: %s", channel, topic)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ Webhook = &ZulipWebhook{}
|
||||||
Reference in New Issue
Block a user