- improved logging

- fixed multiple problem with map sync
- added some "garbage collection" for unsubscribed connection data
- updated README.md
This commit is contained in:
Mark Friedrich
2019-07-01 19:59:52 +02:00
parent a25cf73daa
commit cbca53ad73
12 changed files with 1285 additions and 283 deletions

View File

@@ -1,6 +1,7 @@
## WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder)
### Requirements
- _PHP_ **(≥ v7.1)**
- A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **(≥ v1.2.0)**
- [_Composer_](https://getcomposer.org/download/) to install packages for the WebSocket server
@@ -19,20 +20,49 @@
**Clients (WebBrowser) listen for connections**
- Host: `0.0.0.0.` (=> any client can connect)
- Port: `8020`
- URI: `127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source)
- URI: `127.0.0.1:8020`
**TCP TcpSocket connection (Internal use for WebServer ⇄ WebSocket communication)**
- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine)
(=> Your WebServer (e.g. Nginx) should proxy all WebSocket connections to this source)
**TCP TcpSocket connection (Internal use for WebServer ⇄ WebSocket server communication)**
- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket server running on the same machine)
- Port: `5555`
- URI: `tcp://127.0.0.1:5555`
- URI: `tcp://127.0.0.1:5555`
(=> Where _Pathfinder_ reaches the WebSocket server. This must match `SOCKET_HOST`, `SOCKET_PORT` options in `environment.ini`)
#### Custom [Optional]
#### Start parameters [Optional]
The default configuration should be fine for most installations.
You can change/overwrite the default **Host** and **Port** configuration by adding additional CLI parameters when starting the WebSocket server:
`$ php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]`
`$ php cmd.php --wsHost [CLIENTS_HOST] --wsPort [CLIENTS_PORT] --tcpHost [TCP_HOST] --tcpPort [TCP_PORT] --debug 0`
For example: If you want to change the the WebSocket port and increase debug output:
`$ php cmd.php --wsPort 8030 --debug 3`
##### --debug (default `--debug 2`)
Allows you to set log output level from `0` (silent) - errors are not logged, to `3` (debug) for detailed logging.
![alt text](https://i.imgur.com/KfNF4lk.png)
### WebSocket UI
There is a WebSocket section on _Pathinders_ `/setup` page. After the WebSocket server is started, you should check it if everything works.
You see the most recent WebSocket log entries, the current connection state, the current number of active connections and all maps that have subscriptions
![alt text](https://i.imgur.com/dDUrnx2.png)
Log entry view. Depending on the `--debug` parameter, the most recent (max 50) entries will be shown:
![alt text](https://i.imgur.com/LIn9aNm.png)
Subscriptions for each map:
![alt text](https://i.imgur.com/fANYwho.gif)
### Unix Service (systemd)
#### New Service
@@ -97,4 +127,5 @@ ExecStart = /usr/bin/systemctl try-restart websocket.pathfinder.service
```
### Info
- [*Ratchet*](http://socketo.me/) - "WebSockets for PHP"
- [*Ratchet*](http://socketo.me) - "WebSockets for PHP"
- [*ReactPHP*](https://reactphp.org) - "Event-driven, non-blocking I/O with PHP"

View File

@@ -0,0 +1,269 @@
<?php
namespace Exodus4D\Socket\Component;
use Exodus4D\Socket\Data\Payload;
use Exodus4D\Socket\Log\Store;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
use React\EventLoop\TimerInterface;
abstract class AbstractMessageComponent implements MessageComponentInterface {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'default';
/**
* log message server start
*/
const LOG_TEXT_SERVER_START = 'start WebSocket server…';
/**
* store for logs
* @var Store
*/
protected $logStore;
/**
* stores all active connections
* -> regardless of its subscription state
* [
* '$conn1->resourceId' => [
* 'connection' => $conn1,
* 'data' => null
* ],
* '$conn2->resourceId' => [
* 'connection' => $conn2,
* 'data' => null
* ]
* ]
* @var array
*/
private $connections;
/**
* max count of concurrent open connections
* @var int
*/
private $maxConnections = 0;
/**
* AbstractMessageComponent constructor.
* @param Store $store
*/
public function __construct(Store $store){
$this->connections = [];
$this->logStore = $store;
$this->log(['debug', 'info'], null, 'START', static::LOG_TEXT_SERVER_START);
}
// Connection callbacks from MessageComponentInterface ============================================================
/**
* new client connection onOpen
* @param ConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn){
$this->log(['debug'], $conn, __FUNCTION__, 'open connection');
$this->addConnection($conn);
}
/**
* client connection onClose
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn){
$this->log(['debug'], $conn, __FUNCTION__, 'close connection');
$this->removeConnection($conn);
}
/**
* client connection onError
* @param ConnectionInterface $conn
* @param \Exception $e
*/
public function onError(ConnectionInterface $conn, \Exception $e){
$this->log(['debug', 'error'], $conn, __FUNCTION__, $e->getMessage());
}
/**
* new message received from client connection
* @param ConnectionInterface $conn
* @param string $msg
*/
public function onMessage(ConnectionInterface $conn, $msg){
// parse message into payload object
$payload = $this->getPayloadFromMessage($msg);
if($payload){
$this->dispatchWebSocketPayload($conn, $payload);
}
}
// Connection handling ============================================================================================
/**
* add connection
* @param ConnectionInterface $conn
*/
private function addConnection(ConnectionInterface $conn) : void {
$this->connections[$conn->resourceId] = [
'connection' => $conn,
];
$this->maxConnections = max(count($this->connections), $this->maxConnections);
}
/**
* remove connection
* @param ConnectionInterface $conn
*/
private function removeConnection(ConnectionInterface $conn) : void {
if($this->hasConnection($conn)){
unset($this->connections[$conn->resourceId]);
}
}
/**
* @param ConnectionInterface $conn
* @return bool
*/
protected function hasConnection(ConnectionInterface $conn) : bool {
return isset($this->connections[$conn->resourceId]);
}
/**
* @param int $resourceId
* @return bool
*/
protected function hasConnectionId(int $resourceId) : bool {
return isset($this->connections[$resourceId]);
}
/**
* @param int $resourceId
* @return ConnectionInterface|null
*/
protected function getConnection(int $resourceId) : ?ConnectionInterface {
return $this->hasConnectionId($resourceId) ? $this->connections[$resourceId]['connection'] : null;
}
/**
* update meta data for $conn
* @param ConnectionInterface $conn
*/
protected function updateConnection(ConnectionInterface $conn){
if($this->hasConnection($conn)){
$meta = [
'mTimeSend' => microtime(true)
];
$this->connections[$conn->resourceId]['data'] = array_merge($this->getConnectionData($conn), $meta);
}
}
/**
* get meta data from $conn
* @param ConnectionInterface $conn
* @return array
*/
protected function getConnectionData(ConnectionInterface $conn) : array {
$meta = [];
if($this->hasConnection($conn)){
$meta = (array)$this->connections[$conn->resourceId]['data'];
}
return $meta;
}
/**
* wrapper for ConnectionInterface->send()
* -> this stores some meta data to the $conn
* @param ConnectionInterface $conn
* @param $data
*/
protected function send(ConnectionInterface $conn, $data){
$conn->send($data);
$this->updateConnection($conn);
}
/**
* @param ConnectionInterface $conn
* @param Payload $payload
*/
abstract protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void;
/**
* get Payload class from client message
* @param mixed $msg
* @return Payload|null
*/
protected function getPayloadFromMessage($msg) : ?Payload {
$payload = null;
$msg = (array)json_decode($msg, true);
if(isset($msg['task'], $msg['load'])){
$payload = $this->newPayload((string)$msg['task'], $msg['load']);
}
return $payload;
}
/**
* @param string $task
* @param null $load
* @param array|null $characterIds
* @return Payload|null
*/
protected function newPayload(string $task, $load = null, ?array $characterIds = null) : ?Payload {
$payload = null;
try{
$payload = new Payload($task, $load, $characterIds);
}catch(\Exception $e){
$this->log(['debug', 'error'], null, __FUNCTION__, $e->getMessage());
}
return $payload;
}
/**
* get WebSocket stats data
* @return array
*/
public function getSocketStats() : array {
return [
'connections' => count($this->connections),
'maxConnections' => $this->maxConnections,
'logs' => array_reverse($this->logStore->getStore())
];
}
/**
* @param $logTypes
* @param ConnectionInterface|null $connection
* @param string $action
* @param string $message
*/
protected function log($logTypes, ?ConnectionInterface $connection, string $action, string $message = '') : void {
if($this->logStore){
$remoteAddress = $connection ? $connection->remoteAddress : null;
$resourceId = $connection ? $connection->resourceId : null;
$this->logStore->log($logTypes, $remoteAddress, $resourceId, $action, $message);
}
}
/**
*
* @param TimerInterface $timer
*/
public function housekeeping(TimerInterface $timer) : void {
}
}

View File

@@ -6,7 +6,7 @@
* Time: 13:09
*/
namespace Exodus4D\Socket\Main\Formatter;
namespace Exodus4D\Socket\Component\Formatter;
class SubscriptionFormatter{

View File

@@ -6,7 +6,7 @@
* Time: 17:02
*/
namespace Exodus4D\Socket\Main\Handler;
namespace Exodus4D\Socket\Component\Handler;
class LogFileHandler {

View File

@@ -6,19 +6,67 @@
* Time: 22:29
*/
namespace Exodus4D\Socket\Main;
namespace Exodus4D\Socket\Component;
use Exodus4D\Socket\Main\Handler\LogFileHandler;
use Exodus4D\Socket\Main\Formatter\SubscriptionFormatter;
use Ratchet\MessageComponentInterface;
use Exodus4D\Socket\Component\Handler\LogFileHandler;
use Exodus4D\Socket\Component\Formatter\SubscriptionFormatter;
use Exodus4D\Socket\Data\Payload;
use Exodus4D\Socket\Log\Store;
use Ratchet\ConnectionInterface;
class MapUpdate implements MessageComponentInterface {
class MapUpdate extends AbstractMessageComponent {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'webSock';
/**
* log message unknown task name
*/
const LOG_TEXT_TASK_UNKNOWN = 'unknown task: %s';
/**
* log message for denied subscription attempt. -> character data unknown
*/
const LOG_TEXT_SUBSCRIBE_DENY = 'sub. denied for charId: %d';
/**
* log message for invalid subscription data
*/
const LOG_TEXT_SUBSCRIBE_INVALID = 'sub. data invalid';
/**
* log message for subscribe characterId
*/
const LOG_TEXT_SUBSCRIBE = 'sub. charId: %s to mapIds: [%s]';
/**
* log message unsubscribe characterId
*/
const LOG_TEXT_UNSUBSCRIBE = 'unsub. charId: %d from mapIds: [%s]';
/**
* log message for map data updated broadcast
*/
const LOG_TEXT_MAP_UPDATE = 'update map data, mapId: %d → broadcast to %d connections';
/**
* log message for map subscriptions data updated broadcast
*/
const LOG_TEXT_MAP_SUBSCRIPTIONS = 'update map subscriptions data, mapId: %d. → broadcast to %d connections';
/**
* log message for delete mapId broadcast
*/
const LOG_TEXT_MAP_DELETE = 'delete mapId: $d → broadcast to %d connections';
/**
* timestamp (ms) from last healthCheck ping
* -> timestamp received from remote TCP socket
* @var
* @var int|null
*/
protected $healthCheckToken;
@@ -48,7 +96,7 @@ class MapUpdate implements MessageComponentInterface {
* [
* 'token' => $characterToken3,
* 'expire' => $expireTime3,
* 'characterData' => $characterData2
* 'characterData' => $characterData2
* ]
* ]
* ]
@@ -108,86 +156,127 @@ class MapUpdate implements MessageComponentInterface {
protected $characterData;
/**
* enable debug output
* -> check debug() for more information
* @var bool
* MapUpdate constructor.
* @param Store $store
*/
protected $debug = false;
public function __construct(Store $store){
parent::__construct($store);
public function __construct() {
$this->characterAccessData = [];
$this->mapAccessData = [];
$this->characters = [];
$this->subscriptions = [];
$this->characterData = [];
$this->log('Server START ------------------------------------------');
$this->characterAccessData = [];
$this->mapAccessData = [];
$this->characters = [];
$this->subscriptions = [];
$this->characterData = [];
}
public function onOpen(ConnectionInterface $conn) {
$this->log('NEW connection. ID (' . $conn->resourceId .') ');
}
public function onMessage(ConnectionInterface $conn, $msg) {
$msg = json_decode($msg, true);
if(
isset($msg['task']) &&
isset($msg['load'])
){
$task = $msg['task'];
$load = $msg['load'];
switch($task){
case 'subscribe':
$this->subscribe($conn, $load);
break;
case 'healthCheck':
$this->validateHealthCheck($conn, $load);
break;
default:
break;
}
}
/**
* new client connection
* @param ConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn){
parent::onOpen($conn);
}
/**
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn) {
$this->unSubscribeConnection($conn);
public function onClose(ConnectionInterface $conn){
parent::onClose($conn);
$this->log('DISCONNECTED connection. ID (' . $conn->resourceId .') ');
$this->unSubscribeConnection($conn);
}
/**
* @param ConnectionInterface $conn
* @param \Exception $e
*/
public function onError(ConnectionInterface $conn, \Exception $e) {
$this->unSubscribeConnection($conn);
$this->log('ERROR "' . $e->getMessage() . '" ID (' . $conn->resourceId .') ');
public function onError(ConnectionInterface $conn, \Exception $e){
parent::onError($conn, $e);
// close connection should trigger the onClose() callback for unSubscribe
$conn->close();
}
/**
* Check token (timestamp from initial TCP healthCheck poke against token send from client
* @param ConnectionInterface $conn
* @param $token
* @param string $msg
*/
private function validateHealthCheck($conn, $token){
$isValid = 0;
public function onMessage(ConnectionInterface $conn, $msg){
parent::onMessage($conn, $msg);
}
if(
$token && $this->healthCheckToken &&
(int)$token === (int)$this->healthCheckToken
){
$isValid = 1;
/**
* @param ConnectionInterface $conn
* @param Payload $payload
*/
protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void {
switch($payload->task){
case 'healthCheck':
$this->broadcastHealthCheck($conn, $payload);
break;
case 'subscribe':
$this->subscribe($conn, (array)$payload->load);
break;
case 'unsubscribe':
// make sure characterIds got from client are valid
// -> intersect with subscribed characterIds for current $conn
$characterIds = array_intersect((array)$payload->load, $this->getCharacterIdsByConnection($conn));
if(!empty($characterIds)){
$this->unSubscribeCharacterIds($characterIds, $conn);
}
break;
default:
$this->log(['debug', 'error'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_TASK_UNKNOWN, $payload->task));
break;
}
}
/**
* checks healthCheck $token and respond with validation status + subscription stats
* @param ConnectionInterface $conn
* @param Payload $payload
*/
private function broadcastHealthCheck(ConnectionInterface $conn, Payload $payload) : void {
$isValid = $this->validateHealthCheckToken((int)$payload->load);
$load = [
'isValid' => $isValid,
];
// Make sure WebSocket client request is valid
if($isValid){
// set new healthCheckToken for next check
$load['token'] = $this->setHealthCheckToken(microtime(true));
// add subscription stats if $token is valid
$load['subStats'] = $this->getSubscriptionStats();
}
$payload->setLoad($load);
$connections = new \SplObjectStorage();
$connections->attach($conn);
$this->broadcast($connections, $payload);
}
/**
* compare token (timestamp from initial TCP healthCheck message) with token send from WebSocket
* @param int $token
* @return bool
*/
private function validateHealthCheckToken(int $token) : bool {
$isValid = false;
if($token && $this->healthCheckToken && $token === (int)$this->healthCheckToken){
$isValid = true;
}
$conn->send( json_encode($isValid) );
// reset token
$this->healthCheckToken = null;
return $isValid;
}
/**
@@ -195,9 +284,9 @@ class MapUpdate implements MessageComponentInterface {
* @param ConnectionInterface $conn
* @param $subscribeData
*/
private function subscribe(ConnectionInterface $conn, $subscribeData){
private function subscribe(ConnectionInterface $conn, array $subscribeData) : void {
$characterId = (int)$subscribeData['id'];
$characterToken = $subscribeData['token'];
$characterToken = (string)$subscribeData['token'];
if($characterId && $characterToken){
// check if character access token is valid (exists and not expired in $this->characterAccessData)
@@ -213,21 +302,33 @@ class MapUpdate implements MessageComponentInterface {
$changedSubscriptionsMapIds = [];
foreach((array)$subscribeData['mapData'] as $data){
$mapId = (int)$data['id'];
$mapToken = $data['token'];
$mapToken = (string)$data['token'];
$mapName = (string)$data['name'];
if($mapId && $mapToken){
// check if token is valid (exists and not expired) in $this->mapAccessData
if( $this->checkMapAccess($characterId, $mapId, $mapToken) ){
if($this->checkMapAccess($characterId, $mapId, $mapToken)){
// valid map subscribe request
$this->subscriptions[$mapId][$characterId] = $characterId;
$this->subscriptions[$mapId]['characterIds'][$characterId] = $characterId;
$this->subscriptions[$mapId]['data']['name'] = $mapName;
$changedSubscriptionsMapIds[] = $mapId;
}
}
}
sort($changedSubscriptionsMapIds, SORT_NUMERIC);
$this->log(['debug', 'info'], $conn, __FUNCTION__,
sprintf(static::LOG_TEXT_SUBSCRIBE, $characterId, implode($changedSubscriptionsMapIds, ','))
);
// broadcast all active subscriptions to subscribed connections -------------------------------------------
$this->broadcastMapSubscriptions('mapSubscriptions', $changedSubscriptionsMapIds);
$this->broadcastMapSubscriptions($changedSubscriptionsMapIds);
}else{
$this->log(['debug', 'info'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_SUBSCRIBE_DENY, $characterId));
}
}else{
$this->log(['debug', 'error'], $conn, __FUNCTION__, static::LOG_TEXT_SUBSCRIBE_INVALID);
}
}
@@ -244,10 +345,10 @@ class MapUpdate implements MessageComponentInterface {
* unSubscribe a $characterId from ALL maps
* -> if $conn is set -> just unSub the $characterId from this $conn
* @param int $characterId
* @param null $conn
* @param ConnectionInterface|null $conn
* @return bool
*/
private function unSubscribeCharacterId($characterId, $conn = null){
private function unSubscribeCharacterId(int $characterId, ?ConnectionInterface $conn = null) : bool {
if($characterId){
// unSub from $this->characters ---------------------------------------------------------------------------
if($conn){
@@ -258,7 +359,7 @@ class MapUpdate implements MessageComponentInterface {
// no connection left for this character
unset($this->characters[$characterId]);
}
// TODO unset $this->>$characterData if $characterid does not have any other map subscribed to
// TODO unset $this->>$characterData if $characterId does not have any other map subscribed to
}else{
// unSub ALL connections from a character (e.g. multiple browsers)
unset($this->characters[$characterId]);
@@ -269,11 +370,11 @@ class MapUpdate implements MessageComponentInterface {
// unSub from $this->subscriptions ------------------------------------------------------------------------
$changedSubscriptionsMapIds = [];
foreach($this->subscriptions as $mapId => $characterIds){
if(array_key_exists($characterId, $characterIds)){
unset($this->subscriptions[$mapId][$characterId]);
foreach($this->subscriptions as $mapId => $subData){
if(array_key_exists($characterId, (array)$subData['characterIds'])){
unset($this->subscriptions[$mapId]['characterIds'][$characterId]);
if( !count($this->subscriptions[$mapId]) ){
if( !count($this->subscriptions[$mapId]['characterIds']) ){
// no characters left on this map
unset($this->subscriptions[$mapId]);
}
@@ -282,8 +383,14 @@ class MapUpdate implements MessageComponentInterface {
}
}
sort($changedSubscriptionsMapIds, SORT_NUMERIC);
$this->log(['debug', 'info'], $conn, __FUNCTION__,
sprintf(static::LOG_TEXT_UNSUBSCRIBE, $characterId, implode($changedSubscriptionsMapIds, ','))
);
// broadcast all active subscriptions to subscribed connections -------------------------------------------
$this->broadcastMapSubscriptions('mapSubscriptions', $changedSubscriptionsMapIds);
$this->broadcastMapSubscriptions($changedSubscriptionsMapIds);
}
return true;
@@ -292,11 +399,11 @@ class MapUpdate implements MessageComponentInterface {
/**
* unSubscribe $characterIds from ALL maps
* -> if $conn is set -> just unSub the $characterId from this $conn
* @param array $characterIds
* @param null $conn
* @param int[] $characterIds
* @param ConnectionInterface|null $conn
* @return bool
*/
private function unSubscribeCharacterIds(array $characterIds, $conn = null): bool{
private function unSubscribeCharacterIds(array $characterIds, ?ConnectionInterface $conn = null) : bool {
$response = false;
foreach($characterIds as $characterId){
$response = $this->unSubscribeCharacterId($characterId, $conn);
@@ -310,26 +417,30 @@ class MapUpdate implements MessageComponentInterface {
* @param int $mapId
* @return int
*/
private function deleteMapId(string $task, $mapId){
$connectionCount = $this->broadcastMapData($task, $mapId, $mapId);
private function deleteMapId(string $task, int $mapId) : int {
$connectionCount = $this->broadcastMapData($task, $mapId, $mapId);
// remove map from subscriptions
if( isset($this->subscriptions[$mapId]) ){
if(isset($this->subscriptions[$mapId])){
unset($this->subscriptions[$mapId]);
}
$this->log(['debug', 'info'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_DELETE, $mapId, $connectionCount)
);
return $connectionCount;
}
/**
* get all mapIds a characterId has subscribed to
* @param int $characterId
* @return array
* @return int[]
*/
private function getMapIdsByCharacterId(int $characterId) : array {
$mapIds = [];
foreach($this->subscriptions as $mapId => $characterIds){
if(array_key_exists($characterId, $characterIds)){
foreach($this->subscriptions as $mapId => $subData) {
if(array_key_exists($characterId, (array)$subData['characterIds'])){
$mapIds[] = $mapId;
}
}
@@ -340,7 +451,7 @@ class MapUpdate implements MessageComponentInterface {
* @param ConnectionInterface $conn
* @return int[]
*/
private function getCharacterIdsByConnection(ConnectionInterface $conn){
private function getCharacterIdsByConnection(ConnectionInterface $conn) : array {
$characterIds = [];
$resourceId = $conn->resourceId;
@@ -363,31 +474,43 @@ class MapUpdate implements MessageComponentInterface {
$characterIds = [];
if(
array_key_exists($mapId, $this->subscriptions) &&
is_array($this->subscriptions[$mapId])
is_array($this->subscriptions[$mapId]['characterIds'])
){
$characterIds = array_keys($this->subscriptions[$mapId]);
$characterIds = array_keys($this->subscriptions[$mapId]['characterIds']);
}
return $characterIds;
}
/**
* get connection objects by characterIds
* get connections by $characterIds
* @param int[] $characterIds
* @return \SplObjectStorage
*/
private function getConnectionsByCharacterIds($characterIds){
private function getConnectionsByCharacterIds(array $characterIds) : \SplObjectStorage {
$connections = new \SplObjectStorage;
foreach($characterIds as $characterId){
if($charConnections = (array)$this->characters[$characterId] ){
foreach($charConnections as $conn){
if( !$connections->contains($conn) ){
$connections->attach($conn);
}
$connections->addAll($this->getConnectionsByCharacterId($characterId));
}
return $connections;
}
/**
* get connections by $characterId
* @param int $characterId
* @return \SplObjectStorage
*/
private function getConnectionsByCharacterId(int $characterId) : \SplObjectStorage {
$connections = new \SplObjectStorage;
if(isset($this->characters[$characterId])){
foreach(array_keys($this->characters[$characterId]) as $resourceId){
if(
$this->hasConnectionId($resourceId) &&
!$connections->contains($conn = $this->getConnection($resourceId))
){
$connections->attach($conn);
}
}
}
return $connections;
}
@@ -397,12 +520,13 @@ class MapUpdate implements MessageComponentInterface {
* @param $characterToken
* @return array
*/
private function checkCharacterAccess($characterId, $characterToken) : array {
private function checkCharacterAccess(int $characterId, string $characterToken) : array {
$characterData = [];
if( !empty($characterAccessData = (array)$this->characterAccessData[$characterId]) ){
// check expire for $this->characterAccessData -> check ALL characters and remove expired
foreach($characterAccessData as $i => $data){
$deleteToken = false;
// check expire for $this->characterAccessData -> check ALL characters and remove expired
if( ((int)$data['expire'] - time()) > 0 ){
// still valid -> check token
if($characterToken === $data['token']){
@@ -425,6 +549,7 @@ class MapUpdate implements MessageComponentInterface {
}
}
}
return $characterData;
}
@@ -435,7 +560,7 @@ class MapUpdate implements MessageComponentInterface {
* @param $mapToken
* @return bool
*/
private function checkMapAccess($characterId, $mapId, $mapToken){
private function checkMapAccess(int $characterId, int $mapId, string $mapToken) : bool {
$access = false;
if( !empty($mapAccessData = (array)$this->mapAccessData[$mapId][$characterId]) ){
foreach($mapAccessData as $i => $data){
@@ -469,21 +594,14 @@ class MapUpdate implements MessageComponentInterface {
}
/**
* broadcast data ($load) to $connections
* @param ConnectionInterface[] | \SplObjectStorage $connections
* @param $task
* @param $load
* @param int[] $characterIds optional, recipients (e.g if multiple browser tabs are open)
* broadcast $payload to $connections
* @param \SplObjectStorage $connections
* @param Payload $payload
*/
private function broadcastData($connections, string $task, $load, array $characterIds = []){
$response = [
'task' => $task,
'characterIds' => $characterIds,
'load' => $load
];
private function broadcast(\SplObjectStorage $connections, Payload $payload) : void {
$data = json_encode($payload);
foreach($connections as $conn){
$conn->send( json_encode($response) );
$this->send($conn, $data);
}
}
@@ -493,7 +611,7 @@ class MapUpdate implements MessageComponentInterface {
* receive data from TCP socket (main App)
* -> send response back
* @param string $task
* @param null $load
* @param null|int|array $load
* @return bool|float|int|null
*/
public function receiveData(string $task, $load = null){
@@ -501,16 +619,15 @@ class MapUpdate implements MessageComponentInterface {
switch($task){
case 'healthCheck':
$this->healthCheckToken = (float)$load;
$responseLoad = $this->healthCheckToken;
$responseLoad = $this->setHealthCheckToken((float)$load);
break;
case 'characterUpdate':
$this->updateCharacterData($load);
$this->updateCharacterData((array)$load);
$mapIds = $this->getMapIdsByCharacterId((int)$load['id']);
$this->broadcastMapSubscriptions('mapSubscriptions', $mapIds);
$this->broadcastMapSubscriptions($mapIds);
break;
case 'characterLogout':
$responseLoad = $this->unSubscribeCharacterIds($load);
$responseLoad = $this->unSubscribeCharacterIds((array)$load);
break;
case 'mapConnectionAccess':
$responseLoad = $this->setConnectionAccess($load);
@@ -519,10 +636,10 @@ class MapUpdate implements MessageComponentInterface {
$responseLoad = $this->setAccess($task, $load);
break;
case 'mapUpdate':
$responseLoad = $this->broadcastMapUpdate($task, $load);
$responseLoad = $this->broadcastMapUpdate($task, (array)$load);
break;
case 'mapDeleted':
$responseLoad = $this->deleteMapId($task, $load);
$responseLoad = $this->deleteMapId($task, (int)$load);
break;
case 'logData':
$this->handleLogData((array)$load['meta'], (array)$load['log']);
@@ -532,39 +649,63 @@ class MapUpdate implements MessageComponentInterface {
return $responseLoad;
}
private function setCharacterData(array $characterData){
$characterId = (int)$characterData['id'];
if($characterId){
/**
* @param float $token
* @return float
*/
private function setHealthCheckToken(float $token) : float {
$this->healthCheckToken = $token;
return $this->healthCheckToken;
}
/**
* @param array $characterData
*/
private function setCharacterData(array $characterData) : void {
if($characterId = (int)$characterData['id']){
$this->characterData[$characterId] = $characterData;
}
}
/**
* @param int $characterId
* @return array
*/
private function getCharacterData(int $characterId) : array {
return empty($this->characterData[$characterId]) ? [] : $this->characterData[$characterId];
}
/**
* @param array $characterIds
* @return array
*/
private function getCharactersData(array $characterIds) : array {
return array_filter($this->characterData, function($characterId) use($characterIds) {
return in_array($characterId, $characterIds);
}, ARRAY_FILTER_USE_KEY);
}
private function updateCharacterData(array $characterData){
/**
* @param array $characterData
*/
private function updateCharacterData(array $characterData) : void {
$characterId = (int)$characterData['id'];
if($this->getCharacterData($characterId)){
$this->setCharacterData($characterData);
}
}
private function deleteCharacterData(int $characterId){
/**
* @param int $characterId
*/
private function deleteCharacterData(int $characterId) : void {
unset($this->characterData[$characterId]);
}
/**
* @param string $task
* @param array $mapIds
*/
private function broadcastMapSubscriptions(string $task, array $mapIds){
private function broadcastMapSubscriptions(array $mapIds) : void {
$mapIds = array_unique($mapIds);
foreach($mapIds as $mapId){
@@ -578,7 +719,11 @@ class MapUpdate implements MessageComponentInterface {
$mapUserData->config = (object)['id' => $mapId];
$mapUserData->data = (object)['systems' => $systems];
$this->broadcastMapData($task, $mapId, $mapUserData);
$connectionCount = $this->broadcastMapData('mapSubscriptions', $mapId, $mapUserData);
$this->log(['debug'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_SUBSCRIPTIONS, $mapId, $connectionCount)
);
}
}
}
@@ -588,10 +733,15 @@ class MapUpdate implements MessageComponentInterface {
* @param array $mapData
* @return int
*/
private function broadcastMapUpdate(string $task, $mapData){
private function broadcastMapUpdate(string $task, array $mapData) : int {
$mapId = (int)$mapData['config']['id'];
$connectionCount = $this->broadcastMapData($task, $mapId, $mapData);
return $this->broadcastMapData($task, $mapId, $mapData);
$this->log(['debug'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_UPDATE, $mapId, $connectionCount)
);
return $connectionCount;
}
/**
@@ -605,7 +755,8 @@ class MapUpdate implements MessageComponentInterface {
$characterIds = $this->getCharacterIdsByMapId($mapId);
$connections = $this->getConnectionsByCharacterIds($characterIds);
$this->broadcastData($connections, $task, $load, $characterIds);
$this->broadcast($connections, $this->newPayload($task, $load, $characterIds));
return count($connections);
}
@@ -619,6 +770,7 @@ class MapUpdate implements MessageComponentInterface {
$newMapCharacterIds = [];
if($mapId = (int)$accessData['id']){
$mapName = (string)$accessData['name'];
$characterIds = (array)$accessData['characterIds'];
// check all charactersIds that have map access... --------------------------------------------------------
foreach($characterIds as $characterId){
@@ -632,21 +784,23 @@ class MapUpdate implements MessageComponentInterface {
}
}
$currentMapCharacterIds = (array)$this->subscriptions[$mapId];
$currentMapCharacterIds = (array)$this->subscriptions[$mapId]['characterIds'];
// broadcast "map delete" to no longer valid characters ---------------------------------------------------
$removedMapCharacterIds = array_keys(array_diff_key($currentMapCharacterIds, $newMapCharacterIds));
$removedMapCharacterConnections = $this->getConnectionsByCharacterIds($removedMapCharacterIds);
$this->broadcastData($removedMapCharacterConnections, $task, $mapId, $removedMapCharacterIds);
$this->broadcast($removedMapCharacterConnections, $this->newPayload($task, $mapId, $removedMapCharacterIds));
// update map subscriptions -------------------------------------------------------------------------------
if( !empty($newMapCharacterIds) ){
// set new characters that have map access (overwrites existing subscriptions for that map)
$this->subscriptions[$mapId] = $newMapCharacterIds;
$this->subscriptions[$mapId]['characterIds'] = $newMapCharacterIds;
$this->subscriptions[$mapId]['data']['name'] = $mapName;
// check if subscriptions have changed
if( !$this->arraysEqualKeys($currentMapCharacterIds, $newMapCharacterIds) ){
$this->broadcastMapSubscriptions('mapSubscriptions', [$mapId]);
$this->broadcastMapSubscriptions([$mapId]);
}
}else{
// no characters (left) on this map
@@ -661,7 +815,7 @@ class MapUpdate implements MessageComponentInterface {
* @param $connectionAccessData
* @return bool
*/
private function setConnectionAccess($connectionAccessData) {
private function setConnectionAccess($connectionAccessData){
$response = false;
$characterId = (int)$connectionAccessData['id'];
$characterData = $connectionAccessData['characterData'];
@@ -697,6 +851,75 @@ class MapUpdate implements MessageComponentInterface {
return $response;
}
/**
* get stats data
* -> lists all channels, subscribed characters + connection info
* @return array
*/
protected function getSubscriptionStats() : array {
$uniqueConnections = [];
$uniqueSubscriptions = [];
$channelsStats = [];
foreach($this->subscriptions as $mapId => $subData){
$characterIds = $this->getCharacterIdsByMapId($mapId);
$uniqueMapConnections = [];
$channelStats = [
'channelId' => $mapId,
'channelName' => $subData['data']['name'],
'countSub' => count($characterIds),
'countCon' => 0,
'subscriptions' => []
];
foreach($characterIds as $characterId){
$characterData = $this->getCharacterData($characterId);
$connections = $this->getConnectionsByCharacterId($characterId);
$characterStats = [
'characterId' => $characterId,
'characterName' => isset($characterData['name']) ? $characterData['name'] : null,
'countCon' => $connections->count(),
'connections' => []
];
foreach($connections as $connection){
if(!in_array($connection->resourceId, $uniqueMapConnections)){
$uniqueMapConnections[] = $connection->resourceId;
}
$metaData = $this->getConnectionData($connection);
$microTime = (float)$metaData['mTimeSend'];
$logTime = Store::getDateTimeFromMicrotime($microTime);
$characterStats['connections'][] = [
'resourceId' => $connection->resourceId,
'remoteAddress' => $connection->remoteAddress,
'mTimeSend' => $microTime,
'mTimeSendFormat1' => $logTime->format('Y-m-d H:i:s.u'),
'mTimeSendFormat2' => $logTime->format('H:i:s')
];
}
$channelStats['subscriptions'][] = $characterStats;
}
$uniqueConnections = array_unique(array_merge($uniqueConnections, $uniqueMapConnections));
$uniqueSubscriptions = array_unique(array_merge($uniqueSubscriptions, $characterIds));
$channelStats['countCon'] = count($uniqueMapConnections);
$channelsStats[] = $channelStats;
}
return [
'countSub' => count($uniqueSubscriptions),
'countCon' => count($uniqueConnections),
'channels' => $channelsStats
];
}
/**
* compare two assoc arrays by keys. Key order is ignored
* -> if all keys from array1 exist in array2 && all keys from array2 exist in array 1, arrays are supposed to be equal
@@ -717,46 +940,4 @@ class MapUpdate implements MessageComponentInterface {
$logHandler = new LogFileHandler((string)$meta['stream']);
$logHandler->write($log);
}
// logging ========================================================================================================
/**
* outputs a custom log text
* -> The output can be written to a *.log file if running the webSocket server (cmd.php) as a service
* @param $text
*/
protected function log($text){
$text = date('Y-m-d H:i:s') . ' ' . $text;
echo $text . "\n";
$this->debug();
}
protected function debug(){
if( $this->debug ){
$mapId = 1;
$characterId = 1946320202;
$subscriptions = $this->subscriptions[$mapId];
$connectionsForChar = count($this->characters[$characterId]);
$mapAccessData = $this->mapAccessData[$mapId][$characterId];
echo "\n" . "========== START ==========" . "\n";
echo "-> characterAccessData: " . "\n";
var_dump( $this->characterAccessData );
echo "\n" . "-> Subscriptions mapId: " . $mapId . " " . "\n";
var_dump($subscriptions);
echo "\n" . "-> connectionsForChar characterId: " . $characterId . " count: " . $connectionsForChar . " " . "\n";
echo "-> mapAccessData: " . "\n";
var_dump($mapAccessData);
echo "\n" . "========== END ==========" . "\n";
}
}
}

94
app/Data/Payload.php Normal file
View File

@@ -0,0 +1,94 @@
<?php
namespace Exodus4D\Socket\Data;
/**
* Class Payload
* @package Exodus4D\Socket\Data
* @property string $task
* @property mixed $load
*/
class Payload implements \JsonSerializable {
/**
* error message for missing 'task' name
*/
const ERROR_TASK_MISSING = "'task' must be a not empty string";
/**
* task name
* @var string
*/
private $task = '';
/**
* payload data
* @var mixed
*/
private $load;
/**
* optional characterId array -> recipients
* -> e.g if multiple browser tabs are open
* @var null|array
*/
private $characterIds;
/**
* Payload constructor.
* @param string $task
* @param null $load
* @param array|null $characterIds
*/
public function __construct(string $task, $load = null, ?array $characterIds = null){
$this->setTask($task);
$this->setLoad($load);
$this->setCharacterIds($characterIds);
}
/**
* @param string $task
*/
public function setTask(string $task){
if($task){
$this->task = $task;
}else{
throw new \InvalidArgumentException(self::ERROR_TASK_MISSING);
}
}
/**
* @param null $load
*/
public function setLoad($load = null){
$this->load = $load;
}
/**
* @param array|null $characterIds
*/
public function setCharacterIds(?array $characterIds){
if(is_array($characterIds)){
$this->characterIds = $characterIds;
}else{
$this->characterIds = null;
}
}
/**
* @param $name
* @return mixed
*/
public function __get($name){
return $this->$name;
}
/**
* @return array|mixed
*/
public function jsonSerialize(){
return get_object_vars($this);
}
}

90
app/Log/ShellColors.php Normal file
View File

@@ -0,0 +1,90 @@
<?php
namespace Exodus4D\Socket\Log;
class ShellColors {
/**
* all foreground color codes
* @var array
*/
private $foregroundColors = [];
/**
* all background color codes
* @var array
*/
private $backgroundColors = [];
public function __construct() {
// set up "Shell" colors
$this->foregroundColors['black'] = '0;30';
$this->foregroundColors['dark_gray'] = '1;30';
$this->foregroundColors['blue'] = '0;34';
$this->foregroundColors['light_blue'] = '1;34';
$this->foregroundColors['green'] = '0;32';
$this->foregroundColors['light_green'] = '1;32';
$this->foregroundColors['cyan'] = '0;36';
$this->foregroundColors['light_cyan'] = '1;36';
$this->foregroundColors['red'] = '0;31';
$this->foregroundColors['light_red'] = '1;31';
$this->foregroundColors['purple'] = '0;35';
$this->foregroundColors['light_purple'] = '1;35';
$this->foregroundColors['brown'] = '0;33';
$this->foregroundColors['yellow'] = '1;33';
$this->foregroundColors['light_gray'] = '0;37';
$this->foregroundColors['white'] = '1;37';
$this->backgroundColors['black'] = '40';
$this->backgroundColors['red'] = '41';
$this->backgroundColors['green'] = '42';
$this->backgroundColors['yellow'] = '43';
$this->backgroundColors['blue'] = '44';
$this->backgroundColors['magenta'] = '45';
$this->backgroundColors['cyan'] = '46';
$this->backgroundColors['light_gray'] = '47';
}
/**
* get colored string
* @param string $string
* @param string|null $foregroundColor
* @param string|null $backgroundColor
* @return string
*/
public function getColoredString(string $string, ?string $foregroundColor = null, ?string $backgroundColor = null) : string {
$coloredString = "";
// Check if given foreground color found
if (isset($this->foregroundColors[$foregroundColor])) {
$coloredString .= "\033[" . $this->foregroundColors[$foregroundColor] . "m";
}
// Check if given background color found
if (isset($this->backgroundColors[$backgroundColor])) {
$coloredString .= "\033[" . $this->backgroundColors[$backgroundColor] . "m";
}
// Add string and end coloring
$coloredString .= $string . "\033[0m";
return $coloredString;
}
/**
* returns all foreground color names
* @return array
*/
public function getForegroundColors() : array {
return array_keys($this->foregroundColors);
}
/**
* returns all background color names
* @return array
*/
public function getBackgroundColors() : array {
return array_keys($this->backgroundColors);
}
}

220
app/Log/Store.php Normal file
View File

@@ -0,0 +1,220 @@
<?php
namespace Exodus4D\Socket\Log;
class Store {
/**
* default for: unique store name
*/
const DEFAULT_NAME = 'store';
/**
* default for: echo log data in terminal
*/
const DEFAULT_LOG_TO_STDOUT = true;
/**
* default for: max cached log entries
*/
const DEFAULT_LOG_STORE_SIZE = 50;
/**
* @see Store::DEFAULT_NAME
* @var string
*/
private $name = self::DEFAULT_NAME;
/**
* log store for log entries
* -> store size should be limited for memory reasons
* @var array
*/
private $store = [];
/**
* all valid types for custom log events
* if value is false, logs for this type are ignored
* @var array
*/
protected $logTypes = [
'error' => true,
'info' => true,
'debug' => true
];
/**
* if Store is locked, current state can not be changed
* @var bool
*/
protected $locked = false;
/**
* @var ShellColors
*/
static $colors;
/**
* Store constructor.
* @param string $name
*/
public function __construct(string $name){
$this->name = $name;
}
/**
* get all stored log entries
* @return array
*/
public function getStore() : array {
return $this->store;
}
/**
* @param bool $locked
*/
public function setLocked(bool $locked){
$this->locked = $locked;
}
/**
* @return bool
*/
public function isLocked() : bool {
return $this->locked;
}
/**
* @param int $logLevel
*/
public function setLogLevel(int $logLevel){
switch($logLevel){
case 3:
$this->logTypes['error'] = true;
$this->logTypes['info'] = true;
$this->logTypes['debug'] = true;
break;
case 2:
$this->logTypes['error'] = true;
$this->logTypes['info'] = true;
$this->logTypes['debug'] = false;
break;
case 1:
$this->logTypes['error'] = true;
$this->logTypes['info'] = false;
$this->logTypes['debug'] = false;
break;
case 0:
default:
$this->setLocked(true); // no logging
}
}
/**
* this is used for custom log events like 'error', 'debug',...
* works as dispatcher method that calls individual log*() methods
* @param $logTypes
* @param string|null $remoteAddress
* @param int|null $resourceId
* @param string $action
* @param string $message
*/
public function log($logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : void {
if(!$this->isLocked()){
// filter out logTypes that should not be logged
$logTypes = array_filter((array)$logTypes, function(string $type) : bool {
return array_key_exists($type, $this->logTypes) && $this->logTypes[$type];
});
if($logTypes){
// get log entry data
$logData = $this->getLogData($logTypes, $remoteAddress, $resourceId, $action, $message);
if(self::DEFAULT_LOG_TO_STDOUT){
$this->echoLog($logData);
}
// add entry to local store and check size limit for store
$this->store[] = $logData;
$this->store = array_slice($this->store, self::DEFAULT_LOG_STORE_SIZE * -1);
}
}
}
/**
* get log data as array for a custom log entry
* @param array $logTypes
* @param string|null $remoteAddress
* @param int|null $resourceId
* @param string $action
* @param string $message
* @return array
*/
private function getLogData(array $logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : array {
$file = null;
$lineNum = null;
$function = null;
$traceIndex = 4;
$backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, $traceIndex);
if(count($backtrace) == $traceIndex){
$caller = $backtrace[$traceIndex - 2];
$callerOrig = $backtrace[$traceIndex - 1];
$file = substr($caller['file'], strlen(dirname(dirname(dirname($caller['file'])))) + 1);
$lineNum = $caller['line'];
$function = $callerOrig['function'];
}
$microTime = microtime(true);
$logTime = self::getDateTimeFromMicrotime($microTime);
return [
'store' => $this->name,
'mTime' => $microTime,
'mTimeFormat1' => $logTime->format('Y-m-d H:i:s.u'),
'mTimeFormat2' => $logTime->format('H:i:s'),
'logTypes' => $logTypes,
'remoteAddress' => $remoteAddress,
'resourceId' => $resourceId,
'fileName' => $file,
'lineNumber' => $lineNum,
'function' => $function,
'action' => $action,
'message' => $message
];
}
/**
* echo log data to stdout -> terminal
* @param array $logData
*/
private function echoLog(array $logData) : void {
if(!self::$colors){
self::$colors = new ShellColors();
}
$data = [
self::$colors->getColoredString($logData['mTimeFormat1'], 'dark_gray'),
self::$colors->getColoredString($logData['store'], $logData['store'] == 'webSock' ? 'brown' : 'cyan'),
$logData['remoteAddress'] . ($logData['resourceId'] ? ' #' . $logData['resourceId'] : ''),
self::$colors->getColoredString($logData['fileName'] . ' line ' . $logData['lineNumber'], 'dark_gray'),
self::$colors->getColoredString($logData['function'] . '()' . (($logData['function'] !== $logData['action']) ? ' [' . $logData['action'] . ']' : ''), 'dark_gray'),
implode((array)$logData['logTypes'], ','),
self::$colors->getColoredString($logData['message'], 'light_purple')
];
echo implode(array_filter($data), ' | ') . PHP_EOL;
}
/**
* @see https://stackoverflow.com/a/29598719/4329969
* @param float $mTime
* @return \DateTime
*/
public static function getDateTimeFromMicrotime(float $mTime) : \DateTime {
return \DateTime::createFromFormat('U.u', number_format($mTime, 6, '.', ''));
}
}

View File

@@ -0,0 +1,67 @@
<?php
namespace Exodus4D\Socket\Socket;
use Exodus4D\Socket\Log\Store;
use React\EventLoop;
use React\Socket;
use Ratchet\MessageComponentInterface;
abstract class AbstractSocket {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'default';
/**
* global server loop
* @var EventLoop\LoopInterface
*/
protected $loop;
/**
* @var MessageComponentInterface
*/
protected $handler;
/**
* @var Store
*/
protected $logStore;
/**
* AbstractSocket constructor.
* @param EventLoop\LoopInterface $loop
* @param MessageComponentInterface $handler
* @param Store $store
*/
public function __construct(
EventLoop\LoopInterface $loop,
MessageComponentInterface $handler,
Store $store
){
$this->loop = $loop;
$this->handler = $handler;
$this->logStore = $store;
$this->log(['debug', 'info'], null, 'START', 'start Socket server…');
}
/**
* @param $logTypes
* @param Socket\ConnectionInterface|null $connection
* @param string $action
* @param string $message
*/
public function log($logTypes, ?Socket\ConnectionInterface $connection, string $action, string $message = '') : void {
if(!$this->logStore->isLocked()){
$remoteAddress = $connection ? $connection->getRemoteAddress() : null;
$this->logStore->log($logTypes, $remoteAddress, null, $action, $message);
}
}
}

View File

@@ -9,6 +9,7 @@
namespace Exodus4D\Socket\Socket;
use Exodus4D\Socket\Log\Store;
use React\EventLoop;
use React\Socket;
use React\Promise;
@@ -16,7 +17,14 @@ use React\Stream;
use Clue\React\NDJson;
use Ratchet\MessageComponentInterface;
class TcpSocket {
class TcpSocket extends AbstractSocket{
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'tcpSock';
/**
* error message for unknown acceptType
@@ -77,12 +85,7 @@ class TcpSocket {
/**
* default for: add socket statistic to response payload
*/
const DEFAULT_STATS = true;
/**
* default for: echo debug messages
*/
const DEFAULT_DEBUG = false;
const DEFAULT_ADD_STATS = false;
/**
* max length for JSON data string
@@ -90,17 +93,6 @@ class TcpSocket {
*/
const JSON_DECODE_MAX_LENGTH = 65536 * 4;
/**
* global server loop
* @var EventLoop\LoopInterface
*/
private $loop;
/**
* @var MessageComponentInterface
*/
private $handler;
/**
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
* @var string
@@ -123,7 +115,7 @@ class TcpSocket {
* @see TcpSocket::DEFAULT_STATS
* @var bool
*/
private $stats = self::DEFAULT_STATS;
private $addStats = self::DEFAULT_ADD_STATS;
/**
* storage for all active connections
@@ -137,18 +129,19 @@ class TcpSocket {
* -> represents number of active connected clients
* @var int
*/
private $maxConnections = 0;
private $maxConnections = 0;
/**
* timestamp on startup
* @var int
*/
private $startupTime = 0;
private $startupTime = 0;
/**
* TcpSocket constructor.
* @param EventLoop\LoopInterface $loop
* @param MessageComponentInterface $handler
* @param Store $store
* @param string $acceptType
* @param float $waitTimeout
* @param bool $endWithResponse
@@ -156,12 +149,13 @@ class TcpSocket {
public function __construct(
EventLoop\LoopInterface $loop,
MessageComponentInterface $handler,
Store $store,
string $acceptType = self::DEFAULT_ACCEPT_TYPE,
float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT,
bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE
){
$this->loop = $loop;
$this->handler = $handler;
parent::__construct($loop, $handler, $store);
$this->acceptType = $acceptType;
$this->waitTimeout = $waitTimeout;
$this->endWithResponse = $endWithResponse;
@@ -173,7 +167,7 @@ class TcpSocket {
* @param Socket\ConnectionInterface $connection
*/
public function onConnect(Socket\ConnectionInterface $connection){
$this->debug($connection, __FUNCTION__, '----------------------------------------');
$this->log('debug', $connection, __FUNCTION__, 'open connection…');
if($this->isValidConnection($connection)){
// connection can be used
@@ -182,32 +176,30 @@ class TcpSocket {
// set waitTimeout timer for connection
$this->setTimerTimeout($connection, $this->waitTimeout);
$this->debug($connection, __FUNCTION__);
// register connection events ... -------------------------------------------------------------------------
$this->initRead($connection)
->then($this->initDispatch())
->then($this->initDispatch($connection))
->then($this->initResponse($connection))
->then(
function(string $message) use ($connection) {
$this->debug($connection, 'DONE', $message);
function(array $payload) use ($connection) {
$this->log(['debug', 'info'], $connection,'DONE', 'task "' . $payload['task'] . '" done → response send');
},
function(\Exception $e) use ($connection) {
$this->debug($connection, 'ERROR', $e->getMessage());
$this->log(['debug', 'error'], $connection, 'ERROR', $e->getMessage());
$this->connectionError($connection, $e);
});
$connection->on('end', function() use ($connection) {
$this->debug($connection, 'onEnd');
$this->log('debug', $connection, 'onEnd');
});
$connection->on('close', function() use ($connection){
$this->debug($connection, 'onClose');
$connection->on('close', function() use ($connection) {
$this->log(['debug'], $connection, 'onClose', 'close connection');
$this->removeConnection($connection);
});
$connection->on('error', function(\Exception $e) use ($connection) {
$this->debug($connection, 'onError', $e->getMessage());
$this->log(['debug', 'error'], $connection, 'onError', $e->getMessage());
});
}else{
// invalid connection -> can not be used
@@ -254,15 +246,16 @@ class TcpSocket {
/**
* init dispatcher for payload
* @param Socket\ConnectionInterface $connection
* @return callable
*/
protected function initDispatch() : callable {
return function($payload) : Promise\PromiseInterface {
protected function initDispatch(Socket\ConnectionInterface $connection) : callable {
return function(array $payload) use ($connection) : Promise\PromiseInterface {
$task = (string)$payload['task'];
if(!empty($task)){
$load = $payload['load'];
$deferred = new Promise\Deferred();
$this->dispatch($deferred, $task, $load);
$this->dispatch($connection, $deferred, $task, $load);
return $deferred->promise();
}else{
return new Promise\RejectedPromise(
@@ -273,17 +266,21 @@ class TcpSocket {
}
/**
* @param Socket\ConnectionInterface $connection
* @param Promise\Deferred $deferred
* @param string $task
* @param null $load
*/
protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void {
protected function dispatch(Socket\ConnectionInterface $connection, Promise\Deferred $deferred, string $task, $load = null) : void {
$addStatusData = false;
switch($task){
case 'getStats':
$deferred->resolve($this->newPayload($task, null));
$addStatusData = true;
$deferred->resolve($this->newPayload($task, null, $addStatusData));
break;
case 'healthCheck':
$addStatusData = true;
case 'characterUpdate':
case 'characterLogout':
case 'mapConnectionAccess':
@@ -292,10 +289,13 @@ class TcpSocket {
case 'mapDeleted':
case 'logData':
if(method_exists($this->handler, 'receiveData')){
$this->log(['info'], $connection, __FUNCTION__, 'task "' . $task . '" processing…');
$deferred->resolve(
$this->newPayload(
$task,
call_user_func_array([$this->handler, 'receiveData'], [$task, $load])
call_user_func_array([$this->handler, 'receiveData'], [$task, $load]),
$addStatusData
)
);
}else{
@@ -313,7 +313,7 @@ class TcpSocket {
*/
protected function initResponse(Socket\ConnectionInterface $connection) : callable {
return function(array $payload) use ($connection) : Promise\PromiseInterface {
$this->debug($connection, 'initResponse');
$this->log('debug', $connection, 'initResponse', 'task "' . $payload['task'] . '" → init response');
$deferred = new Promise\Deferred();
$this->write($deferred, $connection, $payload);
@@ -344,7 +344,7 @@ class TcpSocket {
}
if($write){
$deferred->resolve('OK');
$deferred->resolve($payload);
}else{
$deferred->reject(new \Exception(
sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress())
@@ -360,7 +360,8 @@ class TcpSocket {
* @param \Exception $e
*/
protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){
$this->debug($connection, __FUNCTION__, $e->getMessage());
$errorMessage = $e->getMessage();
$this->log(['debug', 'error'], $connection, __FUNCTION__, $errorMessage);
if($connection->isWritable()){
if('json' == $this->acceptType){
@@ -368,7 +369,7 @@ class TcpSocket {
}
// send "end" data, then close
$connection->end($this->newPayload('error', $e->getMessage()));
$connection->end($this->newPayload('error', $errorMessage, true));
}else{
// close connection
$connection->close();
@@ -475,7 +476,9 @@ class TcpSocket {
// update maxConnections count
$this->maxConnections = max($this->connections->count(), $this->maxConnections);
$this->debug($connection, __FUNCTION__);
$this->log(['debug'], $connection, __FUNCTION__, 'add new connection');
}else{
$this->log(['debug'], $connection, __FUNCTION__, 'connection already exists');
}
}
@@ -485,7 +488,7 @@ class TcpSocket {
*/
protected function removeConnection(Socket\ConnectionInterface $connection){
if($this->hasConnection($connection)){
$this->debug($connection, __FUNCTION__);
$this->log(['debug'], $connection, __FUNCTION__, 'remove connection');
$this->cancelTimers($connection);
$this->connections->detach($connection);
}
@@ -495,15 +498,16 @@ class TcpSocket {
* get new payload
* @param string $task
* @param null $load
* @param bool $addStats
* @return array
*/
protected function newPayload(string $task, $load = null) : array {
protected function newPayload(string $task, $load = null, bool $addStats = false) : array {
$payload = [
'task' => $task,
'load' => $load
];
if($this->stats){
if($addStats || $this->addStats){
// add socket statistics
$payload['stats'] = $this->getStats();
}
@@ -527,34 +531,21 @@ class TcpSocket {
*/
protected function getStats() : array {
return [
'startup' => time() - $this->startupTime,
'connections' => $this->connections->count(),
'maxConnections' => $this->maxConnections
'tcpSocket' => $this->getSocketStats(),
'webSocket' => $this->handler->getSocketStats()
];
}
/**
* echo debug messages
* @param Socket\ConnectionInterface $connection
* @param string $method
* @param string $message
* get TcpSocket stats data
* @return array
*/
protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){
if(self::DEFAULT_DEBUG){
$backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 3);
$caller = array_shift($backtrace);
$callerOrig = array_shift($backtrace);
$data = [
date('Y-m-d H:i:s'),
'DEBUG',
$connection->getRemoteAddress(),
$caller['file'] . ' line ' . $caller['line'],
$callerOrig['function'] . '()' . (($callerOrig['function'] !== $method) ? ' [' . $method . '()]' : ''),
$message
];
echo implode(array_filter($data), ' | ') . PHP_EOL;
}
protected function getSocketStats() : array {
return [
'startup' => time() - $this->startupTime,
'connections' => $this->connections->count(),
'maxConnections' => $this->maxConnections,
'logs' => array_reverse($this->logStore->getStore())
];
}
}

View File

@@ -9,6 +9,7 @@
namespace Exodus4D\Socket;
use Exodus4D\Socket\Log\Store;
use Exodus4D\Socket\Socket\TcpSocket;
use React\EventLoop;
use React\Socket;
@@ -33,16 +34,23 @@ class WebSockets {
*/
protected $wsListenHost;
/**
* @var int
*/
protected $debug;
/**
* WebSockets constructor.
* @param string $dsn
* @param int $wsListenPort
* @param string $wsListenHost
* @param int $debug
*/
function __construct(string $dsn, int $wsListenPort, string $wsListenHost){
function __construct(string $dsn, int $wsListenPort, string $wsListenHost, int $debug = 1){
$this->dsn = $dsn;
$this->wsListenPort = $wsListenPort;
$this->wsListenHost = $wsListenHost;
$this->debug = $debug;
$this->startMapSocket();
}
@@ -51,11 +59,22 @@ class WebSockets {
// global EventLoop
$loop = EventLoop\Factory::create();
// global MessageComponent (main app) (handles all business logic)
$mapUpdate = new Main\MapUpdate();
// new Stores for logging -------------------------------------------------------------------------------------
$webSocketLogStore = new Store(Component\MapUpdate::COMPONENT_NAME);
$webSocketLogStore->setLogLevel($this->debug);
$tcpSocketLogStore = new Store(TcpSocket::COMPONENT_NAME);
$tcpSocketLogStore->setLogLevel($this->debug);
// global MessageComponent (main app) (handles all business logic) --------------------------------------------
$mapUpdate = new Component\MapUpdate($webSocketLogStore);
$loop->addPeriodicTimer(3, function(EventLoop\TimerInterface $timer) use ($mapUpdate) {
$mapUpdate->housekeeping($timer);
});
// TCP Socket -------------------------------------------------------------------------------------------------
$tcpSocket = new TcpSocket($loop, $mapUpdate);
$tcpSocket = new TcpSocket($loop, $mapUpdate, $tcpSocketLogStore);
// TCP Server (WebServer <-> TCPServer <-> TCPSocket communication)
$server = new Socket\Server($this->dsn, $loop, [
'tcp' => [
@@ -64,12 +83,12 @@ class WebSockets {
]
]);
$server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){
$server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket) {
$tcpSocket->onConnect($connection);
});
$server->on('error', function(\Exception $e){
echo 'error: ' . $e->getMessage() . PHP_EOL;
$server->on('error', function(\Exception $e) use ($tcpSocket) {
$tcpSocket->log(['debug', 'error'], null, 'onError', $e->getMessage());
});
// WebSocketServer --------------------------------------------------------------------------------------------

76
cmd.php
View File

@@ -1,33 +1,73 @@
<?php
require 'vendor/autoload.php';
use Exodus4D\Socket;
if(PHP_SAPI === 'cli'){
// optional CLI params
$options = getopt('', [
'pf_listen_host:',
'pf_listen_port:',
'pf_host:',
'pf_port:'
]);
// optional CLI params -> default values
// The default values should be fine for 99% of you!
$longOpts = [
'wsHost:' => '0.0.0.0', // WebSocket connection (for WebClients => Browser). '0.0.0.0' <-- any client can connect!
'wsPort:' => 8020, // ↪ default WebSocket URI: 127.0.0.1:8020. This is where Nginx must proxy WebSocket traffic to
'tcpHost:' => '127.0.0.1', // TcpSocket connection (for WebServer ⇄ WebSocket)
'tcpPort:' => 5555, // ↪ default TcpSocket URI: tcp://127.0.0.1:5555
'debug:' => 2 // Debug level [0-3] 0 = silent, 1 = errors, 2 = error + info, 3 = error + info + debug
];
// get options from CLI parameter + default values
$cliOpts = getopt('', array_keys($longOpts));
$options = [];
array_walk($longOpts, function($defaultVal, $optKey) use ($cliOpts, &$options) {
$key = trim($optKey, ':');
$val = $defaultVal;
if(array_key_exists($key, $cliOpts)){
$val = is_int($defaultVal) ? (int)$cliOpts[$key] : $cliOpts[$key] ;
}
$options[$key] = $val;
});
/**
* WebSocket connection (for WebClients => Browser)
* default WebSocket URI: ws://127.0.0.1:8020
*
* pf_client_ip '0.0.0.0' <-- any client can connect
* pf_ws_port 8020 <-- any client can connect
* print current config parameters to Shell
* @param array $longOpts
* @param array $options
*/
$wsListenHost = (!empty($options['pf_listen_host'])) ? $options['pf_listen_host'] : '0.0.0.0' ;
$wsListenPort = (!empty($options['pf_listen_port'])) ? (int)$options['pf_listen_port'] : 8020 ;
$showHelp = function(array $longOpts, array $options){
$optKeys = array_keys($longOpts);
$colors = new Socket\Log\ShellColors();
$data = [];
$host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ;
$port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ;
// headline for CLI config parameters
$rowData = $colors->getColoredString(str_pad(' param', 12), 'white');
$rowData .= $colors->getColoredString(str_pad('value', 18, ' ', STR_PAD_LEFT), 'white');
$rowData .= $colors->getColoredString(str_pad('default', 15, ' ', STR_PAD_LEFT), 'white');
$dsn = 'tcp://' . $host . ':' . $port;
$data[] = $rowData;
$data[] = str_pad(' ', 45, '-');
$i = 0;
foreach($options as $optKey => $optVal){
$rowData = $colors->getColoredString(str_pad(' -' . $optKey, 12), 'yellow');
$rowData .= $colors->getColoredString(str_pad($optVal, 18, ' ', STR_PAD_LEFT), 'light_purple');
$rowData .= $colors->getColoredString(str_pad($longOpts[$optKeys[$i]], 15, ' ', STR_PAD_LEFT), 'dark_gray');
$data[] = $rowData;
$i++;
}
$data[] = '';
echo implode($data, PHP_EOL) . PHP_EOL;
};
if($options['debug']){
// print if -debug > 0
$showHelp($longOpts, $options);
}
$dsn = 'tcp://' . $options['tcpHost'] . ':' . $options['tcpPort'];
new Socket\WebSockets($dsn, $options['wsPort'], $options['wsHost'], $options['debug']);
new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost);
}else{
echo "Script need to be called by CLI!";
}