From a154fe80e8cbc8944890497a93445969c79bf4e5 Mon Sep 17 00:00:00 2001 From: Mark Friedrich Date: Sun, 24 Feb 2019 22:24:54 +0100 Subject: [PATCH] =?UTF-8?q?-=20remove=20all=20PHP=20"=5FZMQ=5F"=20related?= =?UTF-8?q?=20dependencies=20from=20Pathfinder.=20PHP=C2=B4s=20native=20So?= =?UTF-8?q?ckets=20work=20as=20replacement=20-=20added=20status=20informat?= =?UTF-8?q?ion=20for=20"WebSocket"=20installations=20to=20`/setup`=20page?= =?UTF-8?q?=20(e.g.=20active=20connections,=20startup=20time)=20-=20remove?= =?UTF-8?q?d=20"ext-zmq"=20as=20required=20PHP=20extension=20-=20removed?= =?UTF-8?q?=20"react/zmq"=20as=20required=20Composer=20package=20-=20remov?= =?UTF-8?q?ed=20"websoftwares/monolog-zmq-handler"=20as=20required=20Compo?= =?UTF-8?q?ser=20package?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/main/controller/accesscontroller.php | 10 +- app/main/controller/api/map.php | 20 +- app/main/controller/api/rest/connection.php | 2 - app/main/controller/api/rest/system.php | 2 - app/main/controller/api/user.php | 2 - app/main/controller/controller.php | 18 +- app/main/controller/setup.php | 175 ++++++------- app/main/lib/Monolog.php | 4 +- app/main/lib/config.php | 19 +- app/main/lib/logging/AbstractLog.php | 101 ++++---- app/main/lib/logging/MapLog.php | 4 +- .../{ZMQHandler.php => SocketHandler.php} | 22 +- app/main/lib/socket.php | 227 ---------------- app/main/lib/socket/AbstractSocket.php | 242 ++++++++++++++++++ app/main/lib/socket/Socket.php | 14 - app/main/lib/socket/SocketInterface.php | 28 ++ app/main/lib/socket/TcpSocket.php | 22 ++ app/main/model/charactermodel.php | 4 +- app/main/model/mapmodel.php | 4 +- composer-dev.json | 5 +- composer.json | 6 +- js/app/setup.js | 4 +- public/templates/view/setup.html | 31 ++- 23 files changed, 481 insertions(+), 485 deletions(-) rename app/main/lib/logging/handler/{ZMQHandler.php => SocketHandler.php} (68%) delete mode 100644 app/main/lib/socket.php create mode 100644 app/main/lib/socket/AbstractSocket.php delete mode 100644 app/main/lib/socket/Socket.php create mode 100644 app/main/lib/socket/SocketInterface.php create mode 100644 app/main/lib/socket/TcpSocket.php diff --git a/app/main/controller/accesscontroller.php b/app/main/controller/accesscontroller.php index 866378e5..72f9772e 100644 --- a/app/main/controller/accesscontroller.php +++ b/app/main/controller/accesscontroller.php @@ -8,8 +8,7 @@ namespace Controller; -use lib\Config; -use lib\Socket; + use Model; class AccessController extends Controller { @@ -20,7 +19,6 @@ class AccessController extends Controller { * @param $params * @return bool * @throws \Exception - * @throws \ZMQSocketException */ function beforeroute(\Base $f3, $params): bool { if($return = parent::beforeroute($f3, $params)){ @@ -80,13 +78,11 @@ class AccessController extends Controller { * broadcast map data to clients * -> send over TCP Socket * @param Model\MapModel $map - * @return int (number of active connections for this map) * @throws \Exception - * @throws \ZMQSocketException */ - protected function broadcastMapData(Model\MapModel $map){ + protected function broadcastMapData(Model\MapModel $map) : void { $mapData = $this->getFormattedMapData($map); - return (int)(new Socket( Config::getSocketUri() ))->sendData('mapUpdate', $mapData); + $this->getF3()->webSocket()->write('mapUpdate', $mapData); } /** diff --git a/app/main/controller/api/map.php b/app/main/controller/api/map.php index c5078061..8a818251 100644 --- a/app/main/controller/api/map.php +++ b/app/main/controller/api/map.php @@ -639,10 +639,8 @@ class Map extends Controller\AccessController { * -> if characters with map access found -> broadcast mapData to them * @param Model\MapModel $map * @throws Exception - * @throws \ZMQSocketException */ protected function broadcastMapAccess(Model\MapModel $map){ - $mapAccess = [ 'id' => $map->_id, 'characterIds' => array_map(function ($data){ @@ -650,7 +648,7 @@ class Map extends Controller\AccessController { }, $map->getCharactersData()) ]; - (new Socket( Config::getSocketUri() ))->sendData('mapAccess', $mapAccess); + $this->getF3()->webSocket()->write('mapAccess', $mapAccess); // map has (probably) active connections that should receive map Data $this->broadcastMapData($map); @@ -659,11 +657,9 @@ class Map extends Controller\AccessController { /** * broadcast map delete information to clients * @param int $mapId - * @return bool|string - * @throws \ZMQSocketException */ - protected function broadcastMapDeleted($mapId){ - return (new Socket( Config::getSocketUri() ))->sendData('mapDeleted', $mapId); + protected function broadcastMapDeleted(int $mapId){ + $this->getF3()->webSocket()->write('mapDeleted', $mapId); } /** @@ -702,7 +698,15 @@ class Map extends Controller\AccessController { // send Access Data to WebSocket Server and get response (status) // if 'OK' -> Socket exists - $return->status = (new Socket( Config::getSocketUri() ))->sendData('mapConnectionAccess', $return->data); + $status = ''; + $f3->webSocket() + ->write('mapConnectionAccess', $return->data) + ->then( + function($payload) use (&$status) { + $status = (string)$payload['load']; + }); + + $return->status = $status; echo json_encode( $return ); } diff --git a/app/main/controller/api/rest/connection.php b/app/main/controller/api/rest/connection.php index 709f0d50..ad2f45e8 100644 --- a/app/main/controller/api/rest/connection.php +++ b/app/main/controller/api/rest/connection.php @@ -17,7 +17,6 @@ class Connection extends AbstractRestController { * if a connection is changed (drag&drop) to another system. -> this function is called for update * @param \Base $f3 * @throws \Exception - * @throws \ZMQSocketException */ public function put(\Base $f3){ $requestData = $this->getRequestData($f3); @@ -71,7 +70,6 @@ class Connection extends AbstractRestController { * @param \Base $f3 * @param $params * @throws \Exception - * @throws \ZMQSocketException */ public function delete(\Base $f3, $params){ $requestData = $this->getRequestData($f3); diff --git a/app/main/controller/api/rest/system.php b/app/main/controller/api/rest/system.php index 46694ec0..dee03b3e 100644 --- a/app/main/controller/api/rest/system.php +++ b/app/main/controller/api/rest/system.php @@ -68,7 +68,6 @@ class System extends AbstractRestController { /** * @param \Base $f3 * @param $params - * @throws \ZMQSocketException * @throws \Exception */ public function delete(\Base $f3, $params){ @@ -128,7 +127,6 @@ class System extends AbstractRestController { * @param Model\SystemModel $system * @param array $systemData * @return Model\SystemModel - * @throws \ZMQSocketException * @throws \Exception */ private function update(Model\SystemModel $system, array $systemData) : Model\SystemModel { diff --git a/app/main/controller/api/user.php b/app/main/controller/api/user.php index 7b44ed80..53b95193 100644 --- a/app/main/controller/api/user.php +++ b/app/main/controller/api/user.php @@ -200,7 +200,6 @@ class User extends Controller\Controller{ /** * log the current user out + clear character system log data * @param \Base $f3 - * @throws \ZMQSocketException */ public function logout(\Base $f3){ $this->logoutCharacter($f3, false, true, true, true); @@ -355,7 +354,6 @@ class User extends Controller\Controller{ * delete current user account from DB * @param \Base $f3 * @throws Exception - * @throws \ZMQSocketException */ public function deleteAccount(\Base $f3){ $data = $f3->get('POST.formData'); diff --git a/app/main/controller/controller.php b/app/main/controller/controller.php index 003fece3..2732ee8c 100644 --- a/app/main/controller/controller.php +++ b/app/main/controller/controller.php @@ -14,7 +14,6 @@ use lib\api\CcpClient; use lib\Config; use lib\Resource; use lib\Monolog; -use lib\Socket; use lib\Util; use Model; use DB; @@ -491,7 +490,7 @@ class Controller { * @param bool $deleteSession * @param bool $deleteLog * @param bool $deleteCookie - * @throws \ZMQSocketException + * @throws \Exception */ protected function logoutCharacter(\Base $f3, bool $all = false, bool $deleteSession = true, bool $deleteLog = true, bool $deleteCookie = false){ $sessionCharacterData = (array)$f3->get(Api\User::SESSION_KEY_CHARACTERS); @@ -517,7 +516,7 @@ class Controller { if($characterIds){ // broadcast logout information to webSocket server - (new Socket( Config::getSocketUri() ))->sendData('characterLogout', $characterIds); + $f3->webSocket()->write('characterLogout', $characterIds); } } @@ -621,7 +620,7 @@ class Controller { } if(empty($return->error)){ - $f3->set($cacheKey, $return, 15); + $f3->set($cacheKey, $return, 60); } } } @@ -974,15 +973,4 @@ class Controller { return Config::getEnvironmentData($key); } - - /** - * health check for ICP socket -> ping request - * @param $ttl - * @param $load - * @throws \ZMQSocketException - */ - static function checkTcpSocket($ttl, $load){ - (new Socket( Config::getSocketUri(), $ttl ))->sendData('healthCheck', $load); - } - } \ No newline at end of file diff --git a/app/main/controller/setup.php b/app/main/controller/setup.php index 6804064b..64931fc8 100644 --- a/app/main/controller/setup.php +++ b/app/main/controller/setup.php @@ -192,72 +192,6 @@ class Setup extends Controller { * @throws \Exception */ public function init(\Base $f3){ - - $uri = Config::getSocketUri(); - var_dump('Socket URI: ' . $uri); - - - $loop = \React\EventLoop\Factory::create(); - $connector = new \React\Socket\Connector($loop, [ - 'timeout' => 7.0 // connection timeout - ]); - - $connector->connect($uri)->then(function(\React\Socket\ConnectionInterface $connection) use ($loop) { - echo "pf: connected" . PHP_EOL; - echo "default_socket_timeout: " . ini_get("default_socket_timeout") . PHP_EOL; - $connection->on('data', function($chunk) use ($connection) { - echo "pf: connection on data:" . PHP_EOL; - echo $chunk. PHP_EOL; - if($chunk == 'DEF'){ - //$connection->end('pf: bye1 '); - } - }); - - $connection->on('end', function(){ - echo "pf: connection on end" . PHP_EOL; - }); - - $connection->on('error', function(\Exception $e){ - echo "pf: connection on error" . PHP_EOL; - echo 'error: ' . $e->getMessage(); - }); - - $connection->on('close', function(){ - echo "pf: connection on ' close" . PHP_EOL; - }); - - //$connection->pipe(new \React\Stream\WritableResourceStream(STDOUT, $loop)); -/* - $connection->write('PF ABC '); - //$connection->end('pf: bye1 '); - //sleep(2); - - $connection->write('PF DEF '); - - $connection->end('pf: bye2 '); -*/ - $loop->addTimer(1.0, function () use ($connection) { - $connection->write('ABC'); - }); - - $loop->addTimer(3.0, function () use ($connection) { - $connection->write('DEF'); - //$connection->end('pf: bye1 '); - }); - - $loop->addTimer(3.0, function ($test) use ($connection) { - //var_dump($test); - $connection->end('end with timeout'); - //$connection->end('pf: bye1 '); - }); - }); -echo 'loop start' . PHP_EOL; - $loop->run(); - echo 'loop end//' . PHP_EOL; - die(); -return; - //sleep(6); - //die('END...'); $params = $f3->get('GET'); // enables automatic column fix @@ -328,7 +262,7 @@ return; // Socket ----------------------------------------------------------------------------------------------------- // WebSocket information - $f3->set('socketInformation', $this->getSocketInformation()); + $f3->set('socketInformation', $this->getSocketInformation($f3)); // Administration --------------------------------------------------------------------------------------------- // Index information @@ -626,23 +560,6 @@ return; 'check' => version_compare( phpversion('redis'), $f3->get('REQUIREMENTS.PHP.REDIS'), '>='), 'tooltip' => 'Redis can replace the default file-caching mechanic. It is much faster!' ], - [ - 'label' => 'ØMQ TCP sockets [optional]' - ], - 'ext_zmq' => [ - 'label' => 'ZeroMQ extension', - 'required' => $f3->get('REQUIREMENTS.PHP.ZMQ'), - 'version' => extension_loaded('zmq') ? phpversion('zmq') : 'missing', - 'check' => version_compare( phpversion('zmq'), $f3->get('REQUIREMENTS.PHP.ZMQ'), '>='), - 'tooltip' => 'ØMQ PHP extension. Required for WebSocket configuration.' - ], - 'lib_zmq' => [ - 'label' => 'ZeroMQ installation', - 'required' => $f3->get('REQUIREMENTS.LIBS.ZMQ'), - 'version' => (class_exists('ZMQ') && defined('ZMQ::LIBZMQ_VER')) ? \ZMQ::LIBZMQ_VER : 'unknown', - 'check' => version_compare( (class_exists('ZMQ') && defined('ZMQ::LIBZMQ_VER')) ? \ZMQ::LIBZMQ_VER : 0, $f3->get('REQUIREMENTS.LIBS.ZMQ'), '>='), - 'tooltip' => 'ØMQ version. Required for WebSocket configuration.' - ], [ 'label' => 'LibEvent library [optional]' ], @@ -808,7 +725,7 @@ return; $getClientInfo = function(\Redis $client, array $conf) : array { $redisInfo = [ 'dsn' => [ - 'label' => 'DNS', + 'label' => 'DSN', 'value' => $conf['host'] . ':' . $conf['port'] ], 'connected' => [ @@ -1572,51 +1489,109 @@ return; /** * get Socket information (TCP (internal)), (WebSocket (clients)) + * @param \Base $f3 * @return array - * @throws \ZMQSocketException */ - protected function getSocketInformation(){ - // $ttl for health check - $ttl = 600; - + protected function getSocketInformation(\Base $f3) : array { + $ttl = 0.6; + $task = 'healthCheck'; $healthCheckToken = microtime(true); - // ping TCP Socket with checkToken - self::checkTcpSocket($ttl, $healthCheckToken); + $statusTcp = [ + 'type' => 'danger', + 'label' => 'INIT CONNECTION…', + 'class' => 'txt-color-danger' + ]; + + $webSocketStatus = [ + 'type' => 'danger', + 'label' => 'INIT CONNECTION…', + 'class' => 'txt-color-danger' + ]; + + $statsTcp = [ + 'startup' => 0, + 'connections' => 0, + 'maxConnections' => 0 + ]; + + // ping TCP Socket with "healthCheck" task + $f3->webSocket(['timeout' => $ttl]) + ->write($task, $healthCheckToken) + ->then( + function($payload) use ($task, $healthCheckToken, &$statusTcp, &$statsTcp) { + if( + $payload['task'] == $task && + $payload['load'] == $healthCheckToken + ){ + $statusTcp['type'] = 'success'; + $statusTcp['label'] = 'PING OK'; + $statusTcp['class'] = 'txt-color-success'; + + // statistics (e.g. current connection count) + if(!empty($payload['stats'])){ + $statsTcp = $payload['stats']; + } + }else{ + $statusTcp['type'] = 'warning'; + $statusTcp['label'] = is_string($payload['load']) ? $payload['load'] : 'INVALID RESPONSE'; + $statusTcp['class'] = 'txt-color-warning'; + } + }, + function($payload) use (&$statusTcp) { + $statusTcp['label'] = $payload['load']; + }); + + $formatTimeInterval = function(int $seconds = 0){ + $dtF = new \DateTime('@0'); + $dtT = new \DateTime("@" . $seconds); + $diff = $dtF->diff($dtT); + + $format = ($d = $diff->format('%d')) ? $d . 'd ' : ''; + $format .= ($h = $diff->format('%h')) ? $h . 'h ' : ''; + $format .= ($i = $diff->format('%i')) ? $i . 'm ' : ''; + $format .= ($s = $diff->format('%s')) ? $s . 's' : ''; + return $format; + }; $socketInformation = [ 'tcpSocket' => [ - 'label' => 'Socket (intern) [TCP]', - 'online' => true, + 'label' => 'Socket (intern) [TCP]', + 'status' => $statusTcp, + 'stats' => $statsTcp, 'data' => [ [ 'label' => 'HOST', - 'value' => Config::getEnvironmentData('SOCKET_HOST'), + 'value' => Config::getEnvironmentData('SOCKET_HOST') ? : '[missing]', 'check' => !empty( Config::getEnvironmentData('SOCKET_HOST') ) ],[ 'label' => 'PORT', - 'value' => Config::getEnvironmentData('SOCKET_PORT'), + 'value' => Config::getEnvironmentData('SOCKET_PORT') ? : '[missing]', 'check' => !empty( Config::getEnvironmentData('SOCKET_PORT') ) ],[ 'label' => 'URI', - 'value' => Config::getSocketUri(), + 'value' => Config::getSocketUri() ? : '[missing]', 'check' => !empty( Config::getSocketUri() ) ],[ - 'label' => 'timeout (ms)', + 'label' => 'timeout (seconds)', 'value' => $ttl, 'check' => !empty( $ttl ) + ],[ + 'label' => 'uptime', + 'value' => $formatTimeInterval($statsTcp['startup']), + 'check' => $statsTcp['startup'] > 0 ] ], 'token' => $healthCheckToken ], 'webSocket' => [ 'label' => 'WebSocket (clients) [HTTP]', - 'online' => false, + 'status' => $webSocketStatus, 'data' => [ [ 'label' => 'URI', 'value' => '', - 'check' => false + 'check' => null // undefined ] ] ] diff --git a/app/main/lib/Monolog.php b/app/main/lib/Monolog.php index b3fc008c..57e0b026 100644 --- a/app/main/lib/Monolog.php +++ b/app/main/lib/Monolog.php @@ -34,11 +34,11 @@ class Monolog extends \Prefab { const HANDLER = [ 'stream' => 'Monolog\Handler\StreamHandler', 'mail' => 'Monolog\Handler\SwiftMailerHandler', + 'socket' => 'lib\logging\handler\SocketHandler', 'slackMap' => 'lib\logging\handler\SlackMapWebhookHandler', 'slackRally' => 'lib\logging\handler\SlackRallyWebhookHandler', 'discordMap' => 'lib\logging\handler\DiscordMapWebhookHandler', - 'discordRally' => 'lib\logging\handler\DiscordRallyWebhookHandler', - 'zmq' => 'lib\logging\handler\ZMQHandler' + 'discordRally' => 'lib\logging\handler\DiscordRallyWebhookHandler' ]; const PROCESSOR = [ diff --git a/app/main/lib/config.php b/app/main/lib/config.php index 42a44be9..165c7451 100644 --- a/app/main/lib/config.php +++ b/app/main/lib/config.php @@ -12,6 +12,8 @@ namespace lib; use lib\api\CcpClient; use lib\api\GitHubClient; use lib\api\SsoClient; +use lib\socket\SocketInterface; +use lib\socket\TcpSocket; class Config extends \Prefab { @@ -100,6 +102,11 @@ class Config extends \Prefab { $f3->set(SsoClient::CLIENT_NAME, SsoClient::instance()); $f3->set(CcpClient::CLIENT_NAME, CcpClient::instance()); $f3->set(GitHubClient::CLIENT_NAME, GitHubClient::instance()); + + // Socket connectors + $f3->set(TcpSocket::SOCKET_NAME, function(array $options = ['timeout' => 1]) : SocketInterface { + return new TcpSocket(self::getSocketUri(), $options); + }); } /** @@ -336,16 +343,6 @@ class Config extends \Prefab { return $message; } - /** - * check whether this installation fulfills all requirements - * -> check for ZMQ PHP extension and installed ZQM version - * -> this does NOT check versions! -> those can be verified on /setup page - * @return bool - */ - static function checkSocketRequirements(): bool { - return extension_loaded('zmq') && class_exists('ZMQ'); - } - /** * use this function to "validate" the socket connection. * The result will be CACHED for a few seconds! @@ -359,7 +356,7 @@ class Config extends \Prefab { $f3 = \Base::instance(); if( !$f3->exists(self::CACHE_KEY_SOCKET_VALID, $valid) ){ - if(self::checkSocketRequirements() && ($socketUrl = self::getSocketUri()) ){ + if( $socketUrl = self::getSocketUri() ){ // get socket URI parts -> not elegant... $domain = parse_url( $socketUrl, PHP_URL_SCHEME) . '://' . parse_url( $socketUrl, PHP_URL_HOST); $port = parse_url( $socketUrl, PHP_URL_PORT); diff --git a/app/main/lib/logging/AbstractLog.php b/app/main/lib/logging/AbstractLog.php index d485fe5c..f44010b8 100644 --- a/app/main/lib/logging/AbstractLog.php +++ b/app/main/lib/logging/AbstractLog.php @@ -176,7 +176,7 @@ abstract class AbstractLog implements LogInterface { * @param array $data * @return LogInterface */ - public function setData(array $data): LogInterface{ + public function setData(array $data) : LogInterface{ $this->data = $data; return $this; } @@ -185,7 +185,7 @@ abstract class AbstractLog implements LogInterface { * @param array $data * @return LogInterface */ - public function setTempData(array $data): LogInterface{ + public function setTempData(array $data) : LogInterface{ $this->tmpData = $data; return $this; } @@ -198,7 +198,7 @@ abstract class AbstractLog implements LogInterface { * @param \stdClass|null $handlerParams * @return LogInterface */ - public function addHandler(string $handlerKey, string $formatterKey = null, \stdClass $handlerParams = null): LogInterface { + public function addHandler(string $handlerKey, string $formatterKey = null, \stdClass $handlerParams = null) : LogInterface { if(!$this->hasHandlerKey($handlerKey)){ $this->handlerConfig[$handlerKey] = $formatterKey; // add more configuration params for the new handler @@ -244,10 +244,10 @@ abstract class AbstractLog implements LogInterface { switch($handlerKey){ case 'stream': $params = $this->getHandlerParamsStream(); break; - case 'zmq': $params = $this->getHandlerParamsZMQ(); - break; case 'mail': $params = $this->getHandlerParamsMail(); break; + case 'socket': $params = $this->getHandlerParamsSocket(); + break; case 'slackMap': case 'slackRally': case 'discordMap': @@ -267,69 +267,69 @@ abstract class AbstractLog implements LogInterface { /** * @return array */ - public function getHandlerParamsConfig(): array { + public function getHandlerParamsConfig() : array { return $this->handlerParamsConfig; } /** * @return array */ - public function getProcessorConfig(): array { + public function getProcessorConfig() : array { return $this->processorConfig; } /** * @return string */ - public function getMessage(): string{ + public function getMessage() : string{ return $this->message; } /** * @return string */ - public function getAction(): string{ + public function getAction() : string{ return $this->action; } /** * @return string */ - public function getChannelType(): string{ + public function getChannelType() : string{ return $this->channelType; } /** * @return string */ - public function getChannelName(): string{ + public function getChannelName() : string{ return $this->getChannelType(); } /** * @return string */ - public function getLevel(): string{ + public function getLevel() : string{ return $this->level; } /** * @return string */ - public function getTag(): string{ + public function getTag() : string{ return $this->tag; } /** * @return array */ - public function getData(): array{ + public function getData() : array{ return $this->data; } /** * @return array */ - public function getContext(): array{ + public function getContext() : array{ $context = [ 'data' => $this->getData(), 'tag' => $this->getTag() @@ -344,14 +344,14 @@ abstract class AbstractLog implements LogInterface { /** * @return array */ - protected function getTempData(): array { + protected function getTempData() : array { return $this->tmpData; } /** * @return array */ - public function getHandlerGroups(): array{ + public function getHandlerGroups() : array{ return $this->handlerGroups; } @@ -359,7 +359,7 @@ abstract class AbstractLog implements LogInterface { * get unique hash for this kind of logs (channel) and same $handlerGroups * @return string */ - public function getGroupHash(): string { + public function getGroupHash() : string { $groupName = $this->getChannelName(); if($this->isGrouped()){ $groupName .= '_' . implode('_', $this->getHandlerGroups()); @@ -372,7 +372,7 @@ abstract class AbstractLog implements LogInterface { * @param string $handlerKey * @return bool */ - public function hasHandlerKey(string $handlerKey): bool{ + public function hasHandlerKey(string $handlerKey) : bool{ return array_key_exists($handlerKey, $this->handlerConfig); } @@ -380,21 +380,21 @@ abstract class AbstractLog implements LogInterface { * @param string $handlerKey * @return bool */ - public function hasHandlerGroupKey(string $handlerKey): bool{ + public function hasHandlerGroupKey(string $handlerKey) : bool{ return in_array($handlerKey, $this->getHandlerGroups()); } /** * @return bool */ - public function hasBuffer(): bool{ + public function hasBuffer() : bool{ return $this->buffer; } /** * @return bool */ - public function isGrouped(): bool{ + public function isGrouped() : bool{ return !empty($this->getHandlerGroups()); } @@ -416,7 +416,7 @@ abstract class AbstractLog implements LogInterface { } // Handler parameters for Monolog\Handler\AbstractHandler --------------------------------------------------------- - protected function getHandlerParamsStream(): array{ + protected function getHandlerParamsStream() : array{ $params = []; if( !empty($conf = $this->handlerParamsConfig['stream']) ){ $params[] = $conf->stream; @@ -428,40 +428,11 @@ abstract class AbstractLog implements LogInterface { return $params; } - /** - * get __construct() parameters for ZMQHandler() call - * @return array - * @throws \ZMQSocketException - */ - protected function getHandlerParamsZMQ(): array { - $params = []; - if( !empty($conf = $this->handlerParamsConfig['zmq']) ){ - // meta data (required by receiver socket) - $meta = [ - 'logType' => 'mapLog', - 'stream'=> $conf->streamConf->stream - ]; - - $context = new \ZMQContext(); - $pusher = $context->getSocket(\ZMQ::SOCKET_PUSH); - $pusher->connect($conf->uri); - - $params[] = $pusher; - $params[] = \ZMQ::MODE_DONTWAIT; - $params[] = false; // multipart - $params[] = Logger::toMonologLevel($this->getLevel()); // min level that is handled - $params[] = true; // bubble - $params[] = $meta; - } - - return $params; - } - /** * get __construct() parameters for SwiftMailerHandler() call * @return array */ - protected function getHandlerParamsMail(): array{ + protected function getHandlerParamsMail() : array { $params = []; if( !empty($conf = $this->handlerParamsConfig['mail']) ){ $transport = (new \Swift_SmtpTransport()) @@ -520,12 +491,34 @@ abstract class AbstractLog implements LogInterface { return $params; } + /** + * get __construct() parameters for SocketHandler() call + * @return array + */ + protected function getHandlerParamsSocket() : array { + $params = []; + if( !empty($conf = $this->handlerParamsConfig['socket']) ){ + // meta data (required by receiver socket) + $meta = [ + 'logType' => 'mapLog', + 'stream'=> $conf->streamConf->stream + ]; + + $params[] = $conf->dsn; + $params[] = Logger::toMonologLevel($this->getLevel()); + $params[] = true; + $params[] = $meta; + } + + return $params; + } + /** * get __construct() params for SlackWebhookHandler() call * @param string $handlerKey * @return array */ - protected function getHandlerParamsSlack(string $handlerKey): array { + protected function getHandlerParamsSlack(string $handlerKey) : array { $params = []; if( !empty($conf = $this->handlerParamsConfig[$handlerKey]) ){ $params[] = $conf->slackWebHookURL; diff --git a/app/main/lib/logging/MapLog.php b/app/main/lib/logging/MapLog.php index 37d678a4..29b368b0 100644 --- a/app/main/lib/logging/MapLog.php +++ b/app/main/lib/logging/MapLog.php @@ -19,8 +19,8 @@ class MapLog extends AbstractCharacterLog{ * @var array */ protected $handlerConfig = [ - //'stream' => 'json', - //'zmq' => 'json', + //'stream' => 'json', + //'socket' => 'json', //'slackMap' => 'json' ]; diff --git a/app/main/lib/logging/handler/ZMQHandler.php b/app/main/lib/logging/handler/SocketHandler.php similarity index 68% rename from app/main/lib/logging/handler/ZMQHandler.php rename to app/main/lib/logging/handler/SocketHandler.php index df73d007..24d2b9fb 100644 --- a/app/main/lib/logging/handler/ZMQHandler.php +++ b/app/main/lib/logging/handler/SocketHandler.php @@ -1,9 +1,9 @@ metaData = $metaData; - parent::__construct($zmqSocket, $zmqMode, $multipart, $level, $bubble); + parent::__construct($connectionString, $level, $bubble); } /** @@ -38,7 +30,6 @@ class ZMQHandler extends \Websoftwares\Monolog\Handler\ZMQHandler { * -> change data structure after processor() calls and before formatter() calls * @param array $record * @return bool - * @throws \Exception */ public function handle(array $record){ if (!$this->isHandling($record)) { @@ -61,5 +52,4 @@ class ZMQHandler extends \Websoftwares\Monolog\Handler\ZMQHandler { return false === $this->bubble; } - } \ No newline at end of file diff --git a/app/main/lib/socket.php b/app/main/lib/socket.php deleted file mode 100644 index 0c8e1c34..00000000 --- a/app/main/lib/socket.php +++ /dev/null @@ -1,227 +0,0 @@ - The total timeout for a request is ($tll * $maxRetries) - * @var int - */ - protected $ttl = (self::DEFAULT_TTL_MAX / self::DEFAULT_RETRY_MAX); - - /** - * max retry count for message send - * @var int - */ - protected $maxRetries = self::DEFAULT_RETRY_MAX; - - public function __construct($uri, $ttl = self::DEFAULT_TTL_MAX, $maxRetries = self::DEFAULT_RETRY_MAX){ - $this->setTtl($ttl, $maxRetries); - $this->setSocketUri($uri); - } - - /** - * @param mixed $socketUri - */ - public function setSocketUri($socketUri){ - $this->socketUri = $socketUri; - } - - /** - * @param int $ttl - * @param int $maxRetries - */ - public function setTtl(int $ttl, int $maxRetries){ - if( - $ttl > 0 && - $maxRetries > 0 - ){ - $this->maxRetries = $maxRetries; - $this->ttl = round($ttl / $maxRetries); - } - } - - /** - * init new socket - */ - public function initSocket(){ - if(Config::checkSocketRequirements()){ - $context = new \ZMQContext(); - $this->socket = $context->getSocket(\ZMQ::SOCKET_PUSH); - } - } - - /** - * @param string $task - * @param string $load - * @return bool|string - * @throws \ZMQSocketException - */ - public function sendData(string $task, $load = ''){ - $response = false; - - $this->initSocket(); - - if( - !$this->socket || - !$this->socketUri - ){ - // Socket not active (e.g. URI missing) - return $response; - } - - // add task, and wrap data - $send = [ - 'task' => $task, - 'load' => $load - ]; - - $this->socket->connect($this->socketUri); - //$this->socket->send(json_encode($send), \ZMQ::MODE_DONTWAIT); - - $this->socket->send(json_encode($send)); - // $this->socket->disconnect($this->socketUri); - - $response = 'OK'; - - return $response; - } - - /** - * send data to socket and listen for response - * -> "Request" => "Response" setup - * @param $task - * @param $load - * @return bool|string - */ - /* - public function sendData($task, $load = ''){ - $response = false; - - $this->initSocket(); - - if( !$this->socket ){ - // Socket not active (e.g. URI missing) - return $response; - } - - // add task, and wrap data - $send = [ - 'task' => $task, - 'load' => $load - ]; - - $retriesLeft = $this->maxRetries; - // try sending data - while($retriesLeft){ - // Get list of connected endpoints - $endpoints = $this->socket->getEndpoints(); - if (in_array($this->socketUri, $endpoints['connect'])) { - // disconnect e.g. there was no proper response yet - - $this->socket->disconnect($this->socketUri); - // try new socket connection - $this->initSocket(); - } - - $this->socket->connect($this->socketUri); - $this->socket->send(json_encode($send)); - - $readable = []; - $writable = []; - - $poller = new \ZMQPoll(); - $poller->add($this->socket, \ZMQ::POLL_IN); - - $startTime = microtime(true); - // infinite loop until we get a proper answer - while(true){ - // Amount of events retrieved - $events = 0; - - try{ - // Poll until there is something to do - $events = $poller->poll($readable, $writable, $this->ttl); - $errors = $poller->getLastErrors(); - - if(count($errors) > 0){ - // log errors - foreach($errors as $error){ - LogController::getLogger('SOCKET_ERROR')->write(sprintf(self::ERROR_POLLING, $error)); - } - // break infinite loop - break; - } - }catch(\ZMQPollException $e){ - LogController::getLogger('SOCKET_ERROR')->write(sprintf(self::ERROR_POLLING_FAILED, $e->getMessage() )); - } - - - if($events > 0){ - try{ - $response = $this->socket->recv(); - // everything OK -> stop infinite loop AND retry loop! - break 2; - }catch(\ZMQException $e){ - LogController::getLogger('SOCKET_ERROR')->write(sprintf(self::ERROR_RECV_FAILED, $e->getMessage() )); - } - } - - if((microtime(true) - $startTime) > (self::DEFAULT_RESPONSE_MAX / 1000)){ - // max time for response exceeded - LogController::getLogger('SOCKET_ERROR')->write(sprintf(self::ERROR_SEND_FAILED, self::DEFAULT_RESPONSE_MAX)); - break; - } - - // start inf loop again, no proper answer :( - } - - if(--$retriesLeft <= 0){ - // retry limit exceeded - LogController::getLogger('SOCKET_ERROR')->write(sprintf(self::ERROR_OFFLINE, $this->socketUri, $this->maxRetries, $this->ttl)); - break; - } - } - - $this->socket->disconnect($this->socketUri); - - return $response; - }*/ - - -} \ No newline at end of file diff --git a/app/main/lib/socket/AbstractSocket.php b/app/main/lib/socket/AbstractSocket.php new file mode 100644 index 00000000..d7053a8a --- /dev/null +++ b/app/main/lib/socket/AbstractSocket.php @@ -0,0 +1,242 @@ +uri = $uri; + $this->options = $options; + } + + /** + * @return Socket\ConnectorInterface + */ + abstract protected function getConnector() : Socket\ConnectorInterface; + + /** + * @return EventLoop\LoopInterface + */ + public function getLoop(): EventLoop\LoopInterface { + if(!($this->loop instanceof EventLoop\LoopInterface)){ + $this->loop = EventLoop\Factory::create(); + } + + return $this->loop; + } + + /** + * connect to socket + * @return Promise\PromiseInterface + */ + protected function connect() : Promise\PromiseInterface { + $deferred = new Promise\Deferred(); + + $this->getConnector() + ->connect($this->uri) + ->then($this->initConnection()) + ->then( + function(Socket\ConnectionInterface $connection) use ($deferred) { + $deferred->resolve($connection); + }, + function(\Exception $e) use ($deferred) { + $deferred->reject($e); + }); + + return $deferred->promise(); + } + + /** + * @param string $task + * @param null $load + * @return Promise\PromiseInterface + */ + public function write(string $task, $load = null) : Promise\PromiseInterface { + $deferred = new Promise\Deferred(); + $payload = $this->newPayload($task, $load); + + $this->connect() + ->then( + function(Socket\ConnectionInterface $connection) use ($payload, $deferred) { + return (new Promise\FulfilledPromise($connection)) + ->then($this->initWrite($payload)) + ->then($this->initRead()) + ->then($this->initClose($connection)) + ->then( + function($payload) use ($deferred) { + // we got valid data from socketServer -> check if $payload contains an error + if(is_array($payload) && $payload['task'] == 'error'){ + // ... wrap error payload in a rejectedPromise + $deferred->reject( + new Promise\RejectedPromise( + new \Exception($payload['load']) + ) + ); + }else{ + // good response + $deferred->resolve($payload); + } + }, + function(\Exception $e) use ($deferred) { + $deferred->reject($e); + }); + }, + function(\Exception $e) use ($deferred) { + // connection error + $deferred->reject($e); + }); + + $this->getLoop()->run(); + + return $deferred->promise() + ->otherwise( + // final exception handler for rejected promises -> convert to payload array + // -> No socket related Exceptions should be thrown down the chain + function(\Exception $e){ + return new Promise\RejectedPromise( + $this->newPayload('error', $e->getMessage()) + ); + }); + } + + /** + * set connection events + * @return callable + */ + protected function initConnection() : callable { + return function(Socket\ConnectionInterface $connection) : Promise\PromiseInterface { + $deferred = new Promise\Deferred(); + + /* connection event callbacks should be added here (if needed) + $connection->on('end', function(){ + echo "pf: connection on end" . PHP_EOL; + }); + + $connection->on('error', function(\Exception $e) { + echo "pf: connection on error: " . $e->getMessage() . PHP_EOL; + }); + + $connection->on('close', function(){ + echo "pf: connection on close" . PHP_EOL; + }); + */ + + $deferred->resolve($connection); + //$deferred->reject(new \RuntimeException('lala')); + + return $deferred->promise(); + }; + } + + /** + * write payload to connection + * @param $payload + * @return callable + */ + protected function initWrite($payload) : callable { + return function(Socket\ConnectionInterface $connection) use ($payload) : Promise\PromiseInterface { + $deferred = new Promise\Deferred(); + + $streamEncoded = new NDJson\Encoder($connection); + + $streamEncoded->on('error', function(\Exception $e) use ($deferred) { + $deferred->reject($e); + }); + + if($streamEncoded->write($payload)){ + $deferred->resolve($connection); + } + + return $deferred->promise(); + }; + } + + /** + * read response data from connection + * @return callable + */ + protected function initRead() : callable { + return function(Socket\ConnectionInterface $connection) : Promise\PromiseInterface { + // new empty stream for processing JSON + $stream = new Stream\ThroughStream(); + + $streamDecoded = new NDJson\Decoder($stream, true); + + // promise get resolved on first emit('data') + $promise = Promise\Stream\first($streamDecoded); + + // register on('data') for main input stream + $connection->once('data', function ($chunk) use ($stream) { + // send current data chunk to processing stream -> resolves promise + $stream->emit('data', [$chunk]); + }); + + return $promise; + }; + } + + /** + * close connection + * @param Socket\ConnectionInterface $connection + * @return callable + */ + protected function initClose(Socket\ConnectionInterface $connection) : callable { + return function($payload) use ($connection) : Promise\PromiseInterface { + $deferred = new Promise\Deferred(); + $deferred->resolve($payload); + + //$connection->close(); + return $deferred->promise(); + }; + } + /** + * get new payload + * @param string $task + * @param null $load + * @return array + */ + protected function newPayload(string $task, $load = null) : array { + $payload = [ + 'task' => $task, + 'load' => $load + ]; + + return $payload; + } + +} \ No newline at end of file diff --git a/app/main/lib/socket/Socket.php b/app/main/lib/socket/Socket.php deleted file mode 100644 index e5f52f46..00000000 --- a/app/main/lib/socket/Socket.php +++ /dev/null @@ -1,14 +0,0 @@ -getLoop(), $this->options); + } + +} \ No newline at end of file diff --git a/app/main/model/charactermodel.php b/app/main/model/charactermodel.php index 8b7810d5..98aa96c9 100644 --- a/app/main/model/charactermodel.php +++ b/app/main/model/charactermodel.php @@ -960,11 +960,11 @@ class CharacterModel extends BasicModel { /** * broadcast characterData - * @throws \ZMQSocketException */ public function broadcastCharacterUpdate(){ $characterData = $this->getData(true); - (new Socket( Config::getSocketUri() ))->sendData('characterUpdate', $characterData); + + self::getF3()->webSocket()->write('characterUpdate', $characterData); } /** diff --git a/app/main/model/mapmodel.php b/app/main/model/mapmodel.php index 6261841e..0d682688 100644 --- a/app/main/model/mapmodel.php +++ b/app/main/model/mapmodel.php @@ -982,7 +982,7 @@ class MapModel extends AbstractMapTrackingModel { if($this->isHistoryLogEnabled()){ // check socket config if(Config::validSocketConnect()){ - $log->addHandler('zmq', 'json', $this->getSocketConfig()); + $log->addHandler('socket', 'json', $this->getSocketConfig()); }else{ // update log file local (slow) $log->addHandler('stream', 'json', $this->getStreamConfig()); @@ -1155,7 +1155,7 @@ class MapModel extends AbstractMapTrackingModel { */ public function getSocketConfig(): \stdClass{ $config = (object) []; - $config->uri = Config::getSocketUri(); + $config->dsn = Config::getSocketUri(); $config->streamConf = $this->getStreamConfig(true); return $config; } diff --git a/composer-dev.json b/composer-dev.json index 0393c708..8a3b377f 100644 --- a/composer-dev.json +++ b/composer-dev.json @@ -27,10 +27,7 @@ "ext-json": "*", "ext-mbstring": "*", "ext-ctype": "*", - "ext-zmq": ">=1.1.3", - "react/zmq": "0.3.*", "monolog/monolog": "1.*", - "websoftwares/monolog-zmq-handler": "0.2.*", "swiftmailer/swiftmailer": "^6.0", "league/html-to-markdown": "4.8.*", "cache/redis-adapter": "1.0.*", @@ -39,6 +36,8 @@ "cache/void-adapter": "1.0.*", "cache/namespaced-cache": "1.0.*", "react/socket": "1.2.*", + "react/promise-stream": "1.1.*", + "clue/ndjson-react": "1.0.*", "exodus4d/pathfinder_esi": "dev-develop as 0.0.x-dev" }, "suggest": { diff --git a/composer.json b/composer.json index e6169d12..9f5264b1 100644 --- a/composer.json +++ b/composer.json @@ -27,10 +27,7 @@ "ext-json": "*", "ext-mbstring": "*", "ext-ctype": "*", - "ext-zmq": ">=1.1.3", - "react/zmq": "0.3.*", "monolog/monolog": "1.*", - "websoftwares/monolog-zmq-handler": "0.2.*", "swiftmailer/swiftmailer": "^6.0", "league/html-to-markdown": "4.8.*", "cache/redis-adapter": "1.0.*", @@ -38,6 +35,9 @@ "cache/array-adapter": "1.0.*", "cache/void-adapter": "1.0.*", "cache/namespaced-cache": "1.0.*", + "react/socket": "1.2.*", + "react/promise-stream": "1.1.*", + "clue/ndjson-react": "1.0.*", "exodus4d/pathfinder_esi": "dev-master#v1.3.0" }, "suggest": { diff --git a/js/app/setup.js b/js/app/setup.js index b623b146..faf5f650 100644 --- a/js/app/setup.js +++ b/js/app/setup.js @@ -201,7 +201,7 @@ define([ }, status: { type: 'warning', - label: 'CONNECTING...', + label: 'CONNECTING…', class: 'txt-color-warning' } }); @@ -213,7 +213,7 @@ define([ updateWebSocketPanel({ status: { type: 'warning', - label: 'OPEN wait for response...', + label: 'OPEN wait for response…', class: 'txt-color-warning' } }); diff --git a/public/templates/view/setup.html b/public/templates/view/setup.html index e4f9ae6c..ddbe79f7 100644 --- a/public/templates/view/setup.html +++ b/public/templates/view/setup.html @@ -997,7 +997,14 @@
-

{{ @socketData.label }}

+

{{ @socketData.label }} + + + {{ @socketData.stats.connections }} con    + {{ @socketData.stats.maxConnections }} con max + + +

@@ -1007,7 +1014,14 @@ {{ @entry.label }} - {{@entry.value | raw}} + + + {{@entry.value | raw}} + + + {{@entry.value | raw}} + + @@ -1027,14 +1041,9 @@
@@ -1062,7 +1071,7 @@

  Index data