affects en/decoding socket data
     */
    const DEFAULT_ACCEPT_TYPE = 'json';

    /**
     * default for: wait timeout
     * -> timeout (seconds) until connection gets closed
     *    timeout should be "reset" right after successful response send to client
     */
    const DEFAULT_WAIT_TIMEOUT = 3.0;

    /**
     * default for: send response by end() method (rather than write())
     * -> connection will get closed right after successful response send to client
     */
    const DEFAULT_END_WITH_RESPONSE = true;

    /**
     * default for: add socket statistic to response payload
     */
    const DEFAULT_STATS = true;

    /**
     * default for: echo debug messages
     */
    const DEFAULT_DEBUG = false;

    /**
     * global server loop
     * @var EventLoop\LoopInterface
     */
    private $loop;

    /**
     * handler that receives dispatched tasks (see dispatch())
     * @var MessageComponentInterface
     */
    private $handler;

    /**
     * @see TcpSocket::DEFAULT_ACCEPT_TYPE
     * @var string
     */
    private $acceptType = self::DEFAULT_ACCEPT_TYPE;

    /**
     * @see TcpSocket::DEFAULT_WAIT_TIMEOUT
     * @var float
     */
    private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;

    /**
     * @see TcpSocket::DEFAULT_END_WITH_RESPONSE
     * @var bool
     */
    private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE;

    /**
     * @see TcpSocket::DEFAULT_STATS
     * @var bool
     */
    private $stats = self::DEFAULT_STATS;

    /**
     * storage for all active connections
     * -> can be used to get current count of connected clients
     * -> each connection carries an array with the keys 'remoteAddress' and 'timers'
     * @var \SplObjectStorage
     */
    private $connections;

    /**
     * max count of concurrent open connections
     * -> represents the peak number of simultaneously connected clients
     * @var int
     */
    private $maxConnections = 0;

    /**
     * timestamp on startup
     * @var int
     */
    private $startupTime = 0;

    /**
     * TcpSocket constructor.
     * @param EventLoop\LoopInterface $loop
     * @param MessageComponentInterface $handler
     * @param string $acceptType
     * @param float $waitTimeout
     * @param bool $endWithResponse
     */
    public function __construct(
        EventLoop\LoopInterface $loop,
        MessageComponentInterface $handler,
        string $acceptType = self::DEFAULT_ACCEPT_TYPE,
        float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT,
        bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE
    ){
        $this->loop = $loop;
        $this->handler = $handler;
        $this->acceptType = $acceptType;
        $this->waitTimeout = $waitTimeout;
        $this->endWithResponse = $endWithResponse;
        $this->connections = new \SplObjectStorage();
        $this->startupTime = time();
    }

    /**
     * handles a new $connection
     * -> valid connections are pooled, get a timeout timer and the read -> dispatch -> response chain
     * -> invalid connections are closed immediately
     * @param Socket\ConnectionInterface $connection
     */
    public function onConnect(Socket\ConnectionInterface $connection){
        $this->debug($connection, __FUNCTION__, '----------------------------------------');

        if($this->isValidConnection($connection)){
            // connection can be used
            // add connection to global connection pool
            $this->addConnection($connection);
            // set waitTimeout timer for connection
            $this->setTimerTimeout($connection, $this->waitTimeout);

            $this->debug($connection, __FUNCTION__);

            // register connection events ... -------------------------------------------------------------------------
            $this->initRead($connection)
                ->then($this->initDispatch())
                ->then($this->initResponse($connection))
                ->then(
                    function(string $message) use ($connection) {
                        $this->debug($connection, 'DONE', $message);
                    },
                    function(\Exception $e) use ($connection) {
                        $this->debug($connection, 'ERROR', $e->getMessage());
                        $this->connectionError($connection, $e);
                    });

            $connection->on('end', function() use ($connection) {
                $this->debug($connection, 'onEnd');
            });

            $connection->on('close', function() use ($connection){
                $this->debug($connection, 'onClose');
                // closed connections must leave the pool (also cancels their timers)
                $this->removeConnection($connection);
            });

            $connection->on('error', function(\Exception $e) use ($connection) {
                $this->debug($connection, 'onError', $e->getMessage());
            });
        }else{
            // invalid connection -> can not be used
            $connection->close();
        }
    }

    /**
     * init read for $connection
     * -> returned promise resolves with the first decoded payload
     * -> returned promise is rejected if $connection is not readable or the accept type is not supported
     * @param Socket\ConnectionInterface $connection
     * @return Promise\PromiseInterface
     */
    protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface {
        if($connection->isReadable()){
            if('json' === $this->acceptType){
                // new empty stream for processing JSON
                $stream = new Stream\ThroughStream();
                $streamDecoded = new NDJson\Decoder($stream, true);

                // promise get resolved on first emit('data')
                $promise = Promise\Stream\first($streamDecoded);

                // register on('data') for main input stream
                $connection->on('data', function ($chunk) use ($stream) {
                    // send current data chunk to processing stream -> resolves promise
                    $stream->emit('data', [$chunk]);
                });

                return $promise;
            }else{
                return new Promise\RejectedPromise(
                    new \InvalidArgumentException(
                        sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType)
                    )
                );
            }
        }else{
            return new Promise\RejectedPromise(
                new \Exception(
                    sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress())
                )
            );
        }
    }

    /**
     * init dispatcher for payload
     * -> returned callable rejects with ERROR_TASK_MISSING if the payload is not an array
     *    or carries no usable 'task'
     * @return callable
     */
    protected function initDispatch() : callable {
        return function($payload) : Promise\PromiseInterface {
            $task = '';
            $load = null;

            // payload comes from the client -> it can be any value, not just an array with 'task'/'load'.
            // Read defensively, so malformed input is rejected below instead of raising warnings/errors
            if(is_array($payload)){
                $rawTask = $payload['task'] ?? '';
                $task = is_scalar($rawTask) ? (string)$rawTask : '';
                $load = $payload['load'] ?? null;
            }

            if(!empty($task)){
                $deferred = new Promise\Deferred();
                $this->dispatch($deferred, $task, $load);
                return $deferred->promise();
            }else{
                return new Promise\RejectedPromise(
                    new \InvalidArgumentException(self::ERROR_TASK_MISSING)
                );
            }
        };
    }

    /**
     * dispatch $task
     * -> resolves $deferred with a new response payload
     * -> rejects $deferred for unknown tasks or if the handler has no receiveData() method
     * @param Promise\Deferred $deferred
     * @param string $task
     * @param mixed $load
     */
    protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void {
        switch($task){
            case 'getStats':
                // stats get added by newPayload() (if enabled) -> no load required
                $deferred->resolve($this->newPayload($task, null));
                break;
            case 'healthCheck':
            case 'characterUpdate':
            case 'characterLogout':
            case 'mapConnectionAccess':
            case 'mapAccess':
            case 'mapUpdate':
            case 'mapDeleted':
            case 'logData':
                // receiveData() is looked up at runtime, therefore the method_exists() guard
                if(method_exists($this->handler, 'receiveData')){
                    $deferred->resolve(
                        $this->newPayload(
                            $task,
                            $this->handler->receiveData($task, $load)
                        )
                    );
                }else{
                    $deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData')));
                }
                break;
            default:
                $deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task)));
        }
    }

    /**
     * init response for $connection
     * -> returned callable writes the payload to $connection and returns a promise for the result
     * @param Socket\ConnectionInterface $connection
     * @return callable
     */
    protected function initResponse(Socket\ConnectionInterface $connection) : callable {
        return function(array $payload) use ($connection) : Promise\PromiseInterface {
            $this->debug($connection, 'initResponse');

            $deferred = new Promise\Deferred();
            $this->write($deferred, $connection, $payload);

            return $deferred->promise();
        };
    }

    /**
     * write $payload to $connection
     * -> resolves $deferred with 'OK' on success, rejects it otherwise
     * @param Promise\Deferred $deferred
     * @param Socket\ConnectionInterface $connection
     * @param array $payload
     */
    protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void {
        $write = false;

        if($connection->isWritable()){
            // wrap the connection for output only. $connection itself must stay untouched,
            // it is still needed for getRemoteAddress() in the error case below
            $stream = ('json' === $this->acceptType) ? new NDJson\Encoder($connection) : $connection;

            // write a new chunk of data to connection stream
            // NOTE(review): a "false" return is handled as failure here -> confirm this is intended
            // for streams that also return false to signal back-pressure
            $write = $stream->write($payload);

            if($this->endWithResponse){
                // connection should be closed (and removed from this socket server)
                $stream->end();
            }
        }

        if($write){
            $deferred->resolve('OK');
        }else{
            $deferred->reject(new \Exception(
                sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress())
            ));
        }
    }

    /**
     * $connection has error
     * -> if writable -> end() connection with $payload (close() is called by default)
     * -> if readable -> close() connection
     * @param Socket\ConnectionInterface $connection
     * @param \Exception $e
     */
    protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){
        $this->debug($connection, __FUNCTION__, $e->getMessage());

        if($connection->isWritable()){
            $stream = ('json' === $this->acceptType) ? new NDJson\Encoder($connection) : $connection;

            // send "end" data, then close
            $stream->end($this->newPayload('error', $e->getMessage()));
        }else{
            // close connection
            $connection->close();
        }
    }

    /**
     * check if $connection is found in global pool
     * @param Socket\ConnectionInterface $connection
     * @return bool
     */
    protected function hasConnection(Socket\ConnectionInterface $connection) : bool {
        return $this->connections->contains($connection);
    }

    /**
     * cancels a previously set timer callback for a $connection
     * -> no-op if $connection is not pooled or has no timer with that name
     * @param Socket\ConnectionInterface $connection
     * @param string $timerName
     */
    protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){
        if(
            $this->hasConnection($connection) &&
            ($data = (array)$this->connections->offsetGet($connection)) &&
            isset($data['timers']) && isset($data['timers'][$timerName]) &&
            ($data['timers'][$timerName] instanceof EventLoop\Timer\TimerInterface)
        ){
            $this->loop->cancelTimer($data['timers'][$timerName]);

            // write the changed data back, $data is just a copy of the stored array
            unset($data['timers'][$timerName]);
            $this->connections->offsetSet($connection, $data);
        }
    }

    /**
     * cancels all previously set timers for a $connection
     * @param Socket\ConnectionInterface $connection
     */
    protected function cancelTimers(Socket\ConnectionInterface $connection){
        if(
            $this->hasConnection($connection) &&
            ($data = (array)$this->connections->offsetGet($connection)) &&
            isset($data['timers'])
        ){
            foreach((array)$data['timers'] as $timer){
                // same type guard as in cancelTimer() -> never pass a non timer value to the loop
                if($timer instanceof EventLoop\Timer\TimerInterface){
                    $this->loop->cancelTimer($timer);
                }
            }

            $data['timers'] = [];
            $this->connections->offsetSet($connection, $data);
        }
    }

    /**
     * set a new one-off timer for a $connection
     * -> $timerCallback gets called with $connection after $interval seconds
     * -> no-op if $connection is not pooled
     * @param Socket\ConnectionInterface $connection
     * @param string $timerName
     * @param float $interval
     * @param callable $timerCallback
     */
    protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){
        if(
            $this->hasConnection($connection) &&
            ($data = (array)$this->connections->offsetGet($connection)) &&
            isset($data['timers'])
        ){
            $data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) {
                $timerCallback($connection);
            });

            // store new timer to $connection
            $this->connections->offsetSet($connection, $data);
        }
    }

    /**
     * cancels and removes previous connection timeout timers
     * -> set new connection timeout
     * -> on timeout the connection is handled by connectionError()
     * @param Socket\ConnectionInterface $connection
     * @param float $waitTimeout
     */
    protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){
        $this->cancelTimer($connection, 'disconnectTimer');

        $this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) {
            $errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout);

            $this->connectionError(
                $connection,
                new Promise\Timer\TimeoutException($waitTimeout, $errorMessage)
            );
        });
    }

    /**
     * add new connection to global pool
     * -> no-op if $connection is already pooled
     * @param Socket\ConnectionInterface $connection
     */
    protected function addConnection(Socket\ConnectionInterface $connection){
        if(!$this->hasConnection($connection)){
            $this->connections->attach($connection, [
                'remoteAddress' => $connection->getRemoteAddress(),
                'timers' => []
            ]);

            // update maxConnections count
            $this->maxConnections = max($this->connections->count(), $this->maxConnections);

            $this->debug($connection, __FUNCTION__);
        }
    }

    /**
     * remove $connection from global connection pool
     * -> all timers of $connection get cancelled first
     * @param Socket\ConnectionInterface $connection
     */
    protected function removeConnection(Socket\ConnectionInterface $connection){
        if($this->hasConnection($connection)){
            $this->debug($connection, __FUNCTION__);

            $this->cancelTimers($connection);
            $this->connections->detach($connection);
        }
    }

    /**
     * get new payload
     * -> 'stats' key is added if statistics are enabled
     * @param string $task
     * @param mixed $load
     * @return array
     */
    protected function newPayload(string $task, $load = null) : array {
        $payload = [
            'task' => $task,
            'load' => $load
        ];

        if($this->stats){
            // add socket statistics
            $payload['stats'] = $this->getStats();
        }

        return $payload;
    }

    /**
     * check if connection is "valid" and can be used for data transfer
     * -> a connection is valid if it is readable OR writable
     * @param Socket\ConnectionInterface $connection
     * @return bool
     */
    protected function isValidConnection(Socket\ConnectionInterface $connection) : bool {
        return $connection->isReadable() || $connection->isWritable();
    }

    /**
     * get socket server statistics
     * -> 'startup' holds the seconds passed since startup
     * -> 'connections' holds the current count of pooled connections
     * @return array
     */
    protected function getStats() : array {
        return [
            'startup' => time() - $this->startupTime,
            'connections' => $this->connections->count(),
            'maxConnections' => $this->maxConnections
        ];
    }

    /**
     * echo debug messages
     * -> output is controlled by TcpSocket::DEFAULT_DEBUG
     * @param Socket\ConnectionInterface $connection
     * @param string $method
     * @param string $message
     */
    protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){
        if(self::DEFAULT_DEBUG){
            $data = [
                date('Y-m-d H:i:s'),
                __CLASS__ . '()',
                $connection->getRemoteAddress(),
                $method . '()',
                $message
            ];

            // separator has to be the first argument, the reversed order is no longer supported by PHP 8
            // array_filter() drops empty parts (e.g. missing $message)
            echo implode(' | ', array_filter($data)) . PHP_EOL;
        }
    }
}