diff --git a/README.md b/README.md index cf090b3..cb36f96 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Make sure you meet the requirements before continue with the installation. - Port:`8020` - URI:`127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source) -**TCP Socket connection (Internal use fore WebServer <=> WebSocket communication)** +**TCP TcpSocket connection (Internal use fore WebServer <=> WebSocket communication)** - Host:`127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine) - Port:`5555` - URI: `tcp://127.0.0.1:5555` diff --git a/app/Main/MapUpdate.php b/app/Main/MapUpdate.php index 9d45ab2..f950c01 100644 --- a/app/Main/MapUpdate.php +++ b/app/Main/MapUpdate.php @@ -485,43 +485,44 @@ class MapUpdate implements MessageComponentInterface { /** * receive data from TCP socket (main App) * -> send response back - * @param $data + * @param string $task + * @param null $load + * @return bool|float|int|null */ - public function receiveData($data){ - $data = (array)json_decode($data, true); - $load = $data['load']; - $task = $data['task']; - $response = false; + public function receiveData(string $task, $load = null){ + $responseLoad = null; switch($task){ + case 'healthCheck': + $this->healthCheckToken = (float)$load; + $responseLoad = $this->healthCheckToken; + break; case 'characterUpdate': - $response = $this->updateCharacterData($load); + $this->updateCharacterData($load); $mapIds = $this->getMapIdsByCharacterId((int)$load['id']); $this->broadcastMapSubscriptions('mapSubscriptions', $mapIds); break; case 'characterLogout': - $response = $this->unSubscribeCharacterIds($load); + $responseLoad = $this->unSubscribeCharacterIds($load); break; case 'mapConnectionAccess': - $response = $this->setConnectionAccess($load); + $responseLoad = $this->setConnectionAccess($load); break; case 'mapAccess': - $response = $this->setAccess($task, $load); + $responseLoad = $this->setAccess($task, $load); break; case 'mapUpdate': - $response = $this->broadcastMapUpdate($task, $load); + $responseLoad = $this->broadcastMapUpdate($task, $load); break; case 'mapDeleted': - $response = $this->deleteMapId($task, $load); - break; - case 'healthCheck': - $this->healthCheckToken = (float)$load; - $response = 'OK'; + $responseLoad = $this->deleteMapId($task, $load); break; case 'logData': $this->handleLogData((array)$load['meta'], (array)$load['log']); break; } + + return $responseLoad; } private function setCharacterData(array $characterData){ diff --git a/app/Socket/TcpSocket.php b/app/Socket/TcpSocket.php new file mode 100644 index 0000000..fdcea64 --- /dev/null +++ b/app/Socket/TcpSocket.php @@ -0,0 +1,549 @@ + affects en/decoding socket data + */ + const DEFAULT_ACCEPT_TYPE = 'json'; + + /** + * default for: wait timeout + * -> timeout until connection gets closed + * timeout should be "reset" right after successful response send to client + */ + const DEFAULT_WAIT_TIMEOUT = 3.0; + + /** + * default for: send response by end() method (rather than write()) + * -> connection will get closed right after successful response send to client + */ + const DEFAULT_END_WITH_RESPONSE = true; + + /** + * default for: add socket statistic to response payload + */ + const DEFAULT_STATS = true; + + /** + * default for: echo debug messages + */ + const DEFAULT_DEBUG = false; + + /** + * global server loop + * @var EventLoop\LoopInterface + */ + private $loop; + + /** + * @var MessageComponentInterface + */ + private $handler; + + /** + * @see TcpSocket::DEFAULT_ACCEPT_TYPE + * @var string + */ + private $acceptType = self::DEFAULT_ACCEPT_TYPE; + + /** + * @see TcpSocket::DEFAULT_WAIT_TIMEOUT + * @var float + */ + private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT; + + /** + * @see TcpSocket::DEFAULT_END_WITH_RESPONSE + * @var bool + */ + private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE; + + /** + * @see TcpSocket::DEFAULT_STATS + * @var bool + */ + private $stats = self::DEFAULT_STATS; + + /** + * storage for all active connections + * -> can be used to get current count of connected clients + * @var \SplObjectStorage + */ + private $connections; + + /** + * max count of concurrent open connections + * -> represents number of active connected clients + * @var int + */ + private $maxConnections = 0; + + /** + * timestamp on startup + * @var int + */ + private $startupTime = 0; + + /** + * TcpSocket constructor. + * @param EventLoop\LoopInterface $loop + * @param MessageComponentInterface $handler + * @param string $acceptType + * @param float $waitTimeout + * @param bool $endWithResponse + */ + public function __construct( + EventLoop\LoopInterface $loop, + MessageComponentInterface $handler, + string $acceptType = self::DEFAULT_ACCEPT_TYPE, + float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT, + bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE + ){ + $this->loop = $loop; + $this->handler = $handler; + $this->acceptType = $acceptType; + $this->waitTimeout = $waitTimeout; + $this->endWithResponse = $endWithResponse; + $this->connections = new \SplObjectStorage(); + $this->startupTime = time(); + } + + /** + * @param Socket\ConnectionInterface $connection + */ + public function onConnect(Socket\ConnectionInterface $connection){ + $this->debug($connection, __FUNCTION__, '----------------------------------------'); + + if($this->isValidConnection($connection)){ + // connection can be used + // add connection to global connection pool + $this->addConnection($connection); + // set waitTimeout timer for connection + $this->setTimerTimeout($connection, $this->waitTimeout); + + $this->debug($connection, __FUNCTION__); + + // register connection events ... ------------------------------------------------------------------------- + $this->initRead($connection) + ->then($this->initDispatch()) + ->then($this->initResponse($connection)) + ->then( + function(string $message) use ($connection) { + $this->debug($connection, 'DONE', $message); + }, + function(\Exception $e) use ($connection) { + $this->debug($connection, 'ERROR', $e->getMessage()); + $this->connectionError($connection, $e); + }); + + $connection->on('end', function() use ($connection) { + $this->debug($connection, 'onEnd'); + }); + + $connection->on('close', function() use ($connection){ + $this->debug($connection, 'onClose'); + $this->removeConnection($connection); + }); + + $connection->on('error', function(\Exception $e) use ($connection) { + $this->debug($connection, 'onError', $e->getMessage()); + }); + }else{ + // invalid connection -> can not be used + $connection->close(); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @return Promise\PromiseInterface + */ + protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface { + if($connection->isReadable()){ + if('json' == $this->acceptType){ + // new empty stream for processing JSON + $stream = new Stream\ThroughStream(); + $streamDecoded = new NDJson\Decoder($stream, true); + + // promise get resolved on first emit('data') + $promise = Promise\Stream\first($streamDecoded); + + // register on('data') for main input stream + $connection->once('data', function ($chunk) use ($stream) { + // send current data chunk to processing stream -> resolves promise + $stream->emit('data', [$chunk]); + }); + + return $promise; + }else{ + return new Promise\RejectedPromise( + new \InvalidArgumentException( + sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType) + ) + ); + } + }else{ + return new Promise\RejectedPromise( + new \Exception( + sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress()) + ) + ); + } + } + + /** + * init dispatcher for payload + * @return callable + */ + protected function initDispatch() : callable { + return function($payload) : Promise\PromiseInterface { + $task = (string)$payload['task']; + if(!empty($task)){ + $load = $payload['load']; + $deferred = new Promise\Deferred(); + $this->dispatch($deferred, $task, $load); + return $deferred->promise(); + }else{ + return new Promise\RejectedPromise( + new \InvalidArgumentException(self::ERROR_TASK_MISSING) + ); + } + }; + } + + /** + * @param Promise\Deferred $deferred + * @param string $task + * @param null $load + */ + protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void { + + switch($task){ + case 'getStats': + $deferred->resolve($this->newPayload($task, null)); + break; + case 'healthCheck': + case 'characterUpdate': + case 'characterLogout': + case 'mapConnectionAccess': + case 'mapAccess': + case 'mapUpdate': + case 'mapDeleted': + case 'logData': + if(method_exists($this->handler, 'receiveData')){ + $deferred->resolve( + $this->newPayload( + $task, + call_user_func_array([$this->handler, 'receiveData'], [$task, $load]) + ) + ); + }else{ + $deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData'))); + } + break; + default: + $deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task))); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @return callable + */ + protected function initResponse(Socket\ConnectionInterface $connection) : callable { + return function(array $payload) use ($connection) : Promise\PromiseInterface { + $this->debug($connection, 'initResponse'); + + $deferred = new Promise\Deferred(); + $this->write($deferred, $connection, $payload); + + return $deferred->promise(); + }; + } + + /** + * @param Promise\Deferred $deferred + * @param Socket\ConnectionInterface $connection + * @param array $payload + */ + protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void { + $write = false; + if($connection->isWritable()){ + if('json' == $this->acceptType){ + $connection = new NDJson\Encoder($connection); + } + + // write a new chunk of data to connection stream + $write = $connection->write($payload); + + if($this->endWithResponse){ + // connection should be closed (and removed from this socket server) + $connection->end(); + } + } + + if($write){ + $deferred->resolve('OK'); + }else{ + $deferred->reject(new \Exception( + sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress()) + )); + } + } + + /** + * $connection has error + * -> if writable -> end() connection with $payload (close() is called by default) + * -> if readable -> close() connection + * @param Socket\ConnectionInterface $connection + * @param \Exception $e + */ + protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){ + $this->debug($connection, __FUNCTION__, $e->getMessage()); + + if($connection->isWritable()){ + if('json' == $this->acceptType){ + $connection = new NDJson\Encoder($connection); + } + + // send "end" data, then close + $connection->end($this->newPayload('error', $e->getMessage())); + }else{ + // close connection + $connection->close(); + } + } + + /** + * check if $connection is found in global pool + * @param Socket\ConnectionInterface $connection + * @return bool + */ + protected function hasConnection(Socket\ConnectionInterface $connection) : bool { + return $this->connections->contains($connection); + } + + /** + * cancels a previously set timer callback for a $connection + * @param Socket\ConnectionInterface $connection + * @param string $timerName + */ + protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) && isset($data['timers'][$timerName]) && + ($data['timers'][$timerName] instanceof EventLoop\Timer\TimerInterface) + ){ + $this->loop->cancelTimer($data['timers'][$timerName]); + + unset($data['timers'][$timerName]); + $this->connections->offsetSet($connection, $data); + } + } + + /** + * cancels all previously set timers for a $connection + * @param Socket\ConnectionInterface $connection + */ + protected function cancelTimers(Socket\ConnectionInterface $connection){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) + ){ + foreach((array)$data['timers'] as $timerName => $timer){ + $this->loop->cancelTimer($timer); + } + + $data['timers'] = []; + $this->connections->offsetSet($connection, $data); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @param string $timerName + * @param float $interval + * @param callable $timerCallback + */ + protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) + ){ + $data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) { + $timerCallback($connection); + }); + + // store new timer to $connection + $this->connections->offsetSet($connection, $data); + } + } + + /** + * cancels and removes previous connection timeout timers + * -> set new connection timeout + * @param Socket\ConnectionInterface $connection + * @param float $waitTimeout + */ + protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){ + $this->cancelTimer($connection, 'disconnectTimer'); + $this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) { + $errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout); + + $this->connectionError( + $connection, + new Promise\Timer\TimeoutException($waitTimeout, $errorMessage) + ); + }); + } + + /** + * add new connection to global pool + * @param Socket\ConnectionInterface $connection + */ + protected function addConnection(Socket\ConnectionInterface $connection){ + if(!$this->hasConnection($connection)){ + $this->connections->attach($connection, [ + 'remoteAddress' => $connection->getRemoteAddress(), + 'timers' => [] + ]); + + // update maxConnections count + $this->maxConnections = max($this->connections->count(), $this->maxConnections); + + $this->debug($connection, __FUNCTION__); + } + } + + /** + * remove $connection from global connection pool + * @param Socket\ConnectionInterface $connection + */ + protected function removeConnection(Socket\ConnectionInterface $connection){ + if($this->hasConnection($connection)){ + $this->debug($connection, __FUNCTION__); + $this->cancelTimers($connection); + $this->connections->detach($connection); + } + } + + /** + * get new payload + * @param string $task + * @param null $load + * @return array + */ + protected function newPayload(string $task, $load = null) : array { + $payload = [ + 'task' => $task, + 'load' => $load + ]; + + if($this->stats){ + // add socket statistics + $payload['stats'] = $this->getStats(); + } + + return $payload; + } + + /** + * check if connection is "valid" and can be used for data transfer + * @param Socket\ConnectionInterface $connection + * @return bool + */ + protected function isValidConnection(Socket\ConnectionInterface $connection) : bool { + return $connection->isReadable() || $connection->isWritable(); + } + + /** + * get socket server statistics + * -> e.g. connected clients count + * @return array + */ + protected function getStats() : array { + return [ + 'startup' => time() - $this->startupTime, + 'connections' => $this->connections->count(), + 'maxConnections' => $this->maxConnections + ]; + } + + /** + * echo debug messages + * @param Socket\ConnectionInterface $connection + * @param string $method + * @param string $message + */ + protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){ + if(self::DEFAULT_DEBUG){ + $data = [ + date('Y-m-d H:i:s'), + __CLASS__ . '()', + $connection->getRemoteAddress(), + $method . '()', + $message + ]; + + echo implode(array_filter($data), ' | ') . PHP_EOL; + } + } +} \ No newline at end of file diff --git a/app/WebSockets.php b/app/WebSockets.php index 6e4ca61..d5b5e0b 100644 --- a/app/WebSockets.php +++ b/app/WebSockets.php @@ -8,52 +8,77 @@ namespace Exodus4D\Socket; + +use Exodus4D\Socket\Socket\TcpSocket; +use React\EventLoop; +use React\Socket; use Ratchet\Server\IoServer; use Ratchet\Http\HttpServer; use Ratchet\WebSocket\WsServer; -use React; - class WebSockets { - protected $dns; + /** + * @var string + */ + protected $dsn; + + /** + * @var int + */ protected $wsListenPort; + + /** + * @var string + */ protected $wsListenHost; - function __construct($dns, $wsListenPort, $wsListenHost){ - $this->dns = $dns; - $this->wsListenPort = (int)$wsListenPort; + /** + * WebSockets constructor. + * @param string $dsn + * @param int $wsListenPort + * @param string $wsListenHost + */ + function __construct(string $dsn, int $wsListenPort, string $wsListenHost){ + $this->dsn = $dsn; + $this->wsListenPort = $wsListenPort; $this->wsListenHost = $wsListenHost; $this->startMapSocket(); } private function startMapSocket(){ - $loop = React\EventLoop\Factory::create(); + // global EventLoop + $loop = EventLoop\Factory::create(); - // Listen for the web server to make a ZeroMQ push after an ajax request - $context = new React\ZMQ\Context($loop); - - // Socket for map update data ------------------------------------------------------------- - $pull = $context->getSocket(\ZMQ::SOCKET_PULL); - // Binding to 127.0.0.1 means, the only client that can connect is itself - $pull->bind( $this->dns ); - - // main app -> inject socket for response + // global MessageComponent (main app) (handles all business logic) $mapUpdate = new Main\MapUpdate(); - // "onMessage" listener - $pull->on('message', [$mapUpdate, 'receiveData']); - // Socket for log data -------------------------------------------------------------------- - //$pullSocketLogs = $context->getSocket(\ZMQ::SOCKET_PULL); - //$pullSocketLogs->bind( "tcp://127.0.0.1:5555" ); - //$pullSocketLogs->on('message', [$mapUpdate, 'receiveLogData']) ; + // TCP Socket ------------------------------------------------------------------------------------------------- + $tcpSocket = new TcpSocket($loop, $mapUpdate); + // TCP Server (WebServer <-> TCPServer <-> TCPSocket communication) + $server = new Socket\Server($this->dsn, $loop, [ + 'tcp' => [ + 'backlog' => 20, + 'so_reuseport' => true + ] + ]); + + $server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){ + $tcpSocket->onConnect($connection); + }); + + $server->on('error', function(\Exception $e){ + echo 'error: ' . $e->getMessage() . PHP_EOL; + }); + + // WebSocketServer -------------------------------------------------------------------------------------------- // Binding to 0.0.0.0 means remotes can connect (Web Clients) $webSocketURI = $this->wsListenHost . ':' . $this->wsListenPort; - // Set up our WebSocket server for clients wanting real-time updates - $webSock = new React\Socket\Server($webSocketURI, $loop); + // Set up our WebSocket server for clients subscriptions + $webSock = new Socket\TcpServer($webSocketURI, $loop); new IoServer( new HttpServer( new WsServer( diff --git a/cmd.php b/cmd.php index 7d1c44b..04a282b 100644 --- a/cmd.php +++ b/cmd.php @@ -25,9 +25,9 @@ if(PHP_SAPI === 'cli'){ $host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ; $port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ; - $dns = 'tcp://' . $host . ':' . $port; + $dsn = 'tcp://' . $host . ':' . $port; - new Socket\WebSockets($dns, $wsListenPort, $wsListenHost); + new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost); }else{ echo "Script need to be called by CLI!"; } diff --git a/composer.json b/composer.json index 79d635d..6e2c5f9 100644 --- a/composer.json +++ b/composer.json @@ -15,9 +15,13 @@ } }, "require": { - "php-64bit": ">=7.0", - "ext-zmq": ">=1.1.3", + "php-64bit": ">=7.1", + "ext-json": "*", "cboden/ratchet": "0.4.x", - "react/zmq": "0.3.*" + "react/promise-stream": "1.1.*", + "clue/ndjson-react": "1.0.*" + }, + "suggest": { + "ext-event": "If installed, 'ExtEventLoop' class will get used as default event loop. Better performance. https://pecl.php.net/package/event" } } \ No newline at end of file