Fix: Few multiplexing things
This commit is contained in:
@@ -14,7 +14,7 @@ class SshMultiplexingHelper
|
||||
{
|
||||
$privateKey = PrivateKey::findOrFail($server->private_key_id);
|
||||
$sshKeyLocation = $privateKey->getKeyLocation();
|
||||
$muxFilename = '/var/www/html/storage/app/ssh/mux/' . $server->muxFilename();
|
||||
$muxFilename = '/var/www/html/storage/app/ssh/mux/mux_' . $server->uuid;
|
||||
|
||||
return [
|
||||
'sshKeyLocation' => $sshKeyLocation,
|
||||
@@ -25,12 +25,12 @@ class SshMultiplexingHelper
|
||||
public static function ensureMultiplexedConnection(Server $server)
|
||||
{
|
||||
if (!self::isMultiplexingEnabled()) {
|
||||
ray('SSH Multiplexing: DISABLED')->red();
|
||||
// ray('SSH Multiplexing: DISABLED')->red();
|
||||
return;
|
||||
}
|
||||
|
||||
ray('SSH Multiplexing: ENABLED')->green();
|
||||
ray('Ensuring multiplexed connection for server:', $server->id);
|
||||
// ray('SSH Multiplexing: ENABLED')->green();
|
||||
// ray('Ensuring multiplexed connection for server:', $server);
|
||||
|
||||
$sshConfig = self::serverSshConfiguration($server);
|
||||
$muxSocket = $sshConfig['muxFilename'];
|
||||
@@ -42,18 +42,16 @@ class SshMultiplexingHelper
|
||||
$process = Process::run($checkCommand);
|
||||
|
||||
if ($process->exitCode() !== 0) {
|
||||
ray('SSH Multiplexing: Existing connection check failed or not found')->orange();
|
||||
ray('Establishing new connection');
|
||||
// ray('SSH Multiplexing: Existing connection check failed or not found')->orange();
|
||||
// ray('Establishing new connection');
|
||||
self::establishNewMultiplexedConnection($server);
|
||||
} else {
|
||||
ray('SSH Multiplexing: Existing connection is valid')->green();
|
||||
// ray('SSH Multiplexing: Existing connection is valid')->green();
|
||||
}
|
||||
}
|
||||
|
||||
public static function establishNewMultiplexedConnection(Server $server)
|
||||
{
|
||||
ray('SSH Multiplexing: Establishing new connection for server:', $server->id);
|
||||
|
||||
$sshConfig = self::serverSshConfiguration($server);
|
||||
$sshKeyLocation = $sshConfig['sshKeyLocation'];
|
||||
$muxSocket = $sshConfig['muxFilename'];
|
||||
@@ -63,27 +61,20 @@ class SshMultiplexingHelper
|
||||
$muxPersistTime = config('constants.ssh.mux_persist_time');
|
||||
|
||||
$establishCommand = "ssh -fNM -o ControlMaster=auto -o ControlPath=$muxSocket -o ControlPersist={$muxPersistTime} "
|
||||
. "-i {$sshKeyLocation} "
|
||||
. '-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null '
|
||||
. '-o PasswordAuthentication=no '
|
||||
. "-o ConnectTimeout=$connectionTimeout "
|
||||
. "-o ServerAliveInterval=$serverInterval "
|
||||
. '-o RequestTTY=no '
|
||||
. '-o LogLevel=ERROR '
|
||||
. "-p {$server->port} "
|
||||
. self::getCommonSshOptions($server, $sshKeyLocation, $connectionTimeout, $serverInterval)
|
||||
. "{$server->user}@{$server->ip}";
|
||||
|
||||
$establishProcess = Process::run($establishCommand);
|
||||
|
||||
if ($establishProcess->exitCode() !== 0) {
|
||||
ray('SSH Multiplexing: Failed to establish connection', $establishProcess->errorOutput())->red();
|
||||
throw new \RuntimeException('Failed to establish multiplexed connection: ' . $establishProcess->errorOutput());
|
||||
}
|
||||
|
||||
ray('SSH Multiplexing: Connection established successfully')->green();
|
||||
|
||||
$muxContent = "Multiplexed connection established at " . now()->toDateTimeString();
|
||||
Storage::disk('ssh-mux')->put(basename($muxSocket), $muxContent);
|
||||
$muxFilename = basename($muxSocket);
|
||||
if (!Storage::disk('ssh-mux')->put($muxFilename, $muxContent)) {
|
||||
throw new \RuntimeException('Failed to write mux file to disk: ' . $muxFilename);
|
||||
}
|
||||
}
|
||||
|
||||
public static function removeMuxFile(Server $server)
|
||||
@@ -93,8 +84,6 @@ class SshMultiplexingHelper
|
||||
|
||||
$closeCommand = "ssh -O exit -o ControlPath=/var/www/html/storage/app/ssh/mux/{$muxFilename} {$server->user}@{$server->ip}";
|
||||
Process::run($closeCommand);
|
||||
|
||||
Storage::disk('ssh-mux')->delete($muxFilename);
|
||||
}
|
||||
|
||||
public static function generateScpCommand(Server $server, string $source, string $dest)
|
||||
@@ -104,29 +93,26 @@ class SshMultiplexingHelper
|
||||
$muxSocket = $sshConfig['muxFilename'];
|
||||
|
||||
$timeout = config('constants.ssh.command_timeout');
|
||||
$connectionTimeout = config('constants.ssh.connection_timeout');
|
||||
$serverInterval = config('constants.ssh.server_interval');
|
||||
|
||||
$scp_command = "timeout $timeout scp ";
|
||||
|
||||
if (self::isMultiplexingEnabled()) {
|
||||
ray('SSH Multiplexing: Enabled for SCP command')->green();
|
||||
// ray('SSH Multiplexing: Enabled for SCP command')->green();
|
||||
$muxPersistTime = config('constants.ssh.mux_persist_time');
|
||||
$scp_command .= "-o ControlMaster=auto -o ControlPath=$muxSocket -o ControlPersist={$muxPersistTime} ";
|
||||
self::ensureMultiplexedConnection($server);
|
||||
|
||||
// Add this line to verify multiplexing is being used
|
||||
ray('SSH Multiplexing: Verifying usage')->blue();
|
||||
// ray('SSH Multiplexing: Verifying usage')->blue();
|
||||
$checkCommand = "ssh -O check -o ControlPath=$muxSocket {$server->user}@{$server->ip}";
|
||||
$checkProcess = Process::run($checkCommand);
|
||||
ray('SSH Multiplexing: ' . ($checkProcess->exitCode() === 0 ? 'Active' : 'Not Active'))->color($checkProcess->exitCode() === 0 ? 'green' : 'red');
|
||||
// ray('SSH Multiplexing: ' . ($checkProcess->exitCode() === 0 ? 'Active' : 'Not Active'))->color($checkProcess->exitCode() === 0 ? 'green' : 'red');
|
||||
} else {
|
||||
ray('SSH Multiplexing: Disabled for SCP command')->orange();
|
||||
// ray('SSH Multiplexing: Disabled for SCP command')->orange();
|
||||
}
|
||||
|
||||
self::addCloudflareProxyCommand($scp_command, $server);
|
||||
|
||||
$scp_command .= self::getCommonSshOptions($server, $sshKeyLocation, $connectionTimeout, $serverInterval);
|
||||
$scp_command .= self::getCommonSshOptions($server, $sshKeyLocation, config('constants.ssh.connection_timeout'), config('constants.ssh.server_interval'));
|
||||
$scp_command .= "{$source} {$server->user}@{$server->ip}:{$dest}";
|
||||
|
||||
return $scp_command;
|
||||
@@ -143,29 +129,26 @@ class SshMultiplexingHelper
|
||||
$muxSocket = $sshConfig['muxFilename'];
|
||||
|
||||
$timeout = config('constants.ssh.command_timeout');
|
||||
$connectionTimeout = config('constants.ssh.connection_timeout');
|
||||
$serverInterval = config('constants.ssh.server_interval');
|
||||
|
||||
$ssh_command = "timeout $timeout ssh ";
|
||||
|
||||
if (self::isMultiplexingEnabled()) {
|
||||
ray('SSH Multiplexing: Enabled for SSH command')->green();
|
||||
// ray('SSH Multiplexing: Enabled for SSH command')->green();
|
||||
$muxPersistTime = config('constants.ssh.mux_persist_time');
|
||||
$ssh_command .= "-o ControlMaster=auto -o ControlPath=$muxSocket -o ControlPersist={$muxPersistTime} ";
|
||||
self::ensureMultiplexedConnection($server);
|
||||
|
||||
// Add this line to verify multiplexing is being used
|
||||
ray('SSH Multiplexing: Verifying usage')->blue();
|
||||
// ray('SSH Multiplexing: Verifying usage')->blue();
|
||||
$checkCommand = "ssh -O check -o ControlPath=$muxSocket {$server->user}@{$server->ip}";
|
||||
$checkProcess = Process::run($checkCommand);
|
||||
ray('SSH Multiplexing: ' . ($checkProcess->exitCode() === 0 ? 'Active' : 'Not Active'))->color($checkProcess->exitCode() === 0 ? 'green' : 'red');
|
||||
// ray('SSH Multiplexing: ' . ($checkProcess->exitCode() === 0 ? 'Active' : 'Not Active'))->color($checkProcess->exitCode() === 0 ? 'green' : 'red');
|
||||
} else {
|
||||
ray('SSH Multiplexing: Disabled for SSH command')->orange();
|
||||
// ray('SSH Multiplexing: Disabled for SSH command')->orange();
|
||||
}
|
||||
|
||||
self::addCloudflareProxyCommand($ssh_command, $server);
|
||||
|
||||
$ssh_command .= self::getCommonSshOptions($server, $sshKeyLocation, $connectionTimeout, $serverInterval);
|
||||
$ssh_command .= self::getCommonSshOptions($server, $sshKeyLocation, config('constants.ssh.connection_timeout'), config('constants.ssh.server_interval'));
|
||||
|
||||
$command = "PATH=\$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/host/usr/local/sbin:/host/usr/local/bin:/host/usr/sbin:/host/usr/bin:/host/sbin:/host/bin && $command";
|
||||
$delimiter = Hash::make($command);
|
||||
@@ -181,7 +164,7 @@ class SshMultiplexingHelper
|
||||
private static function isMultiplexingEnabled(): bool
|
||||
{
|
||||
$isEnabled = config('constants.ssh.mux_enabled') && !config('coolify.is_windows_docker_desktop');
|
||||
ray('SSH Multiplexing Status:', $isEnabled ? 'ENABLED' : 'DISABLED')->color($isEnabled ? 'green' : 'red');
|
||||
// ray('SSH Multiplexing Status:', $isEnabled ? 'ENABLED' : 'DISABLED')->color($isEnabled ? 'green' : 'red');
|
||||
return $isEnabled;
|
||||
}
|
||||
|
||||
|
@@ -9,6 +9,8 @@ use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Process;
|
||||
use Illuminate\Support\Facades\Storage;
|
||||
use Carbon\Carbon;
|
||||
|
||||
class CleanupStaleMultiplexedConnections implements ShouldQueue
|
||||
{
|
||||
@@ -16,22 +18,64 @@ class CleanupStaleMultiplexedConnections implements ShouldQueue
|
||||
|
||||
public function handle()
|
||||
{
|
||||
Server::chunk(100, function ($servers) {
|
||||
foreach ($servers as $server) {
|
||||
$this->cleanupStaleConnection($server);
|
||||
}
|
||||
});
|
||||
$this->cleanupStaleConnections();
|
||||
$this->cleanupNonExistentServerConnections();
|
||||
}
|
||||
|
||||
private function cleanupStaleConnection(Server $server)
|
||||
private function cleanupStaleConnections()
|
||||
{
|
||||
$muxSocket = "/tmp/mux_{$server->id}";
|
||||
$checkCommand = "ssh -O check -o ControlPath=$muxSocket {$server->user}@{$server->ip} 2>/dev/null";
|
||||
$checkProcess = Process::run($checkCommand);
|
||||
$muxFiles = Storage::disk('ssh-mux')->files();
|
||||
|
||||
if ($checkProcess->exitCode() !== 0) {
|
||||
$closeCommand = "ssh -O exit -o ControlPath=$muxSocket {$server->user}@{$server->ip} 2>/dev/null";
|
||||
Process::run($closeCommand);
|
||||
foreach ($muxFiles as $muxFile) {
|
||||
$serverUuid = $this->extractServerUuidFromMuxFile($muxFile);
|
||||
$server = Server::where('uuid', $serverUuid)->first();
|
||||
|
||||
if (!$server) {
|
||||
$this->removeMultiplexFile($muxFile);
|
||||
continue;
|
||||
}
|
||||
|
||||
$muxSocket = "/var/www/html/storage/app/ssh/mux/{$muxFile}";
|
||||
$checkCommand = "ssh -O check -o ControlPath={$muxSocket} {$server->user}@{$server->ip} 2>/dev/null";
|
||||
$checkProcess = Process::run($checkCommand);
|
||||
|
||||
if ($checkProcess->exitCode() !== 0) {
|
||||
$this->removeMultiplexFile($muxFile);
|
||||
} else {
|
||||
$muxContent = Storage::disk('ssh-mux')->get($muxFile);
|
||||
$establishedAt = Carbon::parse(substr($muxContent, 37));
|
||||
$expirationTime = $establishedAt->addSeconds(config('constants.ssh.mux_persist_time'));
|
||||
|
||||
if (Carbon::now()->isAfter($expirationTime)) {
|
||||
$this->removeMultiplexFile($muxFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function cleanupNonExistentServerConnections()
|
||||
{
|
||||
$muxFiles = Storage::disk('ssh-mux')->files();
|
||||
$existingServerUuids = Server::pluck('uuid')->toArray();
|
||||
|
||||
foreach ($muxFiles as $muxFile) {
|
||||
$serverUuid = $this->extractServerUuidFromMuxFile($muxFile);
|
||||
if (!in_array($serverUuid, $existingServerUuids)) {
|
||||
$this->removeMultiplexFile($muxFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function extractServerUuidFromMuxFile($muxFile)
|
||||
{
|
||||
return substr($muxFile, 4);
|
||||
}
|
||||
|
||||
private function removeMultiplexFile($muxFile)
|
||||
{
|
||||
$muxSocket = "/var/www/html/storage/app/ssh/mux/{$muxFile}";
|
||||
$closeCommand = "ssh -O exit -o ControlPath={$muxSocket} localhost 2>/dev/null";
|
||||
Process::run($closeCommand);
|
||||
Storage::disk('ssh-mux')->delete($muxFile);
|
||||
}
|
||||
}
|
||||
|
@@ -967,7 +967,7 @@ $schema://$host {
|
||||
public function validateConnection($isManualCheck = true)
|
||||
{
|
||||
config()->set('constants.ssh.mux_enabled', !$isManualCheck);
|
||||
ray('Manual Check: ' . ($isManualCheck ? 'true' : 'false'));
|
||||
// ray('Manual Check: ' . ($isManualCheck ? 'true' : 'false'));
|
||||
|
||||
$server = Server::find($this->id);
|
||||
if (! $server) {
|
||||
|
@@ -60,40 +60,28 @@ function remote_process(
|
||||
|
||||
function instant_scp(string $source, string $dest, Server $server, $throwError = true)
|
||||
{
|
||||
$timeout = config('constants.ssh.command_timeout');
|
||||
$scp_command = SshMultiplexingHelper::generateScpCommand($server, $source, $dest);
|
||||
$process = Process::timeout($timeout)->run($scp_command);
|
||||
$process = Process::timeout(config('constants.ssh.command_timeout'))->run($scp_command);
|
||||
$output = trim($process->output());
|
||||
$exitCode = $process->exitCode();
|
||||
if ($exitCode !== 0) {
|
||||
if (! $throwError) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return excludeCertainErrors($process->errorOutput(), $exitCode);
|
||||
return $throwError ? excludeCertainErrors($process->errorOutput(), $exitCode) : null;
|
||||
}
|
||||
if ($output === 'null') {
|
||||
$output = null;
|
||||
}
|
||||
|
||||
return $output;
|
||||
return $output === 'null' ? null : $output;
|
||||
}
|
||||
|
||||
function instant_remote_process(Collection|array $command, Server $server, bool $throwError = true, bool $no_sudo = false): ?string
|
||||
{
|
||||
$timeout = config('constants.ssh.command_timeout');
|
||||
if ($command instanceof Collection) {
|
||||
$command = $command->toArray();
|
||||
}
|
||||
if ($server->isNonRoot() && ! $no_sudo) {
|
||||
$command = $command instanceof Collection ? $command->toArray() : $command;
|
||||
if ($server->isNonRoot() && !$no_sudo) {
|
||||
$command = parseCommandsByLineForSudo(collect($command), $server);
|
||||
}
|
||||
$command_string = implode("\n", $command);
|
||||
|
||||
$start_time = microtime(true);
|
||||
// $start_time = microtime(true);
|
||||
$sshCommand = SshMultiplexingHelper::generateSshCommand($server, $command_string);
|
||||
$process = Process::timeout($timeout)->run($sshCommand);
|
||||
$end_time = microtime(true);
|
||||
$process = Process::timeout(config('constants.ssh.command_timeout'))->run($sshCommand);
|
||||
// $end_time = microtime(true);
|
||||
|
||||
// $execution_time = ($end_time - $start_time) * 1000; // Convert to milliseconds
|
||||
// ray('SSH command execution time:', $execution_time.' ms')->orange();
|
||||
@@ -102,17 +90,9 @@ function instant_remote_process(Collection|array $command, Server $server, bool
|
||||
$exitCode = $process->exitCode();
|
||||
|
||||
if ($exitCode !== 0) {
|
||||
if (! $throwError) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return excludeCertainErrors($process->errorOutput(), $exitCode);
|
||||
return $throwError ? excludeCertainErrors($process->errorOutput(), $exitCode) : null;
|
||||
}
|
||||
if ($output === 'null') {
|
||||
$output = null;
|
||||
}
|
||||
|
||||
return $output;
|
||||
return $output === 'null' ? null : $output;
|
||||
}
|
||||
|
||||
function excludeCertainErrors(string $errorOutput, ?int $exitCode = null)
|
||||
@@ -121,13 +101,7 @@ function excludeCertainErrors(string $errorOutput, ?int $exitCode = null)
|
||||
'Permission denied (publickey',
|
||||
'Could not resolve hostname',
|
||||
]);
|
||||
$ignored = false;
|
||||
foreach ($ignoredErrors as $ignoredError) {
|
||||
if (Str::contains($errorOutput, $ignoredError)) {
|
||||
$ignored = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
$ignored = $ignoredErrors->contains(fn($error) => Str::contains($errorOutput, $error));
|
||||
if ($ignored) {
|
||||
// TODO: Create new exception and disable in sentry
|
||||
throw new \RuntimeException($errorOutput, $exitCode);
|
||||
@@ -137,11 +111,11 @@ function excludeCertainErrors(string $errorOutput, ?int $exitCode = null)
|
||||
|
||||
function decode_remote_command_output(?ApplicationDeploymentQueue $application_deployment_queue = null): Collection
|
||||
{
|
||||
$application = Application::find(data_get($application_deployment_queue, 'application_id'));
|
||||
$is_debug_enabled = data_get($application, 'settings.is_debug_enabled');
|
||||
if (is_null($application_deployment_queue)) {
|
||||
return collect([]);
|
||||
}
|
||||
$application = Application::find(data_get($application_deployment_queue, 'application_id'));
|
||||
$is_debug_enabled = data_get($application, 'settings.is_debug_enabled');
|
||||
try {
|
||||
$decoded = json_decode(
|
||||
data_get($application_deployment_queue, 'logs'),
|
||||
@@ -153,20 +127,19 @@ function decode_remote_command_output(?ApplicationDeploymentQueue $application_d
|
||||
}
|
||||
$seenCommands = collect();
|
||||
$formatted = collect($decoded);
|
||||
if (! $is_debug_enabled) {
|
||||
if (!$is_debug_enabled) {
|
||||
$formatted = $formatted->filter(fn ($i) => $i['hidden'] === false ?? false);
|
||||
}
|
||||
$formatted = $formatted
|
||||
return $formatted
|
||||
->sortBy(fn ($i) => data_get($i, 'order'))
|
||||
->map(function ($i) {
|
||||
data_set($i, 'timestamp', Carbon::parse(data_get($i, 'timestamp'))->format('Y-M-d H:i:s.u'));
|
||||
|
||||
return $i;
|
||||
})
|
||||
->reduce(function ($deploymentLogLines, $logItem) use ($seenCommands) {
|
||||
$command = data_get($logItem, 'command');
|
||||
$isStderr = data_get($logItem, 'type') === 'stderr';
|
||||
$isNewCommand = ! is_null($command) && ! $seenCommands->first(function ($seenCommand) use ($logItem) {
|
||||
$isNewCommand = !is_null($command) && !$seenCommands->first(function ($seenCommand) use ($logItem) {
|
||||
return data_get($seenCommand, 'command') === data_get($logItem, 'command') && data_get($seenCommand, 'batch') === data_get($logItem, 'batch');
|
||||
});
|
||||
|
||||
@@ -198,14 +171,11 @@ function decode_remote_command_output(?ApplicationDeploymentQueue $application_d
|
||||
|
||||
return $deploymentLogLines;
|
||||
}, collect());
|
||||
|
||||
return $formatted;
|
||||
}
|
||||
|
||||
function remove_iip($text)
|
||||
{
|
||||
$text = preg_replace('/x-access-token:.*?(?=@)/', 'x-access-token:'.REDACTED, $text);
|
||||
|
||||
return preg_replace('/\x1b\[[0-9;]*m/', '', $text);
|
||||
}
|
||||
|
||||
@@ -233,9 +203,8 @@ function checkRequiredCommands(Server $server)
|
||||
break;
|
||||
}
|
||||
$commandFound = instant_remote_process(["docker run --rm --privileged --net=host --pid=host --ipc=host --volume /:/host busybox chroot /host bash -c 'command -v {$command}'"], $server, false);
|
||||
if ($commandFound) {
|
||||
continue;
|
||||
if (!$commandFound) {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@@ -6,9 +6,8 @@ return [
|
||||
'contact' => 'https://coolify.io/docs/contact',
|
||||
],
|
||||
'ssh' => [
|
||||
// Using MUX
|
||||
'mux_enabled' => env('MUX_ENABLED', env('SSH_MUX_ENABLED', true), true),
|
||||
'mux_persist_time' => env('SSH_MUX_PERSIST_TIME', '1h'),
|
||||
'mux_enabled' => env('MUX_ENABLED', env('SSH_MUX_ENABLED', true)),
|
||||
'mux_persist_time' => env('SSH_MUX_PERSIST_TIME', 3600),
|
||||
'connection_timeout' => 10,
|
||||
'server_interval' => 20,
|
||||
'command_timeout' => 7200,
|
||||
|
Reference in New Issue
Block a user