From cbca53ad735becb93d1919eccf4d516e5011db63 Mon Sep 17 00:00:00 2001 From: Mark Friedrich Date: Mon, 1 Jul 2019 19:59:52 +0200 Subject: [PATCH] - improved logging - fixed multiple problem with map sync - added some "garbage collection" for unsubscribed connection data - updated README.md --- README.md | 45 +- app/Component/AbstractMessageComponent.php | 269 +++++++++ .../Formatter/SubscriptionFormatter.php | 2 +- .../Handler/LogFileHandler.php | 2 +- app/{Main => Component}/MapUpdate.php | 535 ++++++++++++------ app/Data/Payload.php | 94 +++ app/Log/ShellColors.php | 90 +++ app/Log/Store.php | 220 +++++++ app/Socket/AbstractSocket.php | 67 +++ app/Socket/TcpSocket.php | 135 +++-- app/WebSockets.php | 33 +- cmd.php | 76 ++- 12 files changed, 1285 insertions(+), 283 deletions(-) create mode 100644 app/Component/AbstractMessageComponent.php rename app/{Main => Component}/Formatter/SubscriptionFormatter.php (95%) rename app/{Main => Component}/Handler/LogFileHandler.php (97%) rename app/{Main => Component}/MapUpdate.php (56%) create mode 100644 app/Data/Payload.php create mode 100644 app/Log/ShellColors.php create mode 100644 app/Log/Store.php create mode 100644 app/Socket/AbstractSocket.php diff --git a/README.md b/README.md index 98387c7..045d3c7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ ## WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder) ### Requirements +- _PHP_ **(≥ v7.1)** - A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **(≥ v1.2.0)** - [_Composer_](https://getcomposer.org/download/) to install packages for the WebSocket server @@ -19,20 +20,49 @@ **Clients (WebBrowser) listen for connections** - Host: `0.0.0.0.` (=> any client can connect) - Port: `8020` -- URI: `127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source) +- ↪ URI: `127.0.0.1:8020` -**TCP TcpSocket connection (Internal use for WebServer ⇄ WebSocket communication)** -- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine) +(=> Your WebServer (e.g. Nginx) should proxy all WebSocket connections to this source) + +**TCP TcpSocket connection (Internal use for WebServer ⇄ WebSocket server communication)** +- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket server running on the same machine) - Port: `5555` -- URI: `tcp://127.0.0.1:5555` +- ↪ URI: `tcp://127.0.0.1:5555` + +(=> Where _Pathfinder_ reaches the WebSocket server. This must match `SOCKET_HOST`, `SOCKET_PORT` options in `environment.ini`) -#### Custom [Optional] +#### Start parameters [Optional] The default configuration should be fine for most installations. You can change/overwrite the default **Host** and **Port** configuration by adding additional CLI parameters when starting the WebSocket server: -`$ php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]` +`$ php cmd.php --wsHost [CLIENTS_HOST] --wsPort [CLIENTS_PORT] --tcpHost [TCP_HOST] --tcpPort [TCP_PORT] --debug 0` + For example: If you want to change the the WebSocket port and increase debug output: + + `$ php cmd.php --wsPort 8030 --debug 3` + +##### --debug (default `--debug 2`) + +Allows you to set log output level from `0` (silent) - errors are not logged, to `3` (debug) for detailed logging. + +![alt text](https://i.imgur.com/KfNF4lk.png) + +### WebSocket UI + +There is a WebSocket section on _Pathinders_ `/setup` page. After the WebSocket server is started, you should check it if everything works. +You see the most recent WebSocket log entries, the current connection state, the current number of active connections and all maps that have subscriptions + +![alt text](https://i.imgur.com/dDUrnx2.png) + +Log entry view. Depending on the `--debug` parameter, the most recent (max 50) entries will be shown: + +![alt text](https://i.imgur.com/LIn9aNm.png) + +Subscriptions for each map: + +![alt text](https://i.imgur.com/fANYwho.gif) + ### Unix Service (systemd) #### New Service @@ -97,4 +127,5 @@ ExecStart = /usr/bin/systemctl try-restart websocket.pathfinder.service ``` ### Info -- [*Ratchet*](http://socketo.me/) - "WebSockets for PHP" +- [*Ratchet*](http://socketo.me) - "WebSockets for PHP" +- [*ReactPHP*](https://reactphp.org) - "Event-driven, non-blocking I/O with PHP" diff --git a/app/Component/AbstractMessageComponent.php b/app/Component/AbstractMessageComponent.php new file mode 100644 index 0000000..299fb4b --- /dev/null +++ b/app/Component/AbstractMessageComponent.php @@ -0,0 +1,269 @@ + should be overwritten in child instances + * -> is used as "log store" name + */ + const COMPONENT_NAME = 'default'; + + /** + * log message server start + */ + const LOG_TEXT_SERVER_START = 'start WebSocket server…'; + + /** + * store for logs + * @var Store + */ + protected $logStore; + + /** + * stores all active connections + * -> regardless of its subscription state + * [ + * '$conn1->resourceId' => [ + * 'connection' => $conn1, + * 'data' => null + * ], + * '$conn2->resourceId' => [ + * 'connection' => $conn2, + * 'data' => null + * ] + * ] + * @var array + */ + private $connections; + + /** + * max count of concurrent open connections + * @var int + */ + private $maxConnections = 0; + + /** + * AbstractMessageComponent constructor. + * @param Store $store + */ + public function __construct(Store $store){ + $this->connections = []; + $this->logStore = $store; + + $this->log(['debug', 'info'], null, 'START', static::LOG_TEXT_SERVER_START); + } + + // Connection callbacks from MessageComponentInterface ============================================================ + + /** + * new client connection onOpen + * @param ConnectionInterface $conn + */ + public function onOpen(ConnectionInterface $conn){ + $this->log(['debug'], $conn, __FUNCTION__, 'open connection'); + + $this->addConnection($conn); + } + + /** + * client connection onClose + * @param ConnectionInterface $conn + */ + public function onClose(ConnectionInterface $conn){ + $this->log(['debug'], $conn, __FUNCTION__, 'close connection'); + + $this->removeConnection($conn); + } + + /** + * client connection onError + * @param ConnectionInterface $conn + * @param \Exception $e + */ + public function onError(ConnectionInterface $conn, \Exception $e){ + $this->log(['debug', 'error'], $conn, __FUNCTION__, $e->getMessage()); + } + + /** + * new message received from client connection + * @param ConnectionInterface $conn + * @param string $msg + */ + public function onMessage(ConnectionInterface $conn, $msg){ + // parse message into payload object + $payload = $this->getPayloadFromMessage($msg); + + if($payload){ + $this->dispatchWebSocketPayload($conn, $payload); + } + } + + // Connection handling ============================================================================================ + + /** + * add connection + * @param ConnectionInterface $conn + */ + private function addConnection(ConnectionInterface $conn) : void { + $this->connections[$conn->resourceId] = [ + 'connection' => $conn, + ]; + + $this->maxConnections = max(count($this->connections), $this->maxConnections); + } + + /** + * remove connection + * @param ConnectionInterface $conn + */ + private function removeConnection(ConnectionInterface $conn) : void { + if($this->hasConnection($conn)){ + unset($this->connections[$conn->resourceId]); + } + } + + /** + * @param ConnectionInterface $conn + * @return bool + */ + protected function hasConnection(ConnectionInterface $conn) : bool { + return isset($this->connections[$conn->resourceId]); + } + + /** + * @param int $resourceId + * @return bool + */ + protected function hasConnectionId(int $resourceId) : bool { + return isset($this->connections[$resourceId]); + } + + /** + * @param int $resourceId + * @return ConnectionInterface|null + */ + protected function getConnection(int $resourceId) : ?ConnectionInterface { + return $this->hasConnectionId($resourceId) ? $this->connections[$resourceId]['connection'] : null; + } + + /** + * update meta data for $conn + * @param ConnectionInterface $conn + */ + protected function updateConnection(ConnectionInterface $conn){ + if($this->hasConnection($conn)){ + $meta = [ + 'mTimeSend' => microtime(true) + ]; + $this->connections[$conn->resourceId]['data'] = array_merge($this->getConnectionData($conn), $meta); + } + } + + /** + * get meta data from $conn + * @param ConnectionInterface $conn + * @return array + */ + protected function getConnectionData(ConnectionInterface $conn) : array { + $meta = []; + if($this->hasConnection($conn)){ + $meta = (array)$this->connections[$conn->resourceId]['data']; + } + return $meta; + } + + /** + * wrapper for ConnectionInterface->send() + * -> this stores some meta data to the $conn + * @param ConnectionInterface $conn + * @param $data + */ + protected function send(ConnectionInterface $conn, $data){ + $conn->send($data); + $this->updateConnection($conn); + } + + /** + * @param ConnectionInterface $conn + * @param Payload $payload + */ + abstract protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void; + + /** + * get Payload class from client message + * @param mixed $msg + * @return Payload|null + */ + protected function getPayloadFromMessage($msg) : ?Payload { + $payload = null; + $msg = (array)json_decode($msg, true); + + if(isset($msg['task'], $msg['load'])){ + $payload = $this->newPayload((string)$msg['task'], $msg['load']); + } + + return $payload; + } + + /** + * @param string $task + * @param null $load + * @param array|null $characterIds + * @return Payload|null + */ + protected function newPayload(string $task, $load = null, ?array $characterIds = null) : ?Payload { + $payload = null; + try{ + $payload = new Payload($task, $load, $characterIds); + }catch(\Exception $e){ + $this->log(['debug', 'error'], null, __FUNCTION__, $e->getMessage()); + } + + return $payload; + } + + /** + * get WebSocket stats data + * @return array + */ + public function getSocketStats() : array { + return [ + 'connections' => count($this->connections), + 'maxConnections' => $this->maxConnections, + 'logs' => array_reverse($this->logStore->getStore()) + ]; + } + + /** + * @param $logTypes + * @param ConnectionInterface|null $connection + * @param string $action + * @param string $message + */ + protected function log($logTypes, ?ConnectionInterface $connection, string $action, string $message = '') : void { + if($this->logStore){ + $remoteAddress = $connection ? $connection->remoteAddress : null; + $resourceId = $connection ? $connection->resourceId : null; + $this->logStore->log($logTypes, $remoteAddress, $resourceId, $action, $message); + } + } + + /** + * + * @param TimerInterface $timer + */ + public function housekeeping(TimerInterface $timer) : void { + + } +} \ No newline at end of file diff --git a/app/Main/Formatter/SubscriptionFormatter.php b/app/Component/Formatter/SubscriptionFormatter.php similarity index 95% rename from app/Main/Formatter/SubscriptionFormatter.php rename to app/Component/Formatter/SubscriptionFormatter.php index cef76b3..f1e5972 100644 --- a/app/Main/Formatter/SubscriptionFormatter.php +++ b/app/Component/Formatter/SubscriptionFormatter.php @@ -6,7 +6,7 @@ * Time: 13:09 */ -namespace Exodus4D\Socket\Main\Formatter; +namespace Exodus4D\Socket\Component\Formatter; class SubscriptionFormatter{ diff --git a/app/Main/Handler/LogFileHandler.php b/app/Component/Handler/LogFileHandler.php similarity index 97% rename from app/Main/Handler/LogFileHandler.php rename to app/Component/Handler/LogFileHandler.php index 374bc50..22a759b 100644 --- a/app/Main/Handler/LogFileHandler.php +++ b/app/Component/Handler/LogFileHandler.php @@ -6,7 +6,7 @@ * Time: 17:02 */ -namespace Exodus4D\Socket\Main\Handler; +namespace Exodus4D\Socket\Component\Handler; class LogFileHandler { diff --git a/app/Main/MapUpdate.php b/app/Component/MapUpdate.php similarity index 56% rename from app/Main/MapUpdate.php rename to app/Component/MapUpdate.php index 6c9f3c8..62852a3 100644 --- a/app/Main/MapUpdate.php +++ b/app/Component/MapUpdate.php @@ -6,19 +6,67 @@ * Time: 22:29 */ -namespace Exodus4D\Socket\Main; +namespace Exodus4D\Socket\Component; -use Exodus4D\Socket\Main\Handler\LogFileHandler; -use Exodus4D\Socket\Main\Formatter\SubscriptionFormatter; -use Ratchet\MessageComponentInterface; +use Exodus4D\Socket\Component\Handler\LogFileHandler; +use Exodus4D\Socket\Component\Formatter\SubscriptionFormatter; +use Exodus4D\Socket\Data\Payload; +use Exodus4D\Socket\Log\Store; use Ratchet\ConnectionInterface; -class MapUpdate implements MessageComponentInterface { +class MapUpdate extends AbstractMessageComponent { + + /** + * unique name for this component + * -> should be overwritten in child instances + * -> is used as "log store" name + */ + const COMPONENT_NAME = 'webSock'; + + /** + * log message unknown task name + */ + const LOG_TEXT_TASK_UNKNOWN = 'unknown task: %s'; + + /** + * log message for denied subscription attempt. -> character data unknown + */ + const LOG_TEXT_SUBSCRIBE_DENY = 'sub. denied for charId: %d'; + + /** + * log message for invalid subscription data + */ + const LOG_TEXT_SUBSCRIBE_INVALID = 'sub. data invalid'; + + /** + * log message for subscribe characterId + */ + const LOG_TEXT_SUBSCRIBE = 'sub. charId: %s to mapIds: [%s]'; + + /** + * log message unsubscribe characterId + */ + const LOG_TEXT_UNSUBSCRIBE = 'unsub. charId: %d from mapIds: [%s]'; + + /** + * log message for map data updated broadcast + */ + const LOG_TEXT_MAP_UPDATE = 'update map data, mapId: %d → broadcast to %d connections'; + + /** + * log message for map subscriptions data updated broadcast + */ + const LOG_TEXT_MAP_SUBSCRIPTIONS = 'update map subscriptions data, mapId: %d. → broadcast to %d connections'; + + /** + * log message for delete mapId broadcast + */ + const LOG_TEXT_MAP_DELETE = 'delete mapId: $d → broadcast to %d connections'; /** * timestamp (ms) from last healthCheck ping * -> timestamp received from remote TCP socket - * @var + * @var int|null */ protected $healthCheckToken; @@ -48,7 +96,7 @@ class MapUpdate implements MessageComponentInterface { * [ * 'token' => $characterToken3, * 'expire' => $expireTime3, - * 'characterData' => $characterData2 + * 'characterData' => $characterData2 * ] * ] * ] @@ -108,86 +156,127 @@ class MapUpdate implements MessageComponentInterface { protected $characterData; /** - * enable debug output - * -> check debug() for more information - * @var bool + * MapUpdate constructor. + * @param Store $store */ - protected $debug = false; + public function __construct(Store $store){ + parent::__construct($store); - public function __construct() { - $this->characterAccessData = []; - $this->mapAccessData = []; - $this->characters = []; - $this->subscriptions = []; - $this->characterData = []; - - $this->log('Server START ------------------------------------------'); + $this->characterAccessData = []; + $this->mapAccessData = []; + $this->characters = []; + $this->subscriptions = []; + $this->characterData = []; } - public function onOpen(ConnectionInterface $conn) { - $this->log('NEW connection. ID (' . $conn->resourceId .') '); - } - - public function onMessage(ConnectionInterface $conn, $msg) { - $msg = json_decode($msg, true); - - if( - isset($msg['task']) && - isset($msg['load']) - ){ - $task = $msg['task']; - $load = $msg['load']; - - switch($task){ - case 'subscribe': - $this->subscribe($conn, $load); - break; - case 'healthCheck': - $this->validateHealthCheck($conn, $load); - break; - default: - break; - } - } + /** + * new client connection + * @param ConnectionInterface $conn + */ + public function onOpen(ConnectionInterface $conn){ + parent::onOpen($conn); } /** * @param ConnectionInterface $conn */ - public function onClose(ConnectionInterface $conn) { - $this->unSubscribeConnection($conn); + public function onClose(ConnectionInterface $conn){ + parent::onClose($conn); - $this->log('DISCONNECTED connection. ID (' . $conn->resourceId .') '); + $this->unSubscribeConnection($conn); } /** * @param ConnectionInterface $conn * @param \Exception $e */ - public function onError(ConnectionInterface $conn, \Exception $e) { - $this->unSubscribeConnection($conn); - $this->log('ERROR "' . $e->getMessage() . '" ID (' . $conn->resourceId .') '); + public function onError(ConnectionInterface $conn, \Exception $e){ + parent::onError($conn, $e); + + // close connection should trigger the onClose() callback for unSubscribe $conn->close(); } /** - * Check token (timestamp from initial TCP healthCheck poke against token send from client * @param ConnectionInterface $conn - * @param $token + * @param string $msg */ - private function validateHealthCheck($conn, $token){ - $isValid = 0; + public function onMessage(ConnectionInterface $conn, $msg){ + parent::onMessage($conn, $msg); + } - if( - $token && $this->healthCheckToken && - (int)$token === (int)$this->healthCheckToken - ){ - $isValid = 1; + /** + * @param ConnectionInterface $conn + * @param Payload $payload + */ + protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void { + switch($payload->task){ + case 'healthCheck': + $this->broadcastHealthCheck($conn, $payload); + break; + case 'subscribe': + $this->subscribe($conn, (array)$payload->load); + break; + case 'unsubscribe': + // make sure characterIds got from client are valid + // -> intersect with subscribed characterIds for current $conn + $characterIds = array_intersect((array)$payload->load, $this->getCharacterIdsByConnection($conn)); + if(!empty($characterIds)){ + $this->unSubscribeCharacterIds($characterIds, $conn); + } + break; + default: + $this->log(['debug', 'error'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_TASK_UNKNOWN, $payload->task)); + break; + } + } + + /** + * checks healthCheck $token and respond with validation status + subscription stats + * @param ConnectionInterface $conn + * @param Payload $payload + */ + private function broadcastHealthCheck(ConnectionInterface $conn, Payload $payload) : void { + $isValid = $this->validateHealthCheckToken((int)$payload->load); + + $load = [ + 'isValid' => $isValid, + ]; + + // Make sure WebSocket client request is valid + if($isValid){ + // set new healthCheckToken for next check + $load['token'] = $this->setHealthCheckToken(microtime(true)); + + // add subscription stats if $token is valid + $load['subStats'] = $this->getSubscriptionStats(); + } + + $payload->setLoad($load); + + $connections = new \SplObjectStorage(); + $connections->attach($conn); + + $this->broadcast($connections, $payload); + + } + + /** + * compare token (timestamp from initial TCP healthCheck message) with token send from WebSocket + * @param int $token + * @return bool + */ + private function validateHealthCheckToken(int $token) : bool { + $isValid = false; + + if($token && $this->healthCheckToken && $token === (int)$this->healthCheckToken){ + $isValid = true; } - $conn->send( json_encode($isValid) ); // reset token $this->healthCheckToken = null; + + return $isValid; } /** @@ -195,9 +284,9 @@ class MapUpdate implements MessageComponentInterface { * @param ConnectionInterface $conn * @param $subscribeData */ - private function subscribe(ConnectionInterface $conn, $subscribeData){ + private function subscribe(ConnectionInterface $conn, array $subscribeData) : void { $characterId = (int)$subscribeData['id']; - $characterToken = $subscribeData['token']; + $characterToken = (string)$subscribeData['token']; if($characterId && $characterToken){ // check if character access token is valid (exists and not expired in $this->characterAccessData) @@ -213,21 +302,33 @@ class MapUpdate implements MessageComponentInterface { $changedSubscriptionsMapIds = []; foreach((array)$subscribeData['mapData'] as $data){ $mapId = (int)$data['id']; - $mapToken = $data['token']; + $mapToken = (string)$data['token']; + $mapName = (string)$data['name']; if($mapId && $mapToken){ // check if token is valid (exists and not expired) in $this->mapAccessData - if( $this->checkMapAccess($characterId, $mapId, $mapToken) ){ + if($this->checkMapAccess($characterId, $mapId, $mapToken)){ // valid map subscribe request - $this->subscriptions[$mapId][$characterId] = $characterId; + $this->subscriptions[$mapId]['characterIds'][$characterId] = $characterId; + $this->subscriptions[$mapId]['data']['name'] = $mapName; $changedSubscriptionsMapIds[] = $mapId; } } } + sort($changedSubscriptionsMapIds, SORT_NUMERIC); + + $this->log(['debug', 'info'], $conn, __FUNCTION__, + sprintf(static::LOG_TEXT_SUBSCRIBE, $characterId, implode($changedSubscriptionsMapIds, ',')) + ); + // broadcast all active subscriptions to subscribed connections ------------------------------------------- - $this->broadcastMapSubscriptions('mapSubscriptions', $changedSubscriptionsMapIds); + $this->broadcastMapSubscriptions($changedSubscriptionsMapIds); + }else{ + $this->log(['debug', 'info'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_SUBSCRIBE_DENY, $characterId)); } + }else{ + $this->log(['debug', 'error'], $conn, __FUNCTION__, static::LOG_TEXT_SUBSCRIBE_INVALID); } } @@ -244,10 +345,10 @@ class MapUpdate implements MessageComponentInterface { * unSubscribe a $characterId from ALL maps * -> if $conn is set -> just unSub the $characterId from this $conn * @param int $characterId - * @param null $conn + * @param ConnectionInterface|null $conn * @return bool */ - private function unSubscribeCharacterId($characterId, $conn = null){ + private function unSubscribeCharacterId(int $characterId, ?ConnectionInterface $conn = null) : bool { if($characterId){ // unSub from $this->characters --------------------------------------------------------------------------- if($conn){ @@ -258,7 +359,7 @@ class MapUpdate implements MessageComponentInterface { // no connection left for this character unset($this->characters[$characterId]); } - // TODO unset $this->>$characterData if $characterid does not have any other map subscribed to + // TODO unset $this->>$characterData if $characterId does not have any other map subscribed to }else{ // unSub ALL connections from a character (e.g. multiple browsers) unset($this->characters[$characterId]); @@ -269,11 +370,11 @@ class MapUpdate implements MessageComponentInterface { // unSub from $this->subscriptions ------------------------------------------------------------------------ $changedSubscriptionsMapIds = []; - foreach($this->subscriptions as $mapId => $characterIds){ - if(array_key_exists($characterId, $characterIds)){ - unset($this->subscriptions[$mapId][$characterId]); + foreach($this->subscriptions as $mapId => $subData){ + if(array_key_exists($characterId, (array)$subData['characterIds'])){ + unset($this->subscriptions[$mapId]['characterIds'][$characterId]); - if( !count($this->subscriptions[$mapId]) ){ + if( !count($this->subscriptions[$mapId]['characterIds']) ){ // no characters left on this map unset($this->subscriptions[$mapId]); } @@ -282,8 +383,14 @@ class MapUpdate implements MessageComponentInterface { } } + sort($changedSubscriptionsMapIds, SORT_NUMERIC); + + $this->log(['debug', 'info'], $conn, __FUNCTION__, + sprintf(static::LOG_TEXT_UNSUBSCRIBE, $characterId, implode($changedSubscriptionsMapIds, ',')) + ); + // broadcast all active subscriptions to subscribed connections ------------------------------------------- - $this->broadcastMapSubscriptions('mapSubscriptions', $changedSubscriptionsMapIds); + $this->broadcastMapSubscriptions($changedSubscriptionsMapIds); } return true; @@ -292,11 +399,11 @@ class MapUpdate implements MessageComponentInterface { /** * unSubscribe $characterIds from ALL maps * -> if $conn is set -> just unSub the $characterId from this $conn - * @param array $characterIds - * @param null $conn + * @param int[] $characterIds + * @param ConnectionInterface|null $conn * @return bool */ - private function unSubscribeCharacterIds(array $characterIds, $conn = null): bool{ + private function unSubscribeCharacterIds(array $characterIds, ?ConnectionInterface $conn = null) : bool { $response = false; foreach($characterIds as $characterId){ $response = $this->unSubscribeCharacterId($characterId, $conn); @@ -310,26 +417,30 @@ class MapUpdate implements MessageComponentInterface { * @param int $mapId * @return int */ - private function deleteMapId(string $task, $mapId){ - $connectionCount = $this->broadcastMapData($task, $mapId, $mapId); + private function deleteMapId(string $task, int $mapId) : int { + $connectionCount = $this->broadcastMapData($task, $mapId, $mapId); // remove map from subscriptions - if( isset($this->subscriptions[$mapId]) ){ + if(isset($this->subscriptions[$mapId])){ unset($this->subscriptions[$mapId]); } + $this->log(['debug', 'info'], null, __FUNCTION__, + sprintf(static::LOG_TEXT_MAP_DELETE, $mapId, $connectionCount) + ); + return $connectionCount; } /** * get all mapIds a characterId has subscribed to * @param int $characterId - * @return array + * @return int[] */ private function getMapIdsByCharacterId(int $characterId) : array { $mapIds = []; - foreach($this->subscriptions as $mapId => $characterIds){ - if(array_key_exists($characterId, $characterIds)){ + foreach($this->subscriptions as $mapId => $subData) { + if(array_key_exists($characterId, (array)$subData['characterIds'])){ $mapIds[] = $mapId; } } @@ -340,7 +451,7 @@ class MapUpdate implements MessageComponentInterface { * @param ConnectionInterface $conn * @return int[] */ - private function getCharacterIdsByConnection(ConnectionInterface $conn){ + private function getCharacterIdsByConnection(ConnectionInterface $conn) : array { $characterIds = []; $resourceId = $conn->resourceId; @@ -363,31 +474,43 @@ class MapUpdate implements MessageComponentInterface { $characterIds = []; if( array_key_exists($mapId, $this->subscriptions) && - is_array($this->subscriptions[$mapId]) + is_array($this->subscriptions[$mapId]['characterIds']) ){ - $characterIds = array_keys($this->subscriptions[$mapId]); + $characterIds = array_keys($this->subscriptions[$mapId]['characterIds']); } return $characterIds; } /** - * get connection objects by characterIds + * get connections by $characterIds * @param int[] $characterIds * @return \SplObjectStorage */ - private function getConnectionsByCharacterIds($characterIds){ + private function getConnectionsByCharacterIds(array $characterIds) : \SplObjectStorage { $connections = new \SplObjectStorage; - foreach($characterIds as $characterId){ - if($charConnections = (array)$this->characters[$characterId] ){ - foreach($charConnections as $conn){ - if( !$connections->contains($conn) ){ - $connections->attach($conn); - } + $connections->addAll($this->getConnectionsByCharacterId($characterId)); + } + return $connections; + } + + /** + * get connections by $characterId + * @param int $characterId + * @return \SplObjectStorage + */ + private function getConnectionsByCharacterId(int $characterId) : \SplObjectStorage { + $connections = new \SplObjectStorage; + if(isset($this->characters[$characterId])){ + foreach(array_keys($this->characters[$characterId]) as $resourceId){ + if( + $this->hasConnectionId($resourceId) && + !$connections->contains($conn = $this->getConnection($resourceId)) + ){ + $connections->attach($conn); } } } - return $connections; } @@ -397,12 +520,13 @@ class MapUpdate implements MessageComponentInterface { * @param $characterToken * @return array */ - private function checkCharacterAccess($characterId, $characterToken) : array { + private function checkCharacterAccess(int $characterId, string $characterToken) : array { $characterData = []; if( !empty($characterAccessData = (array)$this->characterAccessData[$characterId]) ){ + // check expire for $this->characterAccessData -> check ALL characters and remove expired foreach($characterAccessData as $i => $data){ $deleteToken = false; - // check expire for $this->characterAccessData -> check ALL characters and remove expired + if( ((int)$data['expire'] - time()) > 0 ){ // still valid -> check token if($characterToken === $data['token']){ @@ -425,6 +549,7 @@ class MapUpdate implements MessageComponentInterface { } } } + return $characterData; } @@ -435,7 +560,7 @@ class MapUpdate implements MessageComponentInterface { * @param $mapToken * @return bool */ - private function checkMapAccess($characterId, $mapId, $mapToken){ + private function checkMapAccess(int $characterId, int $mapId, string $mapToken) : bool { $access = false; if( !empty($mapAccessData = (array)$this->mapAccessData[$mapId][$characterId]) ){ foreach($mapAccessData as $i => $data){ @@ -469,21 +594,14 @@ class MapUpdate implements MessageComponentInterface { } /** - * broadcast data ($load) to $connections - * @param ConnectionInterface[] | \SplObjectStorage $connections - * @param $task - * @param $load - * @param int[] $characterIds optional, recipients (e.g if multiple browser tabs are open) + * broadcast $payload to $connections + * @param \SplObjectStorage $connections + * @param Payload $payload */ - private function broadcastData($connections, string $task, $load, array $characterIds = []){ - $response = [ - 'task' => $task, - 'characterIds' => $characterIds, - 'load' => $load - ]; - + private function broadcast(\SplObjectStorage $connections, Payload $payload) : void { + $data = json_encode($payload); foreach($connections as $conn){ - $conn->send( json_encode($response) ); + $this->send($conn, $data); } } @@ -493,7 +611,7 @@ class MapUpdate implements MessageComponentInterface { * receive data from TCP socket (main App) * -> send response back * @param string $task - * @param null $load + * @param null|int|array $load * @return bool|float|int|null */ public function receiveData(string $task, $load = null){ @@ -501,16 +619,15 @@ class MapUpdate implements MessageComponentInterface { switch($task){ case 'healthCheck': - $this->healthCheckToken = (float)$load; - $responseLoad = $this->healthCheckToken; + $responseLoad = $this->setHealthCheckToken((float)$load); break; case 'characterUpdate': - $this->updateCharacterData($load); + $this->updateCharacterData((array)$load); $mapIds = $this->getMapIdsByCharacterId((int)$load['id']); - $this->broadcastMapSubscriptions('mapSubscriptions', $mapIds); + $this->broadcastMapSubscriptions($mapIds); break; case 'characterLogout': - $responseLoad = $this->unSubscribeCharacterIds($load); + $responseLoad = $this->unSubscribeCharacterIds((array)$load); break; case 'mapConnectionAccess': $responseLoad = $this->setConnectionAccess($load); @@ -519,10 +636,10 @@ class MapUpdate implements MessageComponentInterface { $responseLoad = $this->setAccess($task, $load); break; case 'mapUpdate': - $responseLoad = $this->broadcastMapUpdate($task, $load); + $responseLoad = $this->broadcastMapUpdate($task, (array)$load); break; case 'mapDeleted': - $responseLoad = $this->deleteMapId($task, $load); + $responseLoad = $this->deleteMapId($task, (int)$load); break; case 'logData': $this->handleLogData((array)$load['meta'], (array)$load['log']); @@ -532,39 +649,63 @@ class MapUpdate implements MessageComponentInterface { return $responseLoad; } - private function setCharacterData(array $characterData){ - $characterId = (int)$characterData['id']; - if($characterId){ + /** + * @param float $token + * @return float + */ + private function setHealthCheckToken(float $token) : float { + $this->healthCheckToken = $token; + return $this->healthCheckToken; + } + + /** + * @param array $characterData + */ + private function setCharacterData(array $characterData) : void { + if($characterId = (int)$characterData['id']){ $this->characterData[$characterId] = $characterData; } } + /** + * @param int $characterId + * @return array + */ private function getCharacterData(int $characterId) : array { return empty($this->characterData[$characterId]) ? [] : $this->characterData[$characterId]; } + /** + * @param array $characterIds + * @return array + */ private function getCharactersData(array $characterIds) : array { return array_filter($this->characterData, function($characterId) use($characterIds) { return in_array($characterId, $characterIds); }, ARRAY_FILTER_USE_KEY); } - private function updateCharacterData(array $characterData){ + /** + * @param array $characterData + */ + private function updateCharacterData(array $characterData) : void { $characterId = (int)$characterData['id']; if($this->getCharacterData($characterId)){ $this->setCharacterData($characterData); } } - private function deleteCharacterData(int $characterId){ + /** + * @param int $characterId + */ + private function deleteCharacterData(int $characterId) : void { unset($this->characterData[$characterId]); } /** - * @param string $task * @param array $mapIds */ - private function broadcastMapSubscriptions(string $task, array $mapIds){ + private function broadcastMapSubscriptions(array $mapIds) : void { $mapIds = array_unique($mapIds); foreach($mapIds as $mapId){ @@ -578,7 +719,11 @@ class MapUpdate implements MessageComponentInterface { $mapUserData->config = (object)['id' => $mapId]; $mapUserData->data = (object)['systems' => $systems]; - $this->broadcastMapData($task, $mapId, $mapUserData); + $connectionCount = $this->broadcastMapData('mapSubscriptions', $mapId, $mapUserData); + + $this->log(['debug'], null, __FUNCTION__, + sprintf(static::LOG_TEXT_MAP_SUBSCRIPTIONS, $mapId, $connectionCount) + ); } } } @@ -588,10 +733,15 @@ class MapUpdate implements MessageComponentInterface { * @param array $mapData * @return int */ - private function broadcastMapUpdate(string $task, $mapData){ + private function broadcastMapUpdate(string $task, array $mapData) : int { $mapId = (int)$mapData['config']['id']; + $connectionCount = $this->broadcastMapData($task, $mapId, $mapData); - return $this->broadcastMapData($task, $mapId, $mapData); + $this->log(['debug'], null, __FUNCTION__, + sprintf(static::LOG_TEXT_MAP_UPDATE, $mapId, $connectionCount) + ); + + return $connectionCount; } /** @@ -605,7 +755,8 @@ class MapUpdate implements MessageComponentInterface { $characterIds = $this->getCharacterIdsByMapId($mapId); $connections = $this->getConnectionsByCharacterIds($characterIds); - $this->broadcastData($connections, $task, $load, $characterIds); + $this->broadcast($connections, $this->newPayload($task, $load, $characterIds)); + return count($connections); } @@ -619,6 +770,7 @@ class MapUpdate implements MessageComponentInterface { $newMapCharacterIds = []; if($mapId = (int)$accessData['id']){ + $mapName = (string)$accessData['name']; $characterIds = (array)$accessData['characterIds']; // check all charactersIds that have map access... -------------------------------------------------------- foreach($characterIds as $characterId){ @@ -632,21 +784,23 @@ class MapUpdate implements MessageComponentInterface { } } - $currentMapCharacterIds = (array)$this->subscriptions[$mapId]; + $currentMapCharacterIds = (array)$this->subscriptions[$mapId]['characterIds']; // broadcast "map delete" to no longer valid characters --------------------------------------------------- $removedMapCharacterIds = array_keys(array_diff_key($currentMapCharacterIds, $newMapCharacterIds)); $removedMapCharacterConnections = $this->getConnectionsByCharacterIds($removedMapCharacterIds); - $this->broadcastData($removedMapCharacterConnections, $task, $mapId, $removedMapCharacterIds); + + $this->broadcast($removedMapCharacterConnections, $this->newPayload($task, $mapId, $removedMapCharacterIds)); // update map subscriptions ------------------------------------------------------------------------------- if( !empty($newMapCharacterIds) ){ // set new characters that have map access (overwrites existing subscriptions for that map) - $this->subscriptions[$mapId] = $newMapCharacterIds; + $this->subscriptions[$mapId]['characterIds'] = $newMapCharacterIds; + $this->subscriptions[$mapId]['data']['name'] = $mapName; // check if subscriptions have changed if( !$this->arraysEqualKeys($currentMapCharacterIds, $newMapCharacterIds) ){ - $this->broadcastMapSubscriptions('mapSubscriptions', [$mapId]); + $this->broadcastMapSubscriptions([$mapId]); } }else{ // no characters (left) on this map @@ -661,7 +815,7 @@ class MapUpdate implements MessageComponentInterface { * @param $connectionAccessData * @return bool */ - private function setConnectionAccess($connectionAccessData) { + private function setConnectionAccess($connectionAccessData){ $response = false; $characterId = (int)$connectionAccessData['id']; $characterData = $connectionAccessData['characterData']; @@ -697,6 +851,75 @@ class MapUpdate implements MessageComponentInterface { return $response; } + /** + * get stats data + * -> lists all channels, subscribed characters + connection info + * @return array + */ + protected function getSubscriptionStats() : array { + $uniqueConnections = []; + $uniqueSubscriptions = []; + $channelsStats = []; + + foreach($this->subscriptions as $mapId => $subData){ + $characterIds = $this->getCharacterIdsByMapId($mapId); + $uniqueMapConnections = []; + + $channelStats = [ + 'channelId' => $mapId, + 'channelName' => $subData['data']['name'], + 'countSub' => count($characterIds), + 'countCon' => 0, + 'subscriptions' => [] + ]; + + foreach($characterIds as $characterId){ + $characterData = $this->getCharacterData($characterId); + $connections = $this->getConnectionsByCharacterId($characterId); + + $characterStats = [ + 'characterId' => $characterId, + 'characterName' => isset($characterData['name']) ? $characterData['name'] : null, + 'countCon' => $connections->count(), + 'connections' => [] + ]; + + foreach($connections as $connection){ + if(!in_array($connection->resourceId, $uniqueMapConnections)){ + $uniqueMapConnections[] = $connection->resourceId; + } + + $metaData = $this->getConnectionData($connection); + $microTime = (float)$metaData['mTimeSend']; + $logTime = Store::getDateTimeFromMicrotime($microTime); + + $characterStats['connections'][] = [ + 'resourceId' => $connection->resourceId, + 'remoteAddress' => $connection->remoteAddress, + 'mTimeSend' => $microTime, + 'mTimeSendFormat1' => $logTime->format('Y-m-d H:i:s.u'), + 'mTimeSendFormat2' => $logTime->format('H:i:s') + ]; + } + + $channelStats['subscriptions'][] = $characterStats; + } + + $uniqueConnections = array_unique(array_merge($uniqueConnections, $uniqueMapConnections)); + $uniqueSubscriptions = array_unique(array_merge($uniqueSubscriptions, $characterIds)); + + $channelStats['countCon'] = count($uniqueMapConnections); + + $channelsStats[] = $channelStats; + } + + return [ + 'countSub' => count($uniqueSubscriptions), + 'countCon' => count($uniqueConnections), + 'channels' => $channelsStats + ]; + } + /** * compare two assoc arrays by keys. Key order is ignored * -> if all keys from array1 exist in array2 && all keys from array2 exist in array 1, arrays are supposed to be equal @@ -717,46 +940,4 @@ class MapUpdate implements MessageComponentInterface { $logHandler = new LogFileHandler((string)$meta['stream']); $logHandler->write($log); } - - - // logging ======================================================================================================== - - /** - * outputs a custom log text - * -> The output can be written to a *.log file if running the webSocket server (cmd.php) as a service - * @param $text - */ - protected function log($text){ - $text = date('Y-m-d H:i:s') . ' ' . $text; - echo $text . "\n"; - - $this->debug(); - } - - protected function debug(){ - if( $this->debug ){ - $mapId = 1; - $characterId = 1946320202; - - $subscriptions = $this->subscriptions[$mapId]; - $connectionsForChar = count($this->characters[$characterId]); - $mapAccessData = $this->mapAccessData[$mapId][$characterId]; - echo "\n" . "========== START ==========" . "\n"; - - echo "-> characterAccessData: " . "\n"; - var_dump( $this->characterAccessData ); - - echo "\n" . "-> Subscriptions mapId: " . $mapId . " " . "\n"; - var_dump($subscriptions); - - echo "\n" . "-> connectionsForChar characterId: " . $characterId . " count: " . $connectionsForChar . " " . "\n"; - - echo "-> mapAccessData: " . "\n"; - var_dump($mapAccessData); - - echo "\n" . "========== END ==========" . "\n"; - } - - } - } \ No newline at end of file diff --git a/app/Data/Payload.php b/app/Data/Payload.php new file mode 100644 index 0000000..5d72db3 --- /dev/null +++ b/app/Data/Payload.php @@ -0,0 +1,94 @@ + recipients + * -> e.g if multiple browser tabs are open + * @var null|array + */ + private $characterIds; + + /** + * Payload constructor. + * @param string $task + * @param null $load + * @param array|null $characterIds + */ + public function __construct(string $task, $load = null, ?array $characterIds = null){ + $this->setTask($task); + $this->setLoad($load); + $this->setCharacterIds($characterIds); + } + + /** + * @param string $task + */ + public function setTask(string $task){ + if($task){ + $this->task = $task; + }else{ + throw new \InvalidArgumentException(self::ERROR_TASK_MISSING); + } + } + + /** + * @param null $load + */ + public function setLoad($load = null){ + $this->load = $load; + } + + /** + * @param array|null $characterIds + */ + public function setCharacterIds(?array $characterIds){ + if(is_array($characterIds)){ + $this->characterIds = $characterIds; + }else{ + $this->characterIds = null; + } + } + + /** + * @param $name + * @return mixed + */ + public function __get($name){ + return $this->$name; + } + + /** + * @return array|mixed + */ + public function jsonSerialize(){ + return get_object_vars($this); + } +} \ No newline at end of file diff --git a/app/Log/ShellColors.php b/app/Log/ShellColors.php new file mode 100644 index 0000000..0858179 --- /dev/null +++ b/app/Log/ShellColors.php @@ -0,0 +1,90 @@ +foregroundColors['black'] = '0;30'; + $this->foregroundColors['dark_gray'] = '1;30'; + $this->foregroundColors['blue'] = '0;34'; + $this->foregroundColors['light_blue'] = '1;34'; + $this->foregroundColors['green'] = '0;32'; + $this->foregroundColors['light_green'] = '1;32'; + $this->foregroundColors['cyan'] = '0;36'; + $this->foregroundColors['light_cyan'] = '1;36'; + $this->foregroundColors['red'] = '0;31'; + $this->foregroundColors['light_red'] = '1;31'; + $this->foregroundColors['purple'] = '0;35'; + $this->foregroundColors['light_purple'] = '1;35'; + $this->foregroundColors['brown'] = '0;33'; + $this->foregroundColors['yellow'] = '1;33'; + $this->foregroundColors['light_gray'] = '0;37'; + $this->foregroundColors['white'] = '1;37'; + + $this->backgroundColors['black'] = '40'; + $this->backgroundColors['red'] = '41'; + $this->backgroundColors['green'] = '42'; + $this->backgroundColors['yellow'] = '43'; + $this->backgroundColors['blue'] = '44'; + $this->backgroundColors['magenta'] = '45'; + $this->backgroundColors['cyan'] = '46'; + $this->backgroundColors['light_gray'] = '47'; + } + + /** + * get colored string + * @param string $string + * @param string|null $foregroundColor + * @param string|null $backgroundColor + * @return string + */ + public function getColoredString(string $string, ?string $foregroundColor = null, ?string $backgroundColor = null) : string { + $coloredString = ""; + + // Check if given foreground color found + if (isset($this->foregroundColors[$foregroundColor])) { + $coloredString .= "\033[" . $this->foregroundColors[$foregroundColor] . "m"; + } + // Check if given background color found + if (isset($this->backgroundColors[$backgroundColor])) { + $coloredString .= "\033[" . $this->backgroundColors[$backgroundColor] . "m"; + } + + // Add string and end coloring + $coloredString .= $string . "\033[0m"; + + return $coloredString; + } + + /** + * returns all foreground color names + * @return array + */ + public function getForegroundColors() : array { + return array_keys($this->foregroundColors); + } + + /** + * returns all background color names + * @return array + */ + public function getBackgroundColors() : array { + return array_keys($this->backgroundColors); + } +} \ No newline at end of file diff --git a/app/Log/Store.php b/app/Log/Store.php new file mode 100644 index 0000000..6b35335 --- /dev/null +++ b/app/Log/Store.php @@ -0,0 +1,220 @@ + store size should be limited for memory reasons + * @var array + */ + private $store = []; + + /** + * all valid types for custom log events + * if value is false, logs for this type are ignored + * @var array + */ + protected $logTypes = [ + 'error' => true, + 'info' => true, + 'debug' => true + ]; + + /** + * if Store is locked, current state can not be changed + * @var bool + */ + protected $locked = false; + + /** + * @var ShellColors + */ + static $colors; + + /** + * Store constructor. + * @param string $name + */ + public function __construct(string $name){ + $this->name = $name; + } + + /** + * get all stored log entries + * @return array + */ + public function getStore() : array { + return $this->store; + } + + /** + * @param bool $locked + */ + public function setLocked(bool $locked){ + $this->locked = $locked; + } + + /** + * @return bool + */ + public function isLocked() : bool { + return $this->locked; + } + + /** + * @param int $logLevel + */ + public function setLogLevel(int $logLevel){ + switch($logLevel){ + case 3: + $this->logTypes['error'] = true; + $this->logTypes['info'] = true; + $this->logTypes['debug'] = true; + break; + case 2: + $this->logTypes['error'] = true; + $this->logTypes['info'] = true; + $this->logTypes['debug'] = false; + break; + case 1: + $this->logTypes['error'] = true; + $this->logTypes['info'] = false; + $this->logTypes['debug'] = false; + break; + case 0: + default: + $this->setLocked(true); // no logging + } + } + + /** + * this is used for custom log events like 'error', 'debug',... + * works as dispatcher method that calls individual log*() methods + * @param $logTypes + * @param string|null $remoteAddress + * @param int|null $resourceId + * @param string $action + * @param string $message + */ + public function log($logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : void { + if(!$this->isLocked()){ + // filter out logTypes that should not be logged + $logTypes = array_filter((array)$logTypes, function(string $type) : bool { + return array_key_exists($type, $this->logTypes) && $this->logTypes[$type]; + }); + + if($logTypes){ + // get log entry data + $logData = $this->getLogData($logTypes, $remoteAddress, $resourceId, $action, $message); + + if(self::DEFAULT_LOG_TO_STDOUT){ + $this->echoLog($logData); + } + + // add entry to local store and check size limit for store + $this->store[] = $logData; + $this->store = array_slice($this->store, self::DEFAULT_LOG_STORE_SIZE * -1); + } + } + } + + /** + * get log data as array for a custom log entry + * @param array $logTypes + * @param string|null $remoteAddress + * @param int|null $resourceId + * @param string $action + * @param string $message + * @return array + */ + private function getLogData(array $logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : array { + $file = null; + $lineNum = null; + $function = null; + + $traceIndex = 4; + $backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, $traceIndex); + if(count($backtrace) == $traceIndex){ + $caller = $backtrace[$traceIndex - 2]; + $callerOrig = $backtrace[$traceIndex - 1]; + + $file = substr($caller['file'], strlen(dirname(dirname(dirname($caller['file'])))) + 1); + $lineNum = $caller['line']; + $function = $callerOrig['function']; + } + + $microTime = microtime(true); + $logTime = self::getDateTimeFromMicrotime($microTime); + + return [ + 'store' => $this->name, + 'mTime' => $microTime, + 'mTimeFormat1' => $logTime->format('Y-m-d H:i:s.u'), + 'mTimeFormat2' => $logTime->format('H:i:s'), + 'logTypes' => $logTypes, + 'remoteAddress' => $remoteAddress, + 'resourceId' => $resourceId, + 'fileName' => $file, + 'lineNumber' => $lineNum, + 'function' => $function, + 'action' => $action, + 'message' => $message + ]; + } + + /** + * echo log data to stdout -> terminal + * @param array $logData + */ + private function echoLog(array $logData) : void { + if(!self::$colors){ + self::$colors = new ShellColors(); + } + + $data = [ + self::$colors->getColoredString($logData['mTimeFormat1'], 'dark_gray'), + self::$colors->getColoredString($logData['store'], $logData['store'] == 'webSock' ? 'brown' : 'cyan'), + $logData['remoteAddress'] . ($logData['resourceId'] ? ' #' . $logData['resourceId'] : ''), + self::$colors->getColoredString($logData['fileName'] . ' line ' . $logData['lineNumber'], 'dark_gray'), + self::$colors->getColoredString($logData['function'] . '()' . (($logData['function'] !== $logData['action']) ? ' [' . $logData['action'] . ']' : ''), 'dark_gray'), + implode((array)$logData['logTypes'], ','), + self::$colors->getColoredString($logData['message'], 'light_purple') + ]; + + echo implode(array_filter($data), ' | ') . PHP_EOL; + } + + /** + * @see https://stackoverflow.com/a/29598719/4329969 + * @param float $mTime + * @return \DateTime + */ + public static function getDateTimeFromMicrotime(float $mTime) : \DateTime { + return \DateTime::createFromFormat('U.u', number_format($mTime, 6, '.', '')); + } +} \ No newline at end of file diff --git a/app/Socket/AbstractSocket.php b/app/Socket/AbstractSocket.php new file mode 100644 index 0000000..5753c03 --- /dev/null +++ b/app/Socket/AbstractSocket.php @@ -0,0 +1,67 @@ + should be overwritten in child instances + * -> is used as "log store" name + */ + const COMPONENT_NAME = 'default'; + + /** + * global server loop + * @var EventLoop\LoopInterface + */ + protected $loop; + + /** + * @var MessageComponentInterface + */ + protected $handler; + + /** + * @var Store + */ + protected $logStore; + + /** + * AbstractSocket constructor. + * @param EventLoop\LoopInterface $loop + * @param MessageComponentInterface $handler + * @param Store $store + */ + public function __construct( + EventLoop\LoopInterface $loop, + MessageComponentInterface $handler, + Store $store + ){ + $this->loop = $loop; + $this->handler = $handler; + $this->logStore = $store; + + $this->log(['debug', 'info'], null, 'START', 'start Socket server…'); + } + + /** + * @param $logTypes + * @param Socket\ConnectionInterface|null $connection + * @param string $action + * @param string $message + */ + public function log($logTypes, ?Socket\ConnectionInterface $connection, string $action, string $message = '') : void { + if(!$this->logStore->isLocked()){ + $remoteAddress = $connection ? $connection->getRemoteAddress() : null; + $this->logStore->log($logTypes, $remoteAddress, null, $action, $message); + } + } + +} \ No newline at end of file diff --git a/app/Socket/TcpSocket.php b/app/Socket/TcpSocket.php index e989879..0af264c 100644 --- a/app/Socket/TcpSocket.php +++ b/app/Socket/TcpSocket.php @@ -9,6 +9,7 @@ namespace Exodus4D\Socket\Socket; +use Exodus4D\Socket\Log\Store; use React\EventLoop; use React\Socket; use React\Promise; @@ -16,7 +17,14 @@ use React\Stream; use Clue\React\NDJson; use Ratchet\MessageComponentInterface; -class TcpSocket { +class TcpSocket extends AbstractSocket{ + + /** + * unique name for this component + * -> should be overwritten in child instances + * -> is used as "log store" name + */ + const COMPONENT_NAME = 'tcpSock'; /** * error message for unknown acceptType @@ -77,12 +85,7 @@ class TcpSocket { /** * default for: add socket statistic to response payload */ - const DEFAULT_STATS = true; - - /** - * default for: echo debug messages - */ - const DEFAULT_DEBUG = false; + const DEFAULT_ADD_STATS = false; /** * max length for JSON data string @@ -90,17 +93,6 @@ class TcpSocket { */ const JSON_DECODE_MAX_LENGTH = 65536 * 4; - /** - * global server loop - * @var EventLoop\LoopInterface - */ - private $loop; - - /** - * @var MessageComponentInterface - */ - private $handler; - /** * @see TcpSocket::DEFAULT_ACCEPT_TYPE * @var string @@ -123,7 +115,7 @@ class TcpSocket { * @see TcpSocket::DEFAULT_STATS * @var bool */ - private $stats = self::DEFAULT_STATS; + private $addStats = self::DEFAULT_ADD_STATS; /** * storage for all active connections @@ -137,18 +129,19 @@ class TcpSocket { * -> represents number of active connected clients * @var int */ - private $maxConnections = 0; + private $maxConnections = 0; /** * timestamp on startup * @var int */ - private $startupTime = 0; + private $startupTime = 0; /** * TcpSocket constructor. * @param EventLoop\LoopInterface $loop * @param MessageComponentInterface $handler + * @param Store $store * @param string $acceptType * @param float $waitTimeout * @param bool $endWithResponse @@ -156,12 +149,13 @@ class TcpSocket { public function __construct( EventLoop\LoopInterface $loop, MessageComponentInterface $handler, + Store $store, string $acceptType = self::DEFAULT_ACCEPT_TYPE, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT, bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE ){ - $this->loop = $loop; - $this->handler = $handler; + parent::__construct($loop, $handler, $store); + $this->acceptType = $acceptType; $this->waitTimeout = $waitTimeout; $this->endWithResponse = $endWithResponse; @@ -173,7 +167,7 @@ class TcpSocket { * @param Socket\ConnectionInterface $connection */ public function onConnect(Socket\ConnectionInterface $connection){ - $this->debug($connection, __FUNCTION__, '----------------------------------------'); + $this->log('debug', $connection, __FUNCTION__, 'open connection…'); if($this->isValidConnection($connection)){ // connection can be used @@ -182,32 +176,30 @@ class TcpSocket { // set waitTimeout timer for connection $this->setTimerTimeout($connection, $this->waitTimeout); - $this->debug($connection, __FUNCTION__); - // register connection events ... ------------------------------------------------------------------------- $this->initRead($connection) - ->then($this->initDispatch()) + ->then($this->initDispatch($connection)) ->then($this->initResponse($connection)) ->then( - function(string $message) use ($connection) { - $this->debug($connection, 'DONE', $message); + function(array $payload) use ($connection) { + $this->log(['debug', 'info'], $connection,'DONE', 'task "' . $payload['task'] . '" done → response send'); }, function(\Exception $e) use ($connection) { - $this->debug($connection, 'ERROR', $e->getMessage()); + $this->log(['debug', 'error'], $connection, 'ERROR', $e->getMessage()); $this->connectionError($connection, $e); }); $connection->on('end', function() use ($connection) { - $this->debug($connection, 'onEnd'); + $this->log('debug', $connection, 'onEnd'); }); - $connection->on('close', function() use ($connection){ - $this->debug($connection, 'onClose'); + $connection->on('close', function() use ($connection) { + $this->log(['debug'], $connection, 'onClose', 'close connection'); $this->removeConnection($connection); }); $connection->on('error', function(\Exception $e) use ($connection) { - $this->debug($connection, 'onError', $e->getMessage()); + $this->log(['debug', 'error'], $connection, 'onError', $e->getMessage()); }); }else{ // invalid connection -> can not be used @@ -254,15 +246,16 @@ class TcpSocket { /** * init dispatcher for payload + * @param Socket\ConnectionInterface $connection * @return callable */ - protected function initDispatch() : callable { - return function($payload) : Promise\PromiseInterface { + protected function initDispatch(Socket\ConnectionInterface $connection) : callable { + return function(array $payload) use ($connection) : Promise\PromiseInterface { $task = (string)$payload['task']; if(!empty($task)){ $load = $payload['load']; $deferred = new Promise\Deferred(); - $this->dispatch($deferred, $task, $load); + $this->dispatch($connection, $deferred, $task, $load); return $deferred->promise(); }else{ return new Promise\RejectedPromise( @@ -273,17 +266,21 @@ class TcpSocket { } /** + * @param Socket\ConnectionInterface $connection * @param Promise\Deferred $deferred * @param string $task * @param null $load */ - protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void { + protected function dispatch(Socket\ConnectionInterface $connection, Promise\Deferred $deferred, string $task, $load = null) : void { + $addStatusData = false; switch($task){ case 'getStats': - $deferred->resolve($this->newPayload($task, null)); + $addStatusData = true; + $deferred->resolve($this->newPayload($task, null, $addStatusData)); break; case 'healthCheck': + $addStatusData = true; case 'characterUpdate': case 'characterLogout': case 'mapConnectionAccess': @@ -292,10 +289,13 @@ class TcpSocket { case 'mapDeleted': case 'logData': if(method_exists($this->handler, 'receiveData')){ + $this->log(['info'], $connection, __FUNCTION__, 'task "' . $task . '" processing…'); + $deferred->resolve( $this->newPayload( $task, - call_user_func_array([$this->handler, 'receiveData'], [$task, $load]) + call_user_func_array([$this->handler, 'receiveData'], [$task, $load]), + $addStatusData ) ); }else{ @@ -313,7 +313,7 @@ class TcpSocket { */ protected function initResponse(Socket\ConnectionInterface $connection) : callable { return function(array $payload) use ($connection) : Promise\PromiseInterface { - $this->debug($connection, 'initResponse'); + $this->log('debug', $connection, 'initResponse', 'task "' . $payload['task'] . '" → init response'); $deferred = new Promise\Deferred(); $this->write($deferred, $connection, $payload); @@ -344,7 +344,7 @@ class TcpSocket { } if($write){ - $deferred->resolve('OK'); + $deferred->resolve($payload); }else{ $deferred->reject(new \Exception( sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress()) @@ -360,7 +360,8 @@ class TcpSocket { * @param \Exception $e */ protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){ - $this->debug($connection, __FUNCTION__, $e->getMessage()); + $errorMessage = $e->getMessage(); + $this->log(['debug', 'error'], $connection, __FUNCTION__, $errorMessage); if($connection->isWritable()){ if('json' == $this->acceptType){ @@ -368,7 +369,7 @@ class TcpSocket { } // send "end" data, then close - $connection->end($this->newPayload('error', $e->getMessage())); + $connection->end($this->newPayload('error', $errorMessage, true)); }else{ // close connection $connection->close(); @@ -475,7 +476,9 @@ class TcpSocket { // update maxConnections count $this->maxConnections = max($this->connections->count(), $this->maxConnections); - $this->debug($connection, __FUNCTION__); + $this->log(['debug'], $connection, __FUNCTION__, 'add new connection'); + }else{ + $this->log(['debug'], $connection, __FUNCTION__, 'connection already exists'); } } @@ -485,7 +488,7 @@ class TcpSocket { */ protected function removeConnection(Socket\ConnectionInterface $connection){ if($this->hasConnection($connection)){ - $this->debug($connection, __FUNCTION__); + $this->log(['debug'], $connection, __FUNCTION__, 'remove connection'); $this->cancelTimers($connection); $this->connections->detach($connection); } @@ -495,15 +498,16 @@ class TcpSocket { * get new payload * @param string $task * @param null $load + * @param bool $addStats * @return array */ - protected function newPayload(string $task, $load = null) : array { + protected function newPayload(string $task, $load = null, bool $addStats = false) : array { $payload = [ 'task' => $task, 'load' => $load ]; - if($this->stats){ + if($addStats || $this->addStats){ // add socket statistics $payload['stats'] = $this->getStats(); } @@ -527,34 +531,21 @@ class TcpSocket { */ protected function getStats() : array { return [ - 'startup' => time() - $this->startupTime, - 'connections' => $this->connections->count(), - 'maxConnections' => $this->maxConnections + 'tcpSocket' => $this->getSocketStats(), + 'webSocket' => $this->handler->getSocketStats() ]; } /** - * echo debug messages - * @param Socket\ConnectionInterface $connection - * @param string $method - * @param string $message + * get TcpSocket stats data + * @return array */ - protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){ - if(self::DEFAULT_DEBUG){ - $backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 3); - $caller = array_shift($backtrace); - $callerOrig = array_shift($backtrace); - - $data = [ - date('Y-m-d H:i:s'), - 'DEBUG', - $connection->getRemoteAddress(), - $caller['file'] . ' line ' . $caller['line'], - $callerOrig['function'] . '()' . (($callerOrig['function'] !== $method) ? ' [' . $method . '()]' : ''), - $message - ]; - - echo implode(array_filter($data), ' | ') . PHP_EOL; - } + protected function getSocketStats() : array { + return [ + 'startup' => time() - $this->startupTime, + 'connections' => $this->connections->count(), + 'maxConnections' => $this->maxConnections, + 'logs' => array_reverse($this->logStore->getStore()) + ]; } } \ No newline at end of file diff --git a/app/WebSockets.php b/app/WebSockets.php index d5b5e0b..acc9730 100644 --- a/app/WebSockets.php +++ b/app/WebSockets.php @@ -9,6 +9,7 @@ namespace Exodus4D\Socket; +use Exodus4D\Socket\Log\Store; use Exodus4D\Socket\Socket\TcpSocket; use React\EventLoop; use React\Socket; @@ -33,16 +34,23 @@ class WebSockets { */ protected $wsListenHost; + /** + * @var int + */ + protected $debug; + /** * WebSockets constructor. * @param string $dsn * @param int $wsListenPort * @param string $wsListenHost + * @param int $debug */ - function __construct(string $dsn, int $wsListenPort, string $wsListenHost){ + function __construct(string $dsn, int $wsListenPort, string $wsListenHost, int $debug = 1){ $this->dsn = $dsn; $this->wsListenPort = $wsListenPort; $this->wsListenHost = $wsListenHost; + $this->debug = $debug; $this->startMapSocket(); } @@ -51,11 +59,22 @@ class WebSockets { // global EventLoop $loop = EventLoop\Factory::create(); - // global MessageComponent (main app) (handles all business logic) - $mapUpdate = new Main\MapUpdate(); + // new Stores for logging ------------------------------------------------------------------------------------- + $webSocketLogStore = new Store(Component\MapUpdate::COMPONENT_NAME); + $webSocketLogStore->setLogLevel($this->debug); + + $tcpSocketLogStore = new Store(TcpSocket::COMPONENT_NAME); + $tcpSocketLogStore->setLogLevel($this->debug); + + // global MessageComponent (main app) (handles all business logic) -------------------------------------------- + $mapUpdate = new Component\MapUpdate($webSocketLogStore); + + $loop->addPeriodicTimer(3, function(EventLoop\TimerInterface $timer) use ($mapUpdate) { + $mapUpdate->housekeeping($timer); + }); // TCP Socket ------------------------------------------------------------------------------------------------- - $tcpSocket = new TcpSocket($loop, $mapUpdate); + $tcpSocket = new TcpSocket($loop, $mapUpdate, $tcpSocketLogStore); // TCP Server (WebServer <-> TCPServer <-> TCPSocket communication) $server = new Socket\Server($this->dsn, $loop, [ 'tcp' => [ @@ -64,12 +83,12 @@ class WebSockets { ] ]); - $server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){ + $server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket) { $tcpSocket->onConnect($connection); }); - $server->on('error', function(\Exception $e){ - echo 'error: ' . $e->getMessage() . PHP_EOL; + $server->on('error', function(\Exception $e) use ($tcpSocket) { + $tcpSocket->log(['debug', 'error'], null, 'onError', $e->getMessage()); }); // WebSocketServer -------------------------------------------------------------------------------------------- diff --git a/cmd.php b/cmd.php index 04a282b..5bf6508 100644 --- a/cmd.php +++ b/cmd.php @@ -1,33 +1,73 @@ default values + // The default values should be fine for 99% of you! + $longOpts = [ + 'wsHost:' => '0.0.0.0', // WebSocket connection (for WebClients => Browser). '0.0.0.0' <-- any client can connect! + 'wsPort:' => 8020, // ↪ default WebSocket URI: 127.0.0.1:8020. This is where Nginx must proxy WebSocket traffic to + 'tcpHost:' => '127.0.0.1', // TcpSocket connection (for WebServer ⇄ WebSocket) + 'tcpPort:' => 5555, // ↪ default TcpSocket URI: tcp://127.0.0.1:5555 + 'debug:' => 2 // Debug level [0-3] 0 = silent, 1 = errors, 2 = error + info, 3 = error + info + debug + ]; + + // get options from CLI parameter + default values + $cliOpts = getopt('', array_keys($longOpts)); + + $options = []; + array_walk($longOpts, function($defaultVal, $optKey) use ($cliOpts, &$options) { + $key = trim($optKey, ':'); + $val = $defaultVal; + if(array_key_exists($key, $cliOpts)){ + $val = is_int($defaultVal) ? (int)$cliOpts[$key] : $cliOpts[$key] ; + } + $options[$key] = $val; + }); /** - * WebSocket connection (for WebClients => Browser) - * default WebSocket URI: ws://127.0.0.1:8020 - * - * pf_client_ip '0.0.0.0' <-- any client can connect - * pf_ws_port 8020 <-- any client can connect + * print current config parameters to Shell + * @param array $longOpts + * @param array $options */ - $wsListenHost = (!empty($options['pf_listen_host'])) ? $options['pf_listen_host'] : '0.0.0.0' ; - $wsListenPort = (!empty($options['pf_listen_port'])) ? (int)$options['pf_listen_port'] : 8020 ; + $showHelp = function(array $longOpts, array $options){ + $optKeys = array_keys($longOpts); + $colors = new Socket\Log\ShellColors(); + $data = []; - $host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ; - $port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ; + // headline for CLI config parameters + $rowData = $colors->getColoredString(str_pad(' param', 12), 'white'); + $rowData .= $colors->getColoredString(str_pad('value', 18, ' ', STR_PAD_LEFT), 'white'); + $rowData .= $colors->getColoredString(str_pad('default', 15, ' ', STR_PAD_LEFT), 'white'); - $dsn = 'tcp://' . $host . ':' . $port; + $data[] = $rowData; + $data[] = str_pad(' ', 45, '-'); + + $i = 0; + foreach($options as $optKey => $optVal){ + $rowData = $colors->getColoredString(str_pad(' -' . $optKey, 12), 'yellow'); + $rowData .= $colors->getColoredString(str_pad($optVal, 18, ' ', STR_PAD_LEFT), 'light_purple'); + $rowData .= $colors->getColoredString(str_pad($longOpts[$optKeys[$i]], 15, ' ', STR_PAD_LEFT), 'dark_gray'); + $data[] = $rowData; + $i++; + } + $data[] = ''; + + echo implode($data, PHP_EOL) . PHP_EOL; + }; + + if($options['debug']){ + // print if -debug > 0 + $showHelp($longOpts, $options); + } + + $dsn = 'tcp://' . $options['tcpHost'] . ':' . $options['tcpPort']; + + new Socket\WebSockets($dsn, $options['wsPort'], $options['wsHost'], $options['debug']); - new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost); }else{ echo "Script need to be called by CLI!"; }