feat(scheduling): introduce ScheduledJobManager and ServerResourceManager for enhanced job scheduling and resource management
This commit is contained in:
255
app/Jobs/ScheduledJobManager.php
Normal file
255
app/Jobs/ScheduledJobManager.php
Normal file
@@ -0,0 +1,255 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\ScheduledDatabaseBackup;
|
||||
use App\Models\ScheduledTask;
|
||||
use Cron\CronExpression;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\Middleware\WithoutOverlapping;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
class ScheduledJobManager implements ShouldQueue
|
||||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
/**
|
||||
* The time when this job execution started.
|
||||
* Used to ensure all scheduled items are evaluated against the same point in time.
|
||||
*/
|
||||
private ?Carbon $executionTime = null;
|
||||
|
||||
/**
|
||||
* Create a new job instance.
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this->onQueue($this->determineQueue());
|
||||
}
|
||||
|
||||
private function determineQueue(): string
|
||||
{
|
||||
$preferredQueue = 'crons';
|
||||
$fallbackQueue = 'high';
|
||||
|
||||
$configuredQueues = explode(',', env('HORIZON_QUEUES', 'high,default'));
|
||||
|
||||
return in_array($preferredQueue, $configuredQueues) ? $preferredQueue : $fallbackQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the middleware the job should pass through.
|
||||
*/
|
||||
public function middleware(): array
|
||||
{
|
||||
return [
|
||||
(new WithoutOverlapping('scheduled-job-manager'))
|
||||
->releaseAfter(60), // Release the lock after 60 seconds if job fails
|
||||
];
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
// Freeze the execution time at the start of the job
|
||||
$this->executionTime = Carbon::now();
|
||||
|
||||
Log::channel('scheduled')->info('ScheduledJobManager started', [
|
||||
'execution_time' => $this->executionTime->format('Y-m-d H:i:s T'),
|
||||
]);
|
||||
|
||||
// Process backups - don't let failures stop task processing
|
||||
try {
|
||||
$this->processScheduledBackups();
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Failed to process scheduled backups', [
|
||||
'error' => $e->getMessage(),
|
||||
'trace' => $e->getTraceAsString(),
|
||||
]);
|
||||
}
|
||||
|
||||
// Process tasks - don't let failures stop the job manager
|
||||
try {
|
||||
$this->processScheduledTasks();
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Failed to process scheduled tasks', [
|
||||
'error' => $e->getMessage(),
|
||||
'trace' => $e->getTraceAsString(),
|
||||
]);
|
||||
}
|
||||
|
||||
Log::channel('scheduled')->info('ScheduledJobManager completed');
|
||||
}
|
||||
|
||||
private function processScheduledBackups(): void
|
||||
{
|
||||
$backups = ScheduledDatabaseBackup::with(['database'])
|
||||
->where('enabled', true)
|
||||
->get();
|
||||
|
||||
foreach ($backups as $backup) {
|
||||
try {
|
||||
// Apply the same filtering logic as the original
|
||||
if (! $this->shouldProcessBackup($backup)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$server = $backup->server();
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
$frequency = $backup->frequency;
|
||||
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
||||
$frequency = VALID_CRON_STRINGS[$frequency];
|
||||
}
|
||||
|
||||
if ($this->shouldRunNow($frequency, $serverTimezone)) {
|
||||
Log::channel('scheduled')->info('Dispatching backup job', [
|
||||
'backup_id' => $backup->id,
|
||||
'backup_name' => $backup->name ?? 'unnamed',
|
||||
'server_name' => $server->name,
|
||||
'frequency' => $frequency,
|
||||
'timezone' => $serverTimezone,
|
||||
'execution_time' => $this->executionTime?->format('Y-m-d H:i:s T'),
|
||||
'current_time' => Carbon::now()->format('Y-m-d H:i:s T'),
|
||||
]);
|
||||
|
||||
DatabaseBackupJob::dispatch($backup);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing backup', [
|
||||
'backup_id' => $backup->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function processScheduledTasks(): void
|
||||
{
|
||||
$tasks = ScheduledTask::with(['service', 'application'])
|
||||
->where('enabled', true)
|
||||
->get();
|
||||
|
||||
foreach ($tasks as $task) {
|
||||
try {
|
||||
if (! $this->shouldProcessTask($task)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$server = $task->server();
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
$frequency = $task->frequency;
|
||||
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
||||
$frequency = VALID_CRON_STRINGS[$frequency];
|
||||
}
|
||||
|
||||
if ($this->shouldRunNow($frequency, $serverTimezone)) {
|
||||
Log::channel('scheduled')->info('Dispatching task job', [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name ?? 'unnamed',
|
||||
'server_name' => $server->name,
|
||||
'frequency' => $frequency,
|
||||
'timezone' => $serverTimezone,
|
||||
'execution_time' => $this->executionTime?->format('Y-m-d H:i:s T'),
|
||||
'current_time' => Carbon::now()->format('Y-m-d H:i:s T'),
|
||||
]);
|
||||
|
||||
ScheduledTaskJob::dispatch($task);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing task', [
|
||||
'task_id' => $task->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function shouldProcessBackup(ScheduledDatabaseBackup $backup): bool
|
||||
{
|
||||
if (blank(data_get($backup, 'database'))) {
|
||||
$backup->delete();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
$server = $backup->server();
|
||||
if (blank($server)) {
|
||||
$backup->delete();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($server->isFunctional() === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private function shouldProcessTask(ScheduledTask $task): bool
|
||||
{
|
||||
$service = $task->service;
|
||||
$application = $task->application;
|
||||
|
||||
$server = $task->server();
|
||||
if (blank($server)) {
|
||||
$task->delete();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($server->isFunctional() === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (! $service && ! $application) {
|
||||
$task->delete();
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($application && str($application->status)->contains('running') === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($service && str($service->status)->contains('running') === false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private function shouldRunNow(string $frequency, string $timezone): bool
|
||||
{
|
||||
$cron = new CronExpression($frequency);
|
||||
|
||||
// Use the frozen execution time, not the current time
|
||||
// Fallback to current time if execution time is not set (shouldn't happen)
|
||||
$baseTime = $this->executionTime ?? Carbon::now();
|
||||
$executionTime = $baseTime->copy()->setTimezone($timezone);
|
||||
|
||||
return $cron->isDue($executionTime);
|
||||
}
|
||||
}
|
168
app/Jobs/ServerResourceManager.php
Normal file
168
app/Jobs/ServerResourceManager.php
Normal file
@@ -0,0 +1,168 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\InstanceSettings;
|
||||
use App\Models\Server;
|
||||
use App\Models\Team;
|
||||
use Cron\CronExpression;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\Middleware\WithoutOverlapping;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
class ServerResourceManager implements ShouldQueue
|
||||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
/**
|
||||
* The time when this job execution started.
|
||||
*/
|
||||
private ?Carbon $executionTime = null;
|
||||
|
||||
private InstanceSettings $settings;
|
||||
|
||||
private string $instanceTimezone;
|
||||
|
||||
/**
|
||||
* Create a new job instance.
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this->onQueue('high');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the middleware the job should pass through.
|
||||
*/
|
||||
public function middleware(): array
|
||||
{
|
||||
return [
|
||||
(new WithoutOverlapping('server-resource-manager'))
|
||||
->releaseAfter(60),
|
||||
];
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
// Freeze the execution time at the start of the job
|
||||
$this->executionTime = Carbon::now();
|
||||
|
||||
$this->settings = instanceSettings();
|
||||
$this->instanceTimezone = $this->settings->instance_timezone ?: config('app.timezone');
|
||||
|
||||
if (validate_timezone($this->instanceTimezone) === false) {
|
||||
$this->instanceTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
Log::channel('scheduled')->info('ServerResourceManager started', [
|
||||
'execution_time' => $this->executionTime->format('Y-m-d H:i:s T'),
|
||||
]);
|
||||
|
||||
// Process server checks - don't let failures stop the job
|
||||
try {
|
||||
$this->processServerChecks();
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Failed to process server checks', [
|
||||
'error' => $e->getMessage(),
|
||||
'trace' => $e->getTraceAsString(),
|
||||
]);
|
||||
}
|
||||
|
||||
Log::channel('scheduled')->info('ServerResourceManager completed');
|
||||
}
|
||||
|
||||
private function processServerChecks(): void
|
||||
{
|
||||
$servers = $this->getServers();
|
||||
|
||||
foreach ($servers as $server) {
|
||||
try {
|
||||
$this->processServer($server);
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing server', [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function getServers()
|
||||
{
|
||||
$allServers = Server::where('ip', '!=', '1.2.3.4');
|
||||
|
||||
if (isCloud()) {
|
||||
$servers = $allServers->whereRelation('team.subscription', 'stripe_invoice_paid', true)->get();
|
||||
$own = Team::find(0)->servers;
|
||||
|
||||
return $servers->merge($own);
|
||||
} else {
|
||||
return $allServers->get();
|
||||
}
|
||||
}
|
||||
|
||||
private function processServer(Server $server): void
|
||||
{
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', $this->instanceTimezone);
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
// Sentinel check
|
||||
$lastSentinelUpdate = $server->sentinel_updated_at;
|
||||
if (Carbon::parse($lastSentinelUpdate)->isBefore($this->executionTime->subSeconds($server->waitBeforeDoingSshCheck()))) {
|
||||
// Dispatch ServerCheckJob if due
|
||||
$checkFrequency = isCloud() ? '*/5 * * * *' : '* * * * *'; // Every 5 min for cloud, every minute for self-hosted
|
||||
if ($this->shouldRunNow($checkFrequency, $serverTimezone)) {
|
||||
ServerCheckJob::dispatch($server);
|
||||
}
|
||||
|
||||
// Dispatch ServerStorageCheckJob if due
|
||||
$serverDiskUsageCheckFrequency = data_get($server->settings, 'server_disk_usage_check_frequency', '0 * * * *');
|
||||
if (isset(VALID_CRON_STRINGS[$serverDiskUsageCheckFrequency])) {
|
||||
$serverDiskUsageCheckFrequency = VALID_CRON_STRINGS[$serverDiskUsageCheckFrequency];
|
||||
}
|
||||
if ($this->shouldRunNow($serverDiskUsageCheckFrequency, $serverTimezone)) {
|
||||
ServerStorageCheckJob::dispatch($server);
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch DockerCleanupJob if due
|
||||
$dockerCleanupFrequency = data_get($server->settings, 'docker_cleanup_frequency', '0 * * * *');
|
||||
if (isset(VALID_CRON_STRINGS[$dockerCleanupFrequency])) {
|
||||
$dockerCleanupFrequency = VALID_CRON_STRINGS[$dockerCleanupFrequency];
|
||||
}
|
||||
if ($this->shouldRunNow($dockerCleanupFrequency, $serverTimezone)) {
|
||||
DockerCleanupJob::dispatch($server);
|
||||
}
|
||||
|
||||
// Dispatch ServerPatchCheckJob if due (weekly)
|
||||
if ($this->shouldRunNow('0 0 * * 0', $serverTimezone)) { // Weekly on Sunday at midnight
|
||||
ServerPatchCheckJob::dispatch($server);
|
||||
}
|
||||
|
||||
// Dispatch Sentinel restart if due (daily for Sentinel-enabled servers)
|
||||
if ($server->isSentinelEnabled() && $this->shouldRunNow('0 0 * * *', $serverTimezone)) {
|
||||
dispatch(function () use ($server) {
|
||||
$server->restartContainer('coolify-sentinel');
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private function shouldRunNow(string $frequency, string $timezone): bool
|
||||
{
|
||||
$cron = new CronExpression($frequency);
|
||||
|
||||
// Use the frozen execution time, not the current time
|
||||
$baseTime = $this->executionTime ?? Carbon::now();
|
||||
$executionTime = $baseTime->copy()->setTimezone($timezone);
|
||||
|
||||
return $cron->isDue($executionTime);
|
||||
}
|
||||
}
|
@@ -122,15 +122,15 @@ return [
|
||||
'scheduled' => [
|
||||
'driver' => 'daily',
|
||||
'path' => storage_path('logs/scheduled.log'),
|
||||
'level' => env('LOG_LEVEL', 'debug'),
|
||||
'days' => 1, // Keep logs for 1 day only (truncated daily)
|
||||
'level' => 'debug',
|
||||
'days' => 1,
|
||||
],
|
||||
|
||||
'scheduled-errors' => [
|
||||
'driver' => 'daily',
|
||||
'path' => storage_path('logs/scheduled-errors.log'),
|
||||
'level' => 'error',
|
||||
'days' => 7, // Keep error logs for 7 days
|
||||
'level' => 'debug',
|
||||
'days' => 7,
|
||||
],
|
||||
],
|
||||
|
||||
|
Reference in New Issue
Block a user