Merge pull request #3364 from peaklabs-dev/improve-persist-ssh-sessions
Feat: Implement SSH multiplexing to reduce the number of SSH authentications in remote processes
This commit is contained in:
@@ -4,6 +4,7 @@ namespace App\Console;
|
||||
|
||||
use App\Jobs\CheckForUpdatesJob;
|
||||
use App\Jobs\CleanupInstanceStuffsJob;
|
||||
use App\Jobs\CleanupStaleMultiplexedConnections;
|
||||
use App\Jobs\DatabaseBackupJob;
|
||||
use App\Jobs\DockerCleanupJob;
|
||||
use App\Jobs\PullHelperImageJob;
|
||||
@@ -29,7 +30,8 @@ class Kernel extends ConsoleKernel
|
||||
$this->all_servers = Server::all();
|
||||
$settings = InstanceSettings::get();
|
||||
|
||||
$schedule->command('telescope:prune')->daily();
|
||||
$schedule->job(new CleanupStaleMultiplexedConnections)->hourly();
|
||||
|
||||
if (isDev()) {
|
||||
// Instance Jobs
|
||||
$schedule->command('horizon:snapshot')->everyMinute();
|
||||
@@ -39,6 +41,8 @@ class Kernel extends ConsoleKernel
|
||||
$this->check_resources($schedule);
|
||||
$this->check_scheduled_tasks($schedule);
|
||||
$schedule->command('uploads:clear')->everyTwoMinutes();
|
||||
|
||||
$schedule->command('telescope:prune')->daily();
|
||||
} else {
|
||||
// Instance Jobs
|
||||
$schedule->command('horizon:snapshot')->everyFiveMinutes();
|
||||
|
||||
37
app/Jobs/CleanupStaleMultiplexedConnections.php
Normal file
37
app/Jobs/CleanupStaleMultiplexedConnections.php
Normal file
@@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use App\Models\Server;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Process;
|
||||
|
||||
class CleanupStaleMultiplexedConnections implements ShouldQueue
|
||||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
public function handle()
|
||||
{
|
||||
Server::chunk(100, function ($servers) {
|
||||
foreach ($servers as $server) {
|
||||
$this->cleanupStaleConnection($server);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private function cleanupStaleConnection(Server $server)
|
||||
{
|
||||
$muxSocket = "/tmp/mux_{$server->id}";
|
||||
$checkCommand = "ssh -O check -o ControlPath=$muxSocket {$server->user}@{$server->ip} 2>/dev/null";
|
||||
$checkProcess = Process::run($checkCommand);
|
||||
|
||||
if ($checkProcess->exitCode() !== 0) {
|
||||
$closeCommand = "ssh -O exit -o ControlPath=$muxSocket {$server->user}@{$server->ip} 2>/dev/null";
|
||||
Process::run($closeCommand);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -145,9 +145,20 @@ function generateSshCommand(Server $server, string $command)
|
||||
|
||||
$ssh_command = "timeout $timeout ssh ";
|
||||
|
||||
if (config('coolify.mux_enabled') && config('coolify.is_windows_docker_desktop') == false) {
|
||||
$ssh_command .= "-o ControlMaster=auto -o ControlPersist={$muxPersistTime} -o ControlPath=/var/www/html/storage/app/ssh/mux/{$server->muxFilename()} ";
|
||||
// Check if multiplexing is enabled
|
||||
$muxEnabled = config('constants.ssh.mux_enabled', true);
|
||||
ray('SSH Multiplexing Enabled:', $muxEnabled)->blue();
|
||||
|
||||
if ($muxEnabled) {
|
||||
// Always use multiplexing when enabled
|
||||
$muxSocket = "/var/www/html/storage/app/ssh/mux/{$server->muxFilename()}";
|
||||
$ssh_command .= "-o ControlMaster=auto -o ControlPath=$muxSocket -o ControlPersist={$muxPersistTime} ";
|
||||
ensureMultiplexedConnection($server);
|
||||
ray('Using SSH Multiplexing')->green();
|
||||
} else {
|
||||
ray('Not using SSH Multiplexing')->red();
|
||||
}
|
||||
|
||||
if (data_get($server, 'settings.is_cloudflare_tunnel')) {
|
||||
$ssh_command .= '-o ProxyCommand="/usr/local/bin/cloudflared access ssh --hostname %h" ';
|
||||
}
|
||||
@@ -169,8 +180,96 @@ function generateSshCommand(Server $server, string $command)
|
||||
|
||||
return $ssh_command;
|
||||
}
|
||||
|
||||
function ensureMultiplexedConnection(Server $server)
|
||||
{
|
||||
static $ensuredConnections = [];
|
||||
|
||||
if (isset($ensuredConnections[$server->id])) {
|
||||
if (!shouldResetMultiplexedConnection($server)) {
|
||||
ray('Using Existing Multiplexed Connection')->green();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$muxSocket = "/var/www/html/storage/app/ssh/mux/{$server->muxFilename()}";
|
||||
$checkCommand = "ssh -O check -o ControlPath=$muxSocket {$server->user}@{$server->ip} 2>/dev/null";
|
||||
|
||||
$process = Process::run($checkCommand);
|
||||
|
||||
if ($process->exitCode() === 0) {
|
||||
ray('Existing Multiplexed Connection is Valid')->green();
|
||||
$ensuredConnections[$server->id] = [
|
||||
'timestamp' => now(),
|
||||
'muxSocket' => $muxSocket,
|
||||
];
|
||||
return;
|
||||
}
|
||||
|
||||
ray('Establishing New Multiplexed Connection')->orange();
|
||||
|
||||
$privateKeyLocation = savePrivateKeyToFs($server);
|
||||
$connectionTimeout = config('constants.ssh.connection_timeout');
|
||||
$serverInterval = config('constants.ssh.server_interval');
|
||||
$muxPersistTime = config('constants.ssh.mux_persist_time');
|
||||
|
||||
$establishCommand = "ssh -fNM -o ControlMaster=auto -o ControlPath=$muxSocket -o ControlPersist={$muxPersistTime} "
|
||||
. "-i {$privateKeyLocation} "
|
||||
. "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "
|
||||
. "-o PasswordAuthentication=no "
|
||||
. "-o ConnectTimeout=$connectionTimeout "
|
||||
. "-o ServerAliveInterval=$serverInterval "
|
||||
. "-o RequestTTY=no "
|
||||
. "-o LogLevel=ERROR "
|
||||
. "-p {$server->port} "
|
||||
. "{$server->user}@{$server->ip}";
|
||||
|
||||
$establishProcess = Process::run($establishCommand);
|
||||
|
||||
if ($establishProcess->exitCode() !== 0) {
|
||||
throw new \RuntimeException("Failed to establish multiplexed connection: " . $establishProcess->errorOutput());
|
||||
}
|
||||
|
||||
$ensuredConnections[$server->id] = [
|
||||
'timestamp' => now(),
|
||||
'muxSocket' => $muxSocket,
|
||||
];
|
||||
|
||||
ray('Established New Multiplexed Connection')->green();
|
||||
}
|
||||
|
||||
function shouldResetMultiplexedConnection(Server $server)
|
||||
{
|
||||
static $ensuredConnections = [];
|
||||
|
||||
if (!isset($ensuredConnections[$server->id])) {
|
||||
return true;
|
||||
}
|
||||
|
||||
$lastEnsured = $ensuredConnections[$server->id]['timestamp'];
|
||||
$muxPersistTime = config('constants.ssh.mux_persist_time');
|
||||
$resetInterval = strtotime($muxPersistTime) - time();
|
||||
|
||||
return $lastEnsured->addSeconds($resetInterval)->isPast();
|
||||
}
|
||||
|
||||
function resetMultiplexedConnection(Server $server)
|
||||
{
|
||||
static $ensuredConnections = [];
|
||||
|
||||
if (isset($ensuredConnections[$server->id])) {
|
||||
$muxSocket = $ensuredConnections[$server->id]['muxSocket'];
|
||||
$closeCommand = "ssh -O exit -o ControlPath=$muxSocket {$server->user}@{$server->ip}";
|
||||
Process::run($closeCommand);
|
||||
unset($ensuredConnections[$server->id]);
|
||||
}
|
||||
}
|
||||
|
||||
function instant_remote_process(Collection|array $command, Server $server, bool $throwError = true, bool $no_sudo = false): ?string
|
||||
{
|
||||
static $processCount = 0;
|
||||
$processCount++;
|
||||
|
||||
$timeout = config('constants.ssh.command_timeout');
|
||||
if ($command instanceof Collection) {
|
||||
$command = $command->toArray();
|
||||
@@ -179,10 +278,18 @@ function instant_remote_process(Collection|array $command, Server $server, bool
|
||||
$command = parseCommandsByLineForSudo(collect($command), $server);
|
||||
}
|
||||
$command_string = implode("\n", $command);
|
||||
$ssh_command = generateSshCommand($server, $command_string, $no_sudo);
|
||||
$process = Process::timeout($timeout)->run($ssh_command);
|
||||
|
||||
$start_time = microtime(true);
|
||||
$sshCommand = generateSshCommand($server, $command_string);
|
||||
$process = Process::timeout($timeout)->run($sshCommand);
|
||||
$end_time = microtime(true);
|
||||
|
||||
$execution_time = ($end_time - $start_time) * 1000; // Convert to milliseconds
|
||||
ray('SSH command execution time:', $execution_time . ' ms')->orange();
|
||||
|
||||
$output = trim($process->output());
|
||||
$exitCode = $process->exitCode();
|
||||
|
||||
if ($exitCode !== 0) {
|
||||
if (! $throwError) {
|
||||
return null;
|
||||
@@ -222,7 +329,6 @@ function decode_remote_command_output(?ApplicationDeploymentQueue $application_d
|
||||
if (is_null($application_deployment_queue)) {
|
||||
return collect([]);
|
||||
}
|
||||
// ray(data_get($application_deployment_queue, 'logs'));
|
||||
try {
|
||||
$decoded = json_decode(
|
||||
data_get($application_deployment_queue, 'logs'),
|
||||
@@ -232,7 +338,6 @@ function decode_remote_command_output(?ApplicationDeploymentQueue $application_d
|
||||
} catch (\JsonException $exception) {
|
||||
return collect([]);
|
||||
}
|
||||
// ray($decoded );
|
||||
$seenCommands = collect();
|
||||
$formatted = collect($decoded);
|
||||
if (! $is_debug_enabled) {
|
||||
@@ -293,6 +398,10 @@ function remove_mux_and_private_key(Server $server)
|
||||
{
|
||||
$muxFilename = $server->muxFilename();
|
||||
$privateKeyLocation = savePrivateKeyToFs($server);
|
||||
|
||||
$closeCommand = "ssh -O exit -o ControlPath=/var/www/html/storage/app/ssh/mux/{$muxFilename} {$server->user}@{$server->ip}";
|
||||
Process::run($closeCommand);
|
||||
|
||||
Storage::disk('ssh-mux')->delete($muxFilename);
|
||||
Storage::disk('ssh-keys')->delete($privateKeyLocation);
|
||||
}
|
||||
@@ -302,7 +411,10 @@ function refresh_server_connection(?PrivateKey $private_key = null)
|
||||
return;
|
||||
}
|
||||
foreach ($private_key->servers as $server) {
|
||||
Storage::disk('ssh-mux')->delete($server->muxFilename());
|
||||
$muxFilename = $server->muxFilename();
|
||||
$closeCommand = "ssh -O exit -o ControlPath=/var/www/html/storage/app/ssh/mux/{$muxFilename} {$server->user}@{$server->ip}";
|
||||
Process::run($closeCommand);
|
||||
Storage::disk('ssh-mux')->delete($muxFilename);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,24 +424,17 @@ function checkRequiredCommands(Server $server)
|
||||
foreach ($commands as $command) {
|
||||
$commandFound = instant_remote_process(["docker run --rm --privileged --net=host --pid=host --ipc=host --volume /:/host busybox chroot /host bash -c 'command -v {$command}'"], $server, false);
|
||||
if ($commandFound) {
|
||||
ray($command.' found');
|
||||
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
instant_remote_process(["docker run --rm --privileged --net=host --pid=host --ipc=host --volume /:/host busybox chroot /host bash -c 'apt update && apt install -y {$command}'"], $server);
|
||||
} catch (\Throwable $e) {
|
||||
ray('could not install '.$command);
|
||||
ray($e);
|
||||
break;
|
||||
}
|
||||
$commandFound = instant_remote_process(["docker run --rm --privileged --net=host --pid=host --ipc=host --volume /:/host busybox chroot /host bash -c 'command -v {$command}'"], $server, false);
|
||||
if ($commandFound) {
|
||||
ray($command.' found');
|
||||
|
||||
continue;
|
||||
}
|
||||
ray('could not install '.$command);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,8 @@ return [
|
||||
'contact' => 'https://coolify.io/docs/contact',
|
||||
],
|
||||
'ssh' => [
|
||||
'mux_persist_time' => env('SSH_MUX_PERSIST_TIME', '1m'),
|
||||
'mux_enabled' => env('SSH_MUX_ENABLED', true),
|
||||
'mux_persist_time' => env('SSH_MUX_PERSIST_TIME', '1h'),
|
||||
'connection_timeout' => 10,
|
||||
'server_interval' => 20,
|
||||
'command_timeout' => 7200,
|
||||
|
||||
Reference in New Issue
Block a user