- removed requirement for ZMQ PHP extension and replaced the Socket stuff with native PHP code
This commit is contained in:
@@ -22,7 +22,7 @@ Make sure you meet the requirements before continue with the installation.
|
||||
- Port:`8020`
|
||||
- URI:`127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source)
|
||||
|
||||
**TCP Socket connection (Internal use fore WebServer <=> WebSocket communication)**
|
||||
**TCP TcpSocket connection (Internal use fore WebServer <=> WebSocket communication)**
|
||||
- Host:`127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine)
|
||||
- Port:`5555`
|
||||
- URI: `tcp://127.0.0.1:5555`
|
||||
|
||||
@@ -485,43 +485,44 @@ class MapUpdate implements MessageComponentInterface {
|
||||
/**
|
||||
* receive data from TCP socket (main App)
|
||||
* -> send response back
|
||||
* @param $data
|
||||
* @param string $task
|
||||
* @param null $load
|
||||
* @return bool|float|int|null
|
||||
*/
|
||||
public function receiveData($data){
|
||||
$data = (array)json_decode($data, true);
|
||||
$load = $data['load'];
|
||||
$task = $data['task'];
|
||||
$response = false;
|
||||
public function receiveData(string $task, $load = null){
|
||||
$responseLoad = null;
|
||||
|
||||
switch($task){
|
||||
case 'healthCheck':
|
||||
$this->healthCheckToken = (float)$load;
|
||||
$responseLoad = $this->healthCheckToken;
|
||||
break;
|
||||
case 'characterUpdate':
|
||||
$response = $this->updateCharacterData($load);
|
||||
$this->updateCharacterData($load);
|
||||
$mapIds = $this->getMapIdsByCharacterId((int)$load['id']);
|
||||
$this->broadcastMapSubscriptions('mapSubscriptions', $mapIds);
|
||||
break;
|
||||
case 'characterLogout':
|
||||
$response = $this->unSubscribeCharacterIds($load);
|
||||
$responseLoad = $this->unSubscribeCharacterIds($load);
|
||||
break;
|
||||
case 'mapConnectionAccess':
|
||||
$response = $this->setConnectionAccess($load);
|
||||
$responseLoad = $this->setConnectionAccess($load);
|
||||
break;
|
||||
case 'mapAccess':
|
||||
$response = $this->setAccess($task, $load);
|
||||
$responseLoad = $this->setAccess($task, $load);
|
||||
break;
|
||||
case 'mapUpdate':
|
||||
$response = $this->broadcastMapUpdate($task, $load);
|
||||
$responseLoad = $this->broadcastMapUpdate($task, $load);
|
||||
break;
|
||||
case 'mapDeleted':
|
||||
$response = $this->deleteMapId($task, $load);
|
||||
break;
|
||||
case 'healthCheck':
|
||||
$this->healthCheckToken = (float)$load;
|
||||
$response = 'OK';
|
||||
$responseLoad = $this->deleteMapId($task, $load);
|
||||
break;
|
||||
case 'logData':
|
||||
$this->handleLogData((array)$load['meta'], (array)$load['log']);
|
||||
break;
|
||||
}
|
||||
|
||||
return $responseLoad;
|
||||
}
|
||||
|
||||
private function setCharacterData(array $characterData){
|
||||
|
||||
549
app/Socket/TcpSocket.php
Normal file
549
app/Socket/TcpSocket.php
Normal file
@@ -0,0 +1,549 @@
|
||||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: Exodus 4D
|
||||
* Date: 15.02.2019
|
||||
* Time: 14:29
|
||||
*/
|
||||
|
||||
namespace Exodus4D\Socket\Socket;
|
||||
|
||||
|
||||
use React\EventLoop;
|
||||
use React\Socket;
|
||||
use React\Promise;
|
||||
use React\Stream;
|
||||
use Clue\React\NDJson;
|
||||
use Ratchet\MessageComponentInterface;
|
||||
|
||||
class TcpSocket {
|
||||
|
||||
/**
|
||||
* error message for unknown acceptType
|
||||
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
|
||||
*/
|
||||
const ERROR_ACCEPT_TYPE = "Unknown acceptType: '%s'";
|
||||
|
||||
/**
|
||||
* error message for connected stream is not readable
|
||||
*/
|
||||
const ERROR_STREAM_NOT_READABLE = "Stream is not readable. Remote address: '%s'";
|
||||
|
||||
/**
|
||||
* error message for connection stream is not writable
|
||||
*/
|
||||
const ERROR_STREAM_NOT_WRITABLE = "Stream is not writable. Remote address: '%s'";
|
||||
|
||||
/**
|
||||
* error message for missing 'task' key in payload
|
||||
*/
|
||||
const ERROR_TASK_MISSING = "Missing 'task' in payload";
|
||||
|
||||
/**
|
||||
* error message for unknown 'task' key in payload
|
||||
*/
|
||||
const ERROR_TASK_UNKNOWN = "Unknown 'task': '%s' in payload";
|
||||
|
||||
/**
|
||||
* error message for missing method
|
||||
*/
|
||||
const ERROR_METHOD_MISSING = "Method '%S' not found";
|
||||
|
||||
/**
|
||||
* error message for waitTimeout exceeds
|
||||
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
|
||||
*/
|
||||
const ERROR_WAIT_TIMEOUT = "Exceeds 'waitTimeout': %ss";
|
||||
|
||||
/**
|
||||
* default for: accepted data type
|
||||
* -> affects en/decoding socket data
|
||||
*/
|
||||
const DEFAULT_ACCEPT_TYPE = 'json';
|
||||
|
||||
/**
|
||||
* default for: wait timeout
|
||||
* -> timeout until connection gets closed
|
||||
* timeout should be "reset" right after successful response send to client
|
||||
*/
|
||||
const DEFAULT_WAIT_TIMEOUT = 3.0;
|
||||
|
||||
/**
|
||||
* default for: send response by end() method (rather than write())
|
||||
* -> connection will get closed right after successful response send to client
|
||||
*/
|
||||
const DEFAULT_END_WITH_RESPONSE = true;
|
||||
|
||||
/**
|
||||
* default for: add socket statistic to response payload
|
||||
*/
|
||||
const DEFAULT_STATS = true;
|
||||
|
||||
/**
|
||||
* default for: echo debug messages
|
||||
*/
|
||||
const DEFAULT_DEBUG = false;
|
||||
|
||||
/**
|
||||
* global server loop
|
||||
* @var EventLoop\LoopInterface
|
||||
*/
|
||||
private $loop;
|
||||
|
||||
/**
|
||||
* @var MessageComponentInterface
|
||||
*/
|
||||
private $handler;
|
||||
|
||||
/**
|
||||
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
|
||||
* @var string
|
||||
*/
|
||||
private $acceptType = self::DEFAULT_ACCEPT_TYPE;
|
||||
|
||||
/**
|
||||
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
|
||||
* @var float
|
||||
*/
|
||||
private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;
|
||||
|
||||
/**
|
||||
* @see TcpSocket::DEFAULT_END_WITH_RESPONSE
|
||||
* @var bool
|
||||
*/
|
||||
private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE;
|
||||
|
||||
/**
|
||||
* @see TcpSocket::DEFAULT_STATS
|
||||
* @var bool
|
||||
*/
|
||||
private $stats = self::DEFAULT_STATS;
|
||||
|
||||
/**
|
||||
* storage for all active connections
|
||||
* -> can be used to get current count of connected clients
|
||||
* @var \SplObjectStorage
|
||||
*/
|
||||
private $connections;
|
||||
|
||||
/**
|
||||
* max count of concurrent open connections
|
||||
* -> represents number of active connected clients
|
||||
* @var int
|
||||
*/
|
||||
private $maxConnections = 0;
|
||||
|
||||
/**
|
||||
* timestamp on startup
|
||||
* @var int
|
||||
*/
|
||||
private $startupTime = 0;
|
||||
|
||||
/**
|
||||
* TcpSocket constructor.
|
||||
* @param EventLoop\LoopInterface $loop
|
||||
* @param MessageComponentInterface $handler
|
||||
* @param string $acceptType
|
||||
* @param float $waitTimeout
|
||||
* @param bool $endWithResponse
|
||||
*/
|
||||
public function __construct(
|
||||
EventLoop\LoopInterface $loop,
|
||||
MessageComponentInterface $handler,
|
||||
string $acceptType = self::DEFAULT_ACCEPT_TYPE,
|
||||
float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT,
|
||||
bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE
|
||||
){
|
||||
$this->loop = $loop;
|
||||
$this->handler = $handler;
|
||||
$this->acceptType = $acceptType;
|
||||
$this->waitTimeout = $waitTimeout;
|
||||
$this->endWithResponse = $endWithResponse;
|
||||
$this->connections = new \SplObjectStorage();
|
||||
$this->startupTime = time();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
*/
|
||||
public function onConnect(Socket\ConnectionInterface $connection){
|
||||
$this->debug($connection, __FUNCTION__, '----------------------------------------');
|
||||
|
||||
if($this->isValidConnection($connection)){
|
||||
// connection can be used
|
||||
// add connection to global connection pool
|
||||
$this->addConnection($connection);
|
||||
// set waitTimeout timer for connection
|
||||
$this->setTimerTimeout($connection, $this->waitTimeout);
|
||||
|
||||
$this->debug($connection, __FUNCTION__);
|
||||
|
||||
// register connection events ... -------------------------------------------------------------------------
|
||||
$this->initRead($connection)
|
||||
->then($this->initDispatch())
|
||||
->then($this->initResponse($connection))
|
||||
->then(
|
||||
function(string $message) use ($connection) {
|
||||
$this->debug($connection, 'DONE', $message);
|
||||
},
|
||||
function(\Exception $e) use ($connection) {
|
||||
$this->debug($connection, 'ERROR', $e->getMessage());
|
||||
$this->connectionError($connection, $e);
|
||||
});
|
||||
|
||||
$connection->on('end', function() use ($connection) {
|
||||
$this->debug($connection, 'onEnd');
|
||||
});
|
||||
|
||||
$connection->on('close', function() use ($connection){
|
||||
$this->debug($connection, 'onClose');
|
||||
$this->removeConnection($connection);
|
||||
});
|
||||
|
||||
$connection->on('error', function(\Exception $e) use ($connection) {
|
||||
$this->debug($connection, 'onError', $e->getMessage());
|
||||
});
|
||||
}else{
|
||||
// invalid connection -> can not be used
|
||||
$connection->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @return Promise\PromiseInterface
|
||||
*/
|
||||
protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface {
|
||||
if($connection->isReadable()){
|
||||
if('json' == $this->acceptType){
|
||||
// new empty stream for processing JSON
|
||||
$stream = new Stream\ThroughStream();
|
||||
$streamDecoded = new NDJson\Decoder($stream, true);
|
||||
|
||||
// promise get resolved on first emit('data')
|
||||
$promise = Promise\Stream\first($streamDecoded);
|
||||
|
||||
// register on('data') for main input stream
|
||||
$connection->once('data', function ($chunk) use ($stream) {
|
||||
// send current data chunk to processing stream -> resolves promise
|
||||
$stream->emit('data', [$chunk]);
|
||||
});
|
||||
|
||||
return $promise;
|
||||
}else{
|
||||
return new Promise\RejectedPromise(
|
||||
new \InvalidArgumentException(
|
||||
sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType)
|
||||
)
|
||||
);
|
||||
}
|
||||
}else{
|
||||
return new Promise\RejectedPromise(
|
||||
new \Exception(
|
||||
sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress())
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* init dispatcher for payload
|
||||
* @return callable
|
||||
*/
|
||||
protected function initDispatch() : callable {
|
||||
return function($payload) : Promise\PromiseInterface {
|
||||
$task = (string)$payload['task'];
|
||||
if(!empty($task)){
|
||||
$load = $payload['load'];
|
||||
$deferred = new Promise\Deferred();
|
||||
$this->dispatch($deferred, $task, $load);
|
||||
return $deferred->promise();
|
||||
}else{
|
||||
return new Promise\RejectedPromise(
|
||||
new \InvalidArgumentException(self::ERROR_TASK_MISSING)
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Promise\Deferred $deferred
|
||||
* @param string $task
|
||||
* @param null $load
|
||||
*/
|
||||
protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void {
|
||||
|
||||
switch($task){
|
||||
case 'getStats':
|
||||
$deferred->resolve($this->newPayload($task, null));
|
||||
break;
|
||||
case 'healthCheck':
|
||||
case 'characterUpdate':
|
||||
case 'characterLogout':
|
||||
case 'mapConnectionAccess':
|
||||
case 'mapAccess':
|
||||
case 'mapUpdate':
|
||||
case 'mapDeleted':
|
||||
case 'logData':
|
||||
if(method_exists($this->handler, 'receiveData')){
|
||||
$deferred->resolve(
|
||||
$this->newPayload(
|
||||
$task,
|
||||
call_user_func_array([$this->handler, 'receiveData'], [$task, $load])
|
||||
)
|
||||
);
|
||||
}else{
|
||||
$deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData')));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
$deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @return callable
|
||||
*/
|
||||
protected function initResponse(Socket\ConnectionInterface $connection) : callable {
|
||||
return function(array $payload) use ($connection) : Promise\PromiseInterface {
|
||||
$this->debug($connection, 'initResponse');
|
||||
|
||||
$deferred = new Promise\Deferred();
|
||||
$this->write($deferred, $connection, $payload);
|
||||
|
||||
return $deferred->promise();
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Promise\Deferred $deferred
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param array $payload
|
||||
*/
|
||||
protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void {
|
||||
$write = false;
|
||||
if($connection->isWritable()){
|
||||
if('json' == $this->acceptType){
|
||||
$connection = new NDJson\Encoder($connection);
|
||||
}
|
||||
|
||||
// write a new chunk of data to connection stream
|
||||
$write = $connection->write($payload);
|
||||
|
||||
if($this->endWithResponse){
|
||||
// connection should be closed (and removed from this socket server)
|
||||
$connection->end();
|
||||
}
|
||||
}
|
||||
|
||||
if($write){
|
||||
$deferred->resolve('OK');
|
||||
}else{
|
||||
$deferred->reject(new \Exception(
|
||||
sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress())
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* $connection has error
|
||||
* -> if writable -> end() connection with $payload (close() is called by default)
|
||||
* -> if readable -> close() connection
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param \Exception $e
|
||||
*/
|
||||
protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){
|
||||
$this->debug($connection, __FUNCTION__, $e->getMessage());
|
||||
|
||||
if($connection->isWritable()){
|
||||
if('json' == $this->acceptType){
|
||||
$connection = new NDJson\Encoder($connection);
|
||||
}
|
||||
|
||||
// send "end" data, then close
|
||||
$connection->end($this->newPayload('error', $e->getMessage()));
|
||||
}else{
|
||||
// close connection
|
||||
$connection->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check if $connection is found in global pool
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @return bool
|
||||
*/
|
||||
protected function hasConnection(Socket\ConnectionInterface $connection) : bool {
|
||||
return $this->connections->contains($connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* cancels a previously set timer callback for a $connection
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param string $timerName
|
||||
*/
|
||||
protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){
|
||||
if(
|
||||
$this->hasConnection($connection) &&
|
||||
($data = (array)$this->connections->offsetGet($connection)) &&
|
||||
isset($data['timers']) && isset($data['timers'][$timerName]) &&
|
||||
($data['timers'][$timerName] instanceof EventLoop\Timer\TimerInterface)
|
||||
){
|
||||
$this->loop->cancelTimer($data['timers'][$timerName]);
|
||||
|
||||
unset($data['timers'][$timerName]);
|
||||
$this->connections->offsetSet($connection, $data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* cancels all previously set timers for a $connection
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
*/
|
||||
protected function cancelTimers(Socket\ConnectionInterface $connection){
|
||||
if(
|
||||
$this->hasConnection($connection) &&
|
||||
($data = (array)$this->connections->offsetGet($connection)) &&
|
||||
isset($data['timers'])
|
||||
){
|
||||
foreach((array)$data['timers'] as $timerName => $timer){
|
||||
$this->loop->cancelTimer($timer);
|
||||
}
|
||||
|
||||
$data['timers'] = [];
|
||||
$this->connections->offsetSet($connection, $data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param string $timerName
|
||||
* @param float $interval
|
||||
* @param callable $timerCallback
|
||||
*/
|
||||
protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){
|
||||
if(
|
||||
$this->hasConnection($connection) &&
|
||||
($data = (array)$this->connections->offsetGet($connection)) &&
|
||||
isset($data['timers'])
|
||||
){
|
||||
$data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) {
|
||||
$timerCallback($connection);
|
||||
});
|
||||
|
||||
// store new timer to $connection
|
||||
$this->connections->offsetSet($connection, $data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* cancels and removes previous connection timeout timers
|
||||
* -> set new connection timeout
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param float $waitTimeout
|
||||
*/
|
||||
protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){
|
||||
$this->cancelTimer($connection, 'disconnectTimer');
|
||||
$this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) {
|
||||
$errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout);
|
||||
|
||||
$this->connectionError(
|
||||
$connection,
|
||||
new Promise\Timer\TimeoutException($waitTimeout, $errorMessage)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* add new connection to global pool
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
*/
|
||||
protected function addConnection(Socket\ConnectionInterface $connection){
|
||||
if(!$this->hasConnection($connection)){
|
||||
$this->connections->attach($connection, [
|
||||
'remoteAddress' => $connection->getRemoteAddress(),
|
||||
'timers' => []
|
||||
]);
|
||||
|
||||
// update maxConnections count
|
||||
$this->maxConnections = max($this->connections->count(), $this->maxConnections);
|
||||
|
||||
$this->debug($connection, __FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* remove $connection from global connection pool
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
*/
|
||||
protected function removeConnection(Socket\ConnectionInterface $connection){
|
||||
if($this->hasConnection($connection)){
|
||||
$this->debug($connection, __FUNCTION__);
|
||||
$this->cancelTimers($connection);
|
||||
$this->connections->detach($connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get new payload
|
||||
* @param string $task
|
||||
* @param null $load
|
||||
* @return array
|
||||
*/
|
||||
protected function newPayload(string $task, $load = null) : array {
|
||||
$payload = [
|
||||
'task' => $task,
|
||||
'load' => $load
|
||||
];
|
||||
|
||||
if($this->stats){
|
||||
// add socket statistics
|
||||
$payload['stats'] = $this->getStats();
|
||||
}
|
||||
|
||||
return $payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* check if connection is "valid" and can be used for data transfer
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @return bool
|
||||
*/
|
||||
protected function isValidConnection(Socket\ConnectionInterface $connection) : bool {
|
||||
return $connection->isReadable() || $connection->isWritable();
|
||||
}
|
||||
|
||||
/**
|
||||
* get socket server statistics
|
||||
* -> e.g. connected clients count
|
||||
* @return array
|
||||
*/
|
||||
protected function getStats() : array {
|
||||
return [
|
||||
'startup' => time() - $this->startupTime,
|
||||
'connections' => $this->connections->count(),
|
||||
'maxConnections' => $this->maxConnections
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* echo debug messages
|
||||
* @param Socket\ConnectionInterface $connection
|
||||
* @param string $method
|
||||
* @param string $message
|
||||
*/
|
||||
protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){
|
||||
if(self::DEFAULT_DEBUG){
|
||||
$data = [
|
||||
date('Y-m-d H:i:s'),
|
||||
__CLASS__ . '()',
|
||||
$connection->getRemoteAddress(),
|
||||
$method . '()',
|
||||
$message
|
||||
];
|
||||
|
||||
echo implode(array_filter($data), ' | ') . PHP_EOL;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,52 +8,77 @@
|
||||
|
||||
namespace Exodus4D\Socket;
|
||||
|
||||
|
||||
use Exodus4D\Socket\Socket\TcpSocket;
|
||||
use React\EventLoop;
|
||||
use React\Socket;
|
||||
use Ratchet\Server\IoServer;
|
||||
use Ratchet\Http\HttpServer;
|
||||
use Ratchet\WebSocket\WsServer;
|
||||
|
||||
use React;
|
||||
|
||||
class WebSockets {
|
||||
|
||||
protected $dns;
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $dsn;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $wsListenPort;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $wsListenHost;
|
||||
|
||||
function __construct($dns, $wsListenPort, $wsListenHost){
|
||||
$this->dns = $dns;
|
||||
$this->wsListenPort = (int)$wsListenPort;
|
||||
/**
|
||||
* WebSockets constructor.
|
||||
* @param string $dsn
|
||||
* @param int $wsListenPort
|
||||
* @param string $wsListenHost
|
||||
*/
|
||||
function __construct(string $dsn, int $wsListenPort, string $wsListenHost){
|
||||
$this->dsn = $dsn;
|
||||
$this->wsListenPort = $wsListenPort;
|
||||
$this->wsListenHost = $wsListenHost;
|
||||
|
||||
$this->startMapSocket();
|
||||
}
|
||||
|
||||
private function startMapSocket(){
|
||||
$loop = React\EventLoop\Factory::create();
|
||||
// global EventLoop
|
||||
$loop = EventLoop\Factory::create();
|
||||
|
||||
// Listen for the web server to make a ZeroMQ push after an ajax request
|
||||
$context = new React\ZMQ\Context($loop);
|
||||
|
||||
// Socket for map update data -------------------------------------------------------------
|
||||
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
|
||||
// Binding to 127.0.0.1 means, the only client that can connect is itself
|
||||
$pull->bind( $this->dns );
|
||||
|
||||
// main app -> inject socket for response
|
||||
// global MessageComponent (main app) (handles all business logic)
|
||||
$mapUpdate = new Main\MapUpdate();
|
||||
// "onMessage" listener
|
||||
$pull->on('message', [$mapUpdate, 'receiveData']);
|
||||
|
||||
// Socket for log data --------------------------------------------------------------------
|
||||
//$pullSocketLogs = $context->getSocket(\ZMQ::SOCKET_PULL);
|
||||
//$pullSocketLogs->bind( "tcp://127.0.0.1:5555" );
|
||||
//$pullSocketLogs->on('message', [$mapUpdate, 'receiveLogData']) ;
|
||||
// TCP Socket -------------------------------------------------------------------------------------------------
|
||||
$tcpSocket = new TcpSocket($loop, $mapUpdate);
|
||||
// TCP Server (WebServer <-> TCPServer <-> TCPSocket communication)
|
||||
$server = new Socket\Server($this->dsn, $loop, [
|
||||
'tcp' => [
|
||||
'backlog' => 20,
|
||||
'so_reuseport' => true
|
||||
]
|
||||
]);
|
||||
|
||||
$server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){
|
||||
$tcpSocket->onConnect($connection);
|
||||
});
|
||||
|
||||
$server->on('error', function(\Exception $e){
|
||||
echo 'error: ' . $e->getMessage() . PHP_EOL;
|
||||
});
|
||||
|
||||
// WebSocketServer --------------------------------------------------------------------------------------------
|
||||
|
||||
// Binding to 0.0.0.0 means remotes can connect (Web Clients)
|
||||
$webSocketURI = $this->wsListenHost . ':' . $this->wsListenPort;
|
||||
|
||||
// Set up our WebSocket server for clients wanting real-time updates
|
||||
$webSock = new React\Socket\Server($webSocketURI, $loop);
|
||||
// Set up our WebSocket server for clients subscriptions
|
||||
$webSock = new Socket\TcpServer($webSocketURI, $loop);
|
||||
new IoServer(
|
||||
new HttpServer(
|
||||
new WsServer(
|
||||
|
||||
4
cmd.php
4
cmd.php
@@ -25,9 +25,9 @@ if(PHP_SAPI === 'cli'){
|
||||
$host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ;
|
||||
$port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ;
|
||||
|
||||
$dns = 'tcp://' . $host . ':' . $port;
|
||||
$dsn = 'tcp://' . $host . ':' . $port;
|
||||
|
||||
new Socket\WebSockets($dns, $wsListenPort, $wsListenHost);
|
||||
new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost);
|
||||
}else{
|
||||
echo "Script need to be called by CLI!";
|
||||
}
|
||||
|
||||
@@ -15,9 +15,13 @@
|
||||
}
|
||||
},
|
||||
"require": {
|
||||
"php-64bit": ">=7.0",
|
||||
"ext-zmq": ">=1.1.3",
|
||||
"php-64bit": ">=7.1",
|
||||
"ext-json": "*",
|
||||
"cboden/ratchet": "0.4.x",
|
||||
"react/zmq": "0.3.*"
|
||||
"react/promise-stream": "1.1.*",
|
||||
"clue/ndjson-react": "1.0.*"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-event": "If installed, 'ExtEventLoop' class will get used as default event loop. Better performance. https://pecl.php.net/package/event"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user