Merge pull request #12 from exodus4d/develop

v1.2.0
This commit is contained in:
Mark Friedrich
2019-05-10 16:27:23 +02:00
committed by GitHub
6 changed files with 719 additions and 70 deletions

111
README.md
View File

@@ -1,37 +1,100 @@
### [WIP] WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder)
## WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder)
####Requirements:
1. A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **(>= v1.2.0)**.
2. A working installation of *[ØMQ](http://zeromq.org/area:download)* **(>= v4.2.0)**.
Which is a "network library" written in C (very fast) that handles TCP based socket connections
between your existing _Pathfinder_ installation and this WebSocket server extension. [download *ØMQ*](http://zeromq.org/area:download).
3. A new [PHP extension for *ØMQ*](http://zeromq.org/bindings:php) that handles the communication between this WebSocket server and *ØMQ*. **(>= v1.1.3)**
### Requirements
- A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **( v1.2.0)**
- [_Composer_](https://getcomposer.org/download/) to install packages for the WebSocket server
####Install:
Make sure you meet the requirements before continue with the installation.
1. Install [Composer](https://getcomposer.org/download/)
### Install
1. Checkout this project in a **new** folder (NOT the install for _Pathfinder_ itself) e.g. `/var/www/websocket.pathfinder`
1. Install [_Composer_](https://getcomposer.org/download/)
2. Install Composer dependencies from `composer.json` file:
- `$ composer install` OR
- `$ php composer.phar install` (change composer.phar path to your Composer directory)
3. Start WebSocket server `php cmd.php`
- `$ cd /var/www/websocket.pathfinder`
- `$ composer install`
3. Start WebSocket server `$ php cmd.php`
### Configuration
#### Default
####Default Configuration
**Clients (WebBrowser) listen for connections**
- Host:`0.0.0.0.` (=> any client can connect)
- Port:`8020`
- URI:`127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source)
- Host: `0.0.0.0.` (=> any client can connect)
- Port: `8020`
- URI: `127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source)
**TCP Socket connection (Internal use fore WebServer <=> WebSocket communication)**
- Host:`127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine)
- Port:`5555`
**TCP TcpSocket connection (Internal use for WebServer WebSocket communication)**
- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine)
- Port: `5555`
- URI: `tcp://127.0.0.1:5555`
#### Custom [Optional]
**[Optional]**
The default configuration should be fine for most installations.
You can change/overwrite the default **Host** and **Port** configuration by adding additional CLI parameters when starting the WebSocket server:
`php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]`
`$ php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]`
### Unix Service (systemd)
####Info:
#### New Service
It is recommended to wrap the `cmd.php` script in a Unix service, that over control the WebSocket server.
This creates a systemd service on CentOS7:
1. `$ cd /etc/systemd/system`
2. `$ vi websocket.pathfinder.service`
3. Copy script and adjust `ExecStart` and `WorkingDirectory` values:
```
[Unit]
Description = WebSocket server (Pathfinder) [LIVE] environment
After = multi-user.target
[Service]
Type = idle
ExecStart = /usr/bin/php /var/www/websocket.pathfinder/pathfinder_websocket/cmd.php
WorkingDirectory = /var/www/websocket.pathfinder/pathfinder_websocket
TimeoutStopSec = 0
Restart = always
LimitNOFILE = 10000
Nice = 10
[Install]
WantedBy = multi-user.target
```
Now you can use the service to start/stop/restart your WebSocket server
- `$ systemctl start websocket.pathfinder.service`
- `$ systemctl restart websocket.pathfinder.service`
- `$ systemctl stop websocket.pathfinder.service`
#### Auto-Restart the Service
You can automatically restart your service (e.g. on _EVE-Online_ downtime). Create a new "timer" for the automatic restart.
1. `$ cd /etc/systemd/system` (same dir as before)
2. `$ vi restart.websocket.pathfinder.timer`
3. Copy script:
```
[Unit]
Description = Restart timer (EVE downtime) for WebSocket server [LIVE]
[Timer]
OnCalendar = *-*-* 12:01:00
Persistent = true
[Install]
WantedBy = timer.target
```
Now we need a new "restart service" for the timer:
1. `$ cd /etc/systemd/system` (same dir as before)
2. `$ vi restart.websocket.pathfinder.service`
3. Copy script:
```
[Unit]
Description = Restart (periodically) WebSocket server [LIVE]
[Service]
Type = oneshot
ExecStart = /usr/bin/systemctl try-restart websocket.pathfinder.service
```
### Info
- [*Ratchet*](http://socketo.me/) - "WebSockets for PHP"

View File

@@ -360,7 +360,14 @@ class MapUpdate implements MessageComponentInterface {
* @return array
*/
private function getCharacterIdsByMapId(int $mapId) : array {
return array_keys((array)$this->subscriptions[$mapId]);
$characterIds = [];
if(
array_key_exists($mapId, $this->subscriptions) &&
is_array($this->subscriptions[$mapId])
){
$characterIds = array_keys($this->subscriptions[$mapId]);
}
return $characterIds;
}
/**
@@ -485,43 +492,44 @@ class MapUpdate implements MessageComponentInterface {
/**
* receive data from TCP socket (main App)
* -> send response back
* @param $data
* @param string $task
* @param null $load
* @return bool|float|int|null
*/
public function receiveData($data){
$data = (array)json_decode($data, true);
$load = $data['load'];
$task = $data['task'];
$response = false;
public function receiveData(string $task, $load = null){
$responseLoad = null;
switch($task){
case 'healthCheck':
$this->healthCheckToken = (float)$load;
$responseLoad = $this->healthCheckToken;
break;
case 'characterUpdate':
$response = $this->updateCharacterData($load);
$this->updateCharacterData($load);
$mapIds = $this->getMapIdsByCharacterId((int)$load['id']);
$this->broadcastMapSubscriptions('mapSubscriptions', $mapIds);
break;
case 'characterLogout':
$response = $this->unSubscribeCharacterIds($load);
$responseLoad = $this->unSubscribeCharacterIds($load);
break;
case 'mapConnectionAccess':
$response = $this->setConnectionAccess($load);
$responseLoad = $this->setConnectionAccess($load);
break;
case 'mapAccess':
$response = $this->setAccess($task, $load);
$responseLoad = $this->setAccess($task, $load);
break;
case 'mapUpdate':
$response = $this->broadcastMapUpdate($task, $load);
$responseLoad = $this->broadcastMapUpdate($task, $load);
break;
case 'mapDeleted':
$response = $this->deleteMapId($task, $load);
break;
case 'healthCheck':
$this->healthCheckToken = (float)$load;
$response = 'OK';
$responseLoad = $this->deleteMapId($task, $load);
break;
case 'logData':
$this->handleLogData((array)$load['meta'], (array)$load['log']);
break;
}
return $responseLoad;
}
private function setCharacterData(array $characterData){

549
app/Socket/TcpSocket.php Normal file
View File

@@ -0,0 +1,549 @@
<?php
/**
* Created by PhpStorm.
* User: Exodus 4D
* Date: 15.02.2019
* Time: 14:29
*/
namespace Exodus4D\Socket\Socket;
use React\EventLoop;
use React\Socket;
use React\Promise;
use React\Stream;
use Clue\React\NDJson;
use Ratchet\MessageComponentInterface;
class TcpSocket {
/**
* error message for unknown acceptType
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
*/
const ERROR_ACCEPT_TYPE = "Unknown acceptType: '%s'";
/**
* error message for connected stream is not readable
*/
const ERROR_STREAM_NOT_READABLE = "Stream is not readable. Remote address: '%s'";
/**
* error message for connection stream is not writable
*/
const ERROR_STREAM_NOT_WRITABLE = "Stream is not writable. Remote address: '%s'";
/**
* error message for missing 'task' key in payload
*/
const ERROR_TASK_MISSING = "Missing 'task' in payload";
/**
* error message for unknown 'task' key in payload
*/
const ERROR_TASK_UNKNOWN = "Unknown 'task': '%s' in payload";
/**
* error message for missing method
*/
const ERROR_METHOD_MISSING = "Method '%S' not found";
/**
* error message for waitTimeout exceeds
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
*/
const ERROR_WAIT_TIMEOUT = "Exceeds 'waitTimeout': %ss";
/**
* default for: accepted data type
* -> affects en/decoding socket data
*/
const DEFAULT_ACCEPT_TYPE = 'json';
/**
* default for: wait timeout
* -> timeout until connection gets closed
* timeout should be "reset" right after successful response send to client
*/
const DEFAULT_WAIT_TIMEOUT = 3.0;
/**
* default for: send response by end() method (rather than write())
* -> connection will get closed right after successful response send to client
*/
const DEFAULT_END_WITH_RESPONSE = true;
/**
* default for: add socket statistic to response payload
*/
const DEFAULT_STATS = true;
/**
* default for: echo debug messages
*/
const DEFAULT_DEBUG = false;
/**
* global server loop
* @var EventLoop\LoopInterface
*/
private $loop;
/**
* @var MessageComponentInterface
*/
private $handler;
/**
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
* @var string
*/
private $acceptType = self::DEFAULT_ACCEPT_TYPE;
/**
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
* @var float
*/
private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;
/**
* @see TcpSocket::DEFAULT_END_WITH_RESPONSE
* @var bool
*/
private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE;
/**
* @see TcpSocket::DEFAULT_STATS
* @var bool
*/
private $stats = self::DEFAULT_STATS;
/**
* storage for all active connections
* -> can be used to get current count of connected clients
* @var \SplObjectStorage
*/
private $connections;
/**
* max count of concurrent open connections
* -> represents number of active connected clients
* @var int
*/
private $maxConnections = 0;
/**
* timestamp on startup
* @var int
*/
private $startupTime = 0;
/**
* TcpSocket constructor.
* @param EventLoop\LoopInterface $loop
* @param MessageComponentInterface $handler
* @param string $acceptType
* @param float $waitTimeout
* @param bool $endWithResponse
*/
public function __construct(
EventLoop\LoopInterface $loop,
MessageComponentInterface $handler,
string $acceptType = self::DEFAULT_ACCEPT_TYPE,
float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT,
bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE
){
$this->loop = $loop;
$this->handler = $handler;
$this->acceptType = $acceptType;
$this->waitTimeout = $waitTimeout;
$this->endWithResponse = $endWithResponse;
$this->connections = new \SplObjectStorage();
$this->startupTime = time();
}
/**
* @param Socket\ConnectionInterface $connection
*/
public function onConnect(Socket\ConnectionInterface $connection){
$this->debug($connection, __FUNCTION__, '----------------------------------------');
if($this->isValidConnection($connection)){
// connection can be used
// add connection to global connection pool
$this->addConnection($connection);
// set waitTimeout timer for connection
$this->setTimerTimeout($connection, $this->waitTimeout);
$this->debug($connection, __FUNCTION__);
// register connection events ... -------------------------------------------------------------------------
$this->initRead($connection)
->then($this->initDispatch())
->then($this->initResponse($connection))
->then(
function(string $message) use ($connection) {
$this->debug($connection, 'DONE', $message);
},
function(\Exception $e) use ($connection) {
$this->debug($connection, 'ERROR', $e->getMessage());
$this->connectionError($connection, $e);
});
$connection->on('end', function() use ($connection) {
$this->debug($connection, 'onEnd');
});
$connection->on('close', function() use ($connection){
$this->debug($connection, 'onClose');
$this->removeConnection($connection);
});
$connection->on('error', function(\Exception $e) use ($connection) {
$this->debug($connection, 'onError', $e->getMessage());
});
}else{
// invalid connection -> can not be used
$connection->close();
}
}
/**
* @param Socket\ConnectionInterface $connection
* @return Promise\PromiseInterface
*/
protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface {
if($connection->isReadable()){
if('json' == $this->acceptType){
// new empty stream for processing JSON
$stream = new Stream\ThroughStream();
$streamDecoded = new NDJson\Decoder($stream, true);
// promise get resolved on first emit('data')
$promise = Promise\Stream\first($streamDecoded);
// register on('data') for main input stream
$connection->on('data', function ($chunk) use ($stream) {
// send current data chunk to processing stream -> resolves promise
$stream->emit('data', [$chunk]);
});
return $promise;
}else{
return new Promise\RejectedPromise(
new \InvalidArgumentException(
sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType)
)
);
}
}else{
return new Promise\RejectedPromise(
new \Exception(
sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress())
)
);
}
}
/**
* init dispatcher for payload
* @return callable
*/
protected function initDispatch() : callable {
return function($payload) : Promise\PromiseInterface {
$task = (string)$payload['task'];
if(!empty($task)){
$load = $payload['load'];
$deferred = new Promise\Deferred();
$this->dispatch($deferred, $task, $load);
return $deferred->promise();
}else{
return new Promise\RejectedPromise(
new \InvalidArgumentException(self::ERROR_TASK_MISSING)
);
}
};
}
/**
* @param Promise\Deferred $deferred
* @param string $task
* @param null $load
*/
protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void {
switch($task){
case 'getStats':
$deferred->resolve($this->newPayload($task, null));
break;
case 'healthCheck':
case 'characterUpdate':
case 'characterLogout':
case 'mapConnectionAccess':
case 'mapAccess':
case 'mapUpdate':
case 'mapDeleted':
case 'logData':
if(method_exists($this->handler, 'receiveData')){
$deferred->resolve(
$this->newPayload(
$task,
call_user_func_array([$this->handler, 'receiveData'], [$task, $load])
)
);
}else{
$deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData')));
}
break;
default:
$deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task)));
}
}
/**
* @param Socket\ConnectionInterface $connection
* @return callable
*/
protected function initResponse(Socket\ConnectionInterface $connection) : callable {
return function(array $payload) use ($connection) : Promise\PromiseInterface {
$this->debug($connection, 'initResponse');
$deferred = new Promise\Deferred();
$this->write($deferred, $connection, $payload);
return $deferred->promise();
};
}
/**
* @param Promise\Deferred $deferred
* @param Socket\ConnectionInterface $connection
* @param array $payload
*/
protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void {
$write = false;
if($connection->isWritable()){
if('json' == $this->acceptType){
$connection = new NDJson\Encoder($connection);
}
// write a new chunk of data to connection stream
$write = $connection->write($payload);
if($this->endWithResponse){
// connection should be closed (and removed from this socket server)
$connection->end();
}
}
if($write){
$deferred->resolve('OK');
}else{
$deferred->reject(new \Exception(
sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress())
));
}
}
/**
* $connection has error
* -> if writable -> end() connection with $payload (close() is called by default)
* -> if readable -> close() connection
* @param Socket\ConnectionInterface $connection
* @param \Exception $e
*/
protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){
$this->debug($connection, __FUNCTION__, $e->getMessage());
if($connection->isWritable()){
if('json' == $this->acceptType){
$connection = new NDJson\Encoder($connection);
}
// send "end" data, then close
$connection->end($this->newPayload('error', $e->getMessage()));
}else{
// close connection
$connection->close();
}
}
/**
* check if $connection is found in global pool
* @param Socket\ConnectionInterface $connection
* @return bool
*/
protected function hasConnection(Socket\ConnectionInterface $connection) : bool {
return $this->connections->contains($connection);
}
/**
* cancels a previously set timer callback for a $connection
* @param Socket\ConnectionInterface $connection
* @param string $timerName
*/
protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers']) && isset($data['timers'][$timerName]) &&
($data['timers'][$timerName] instanceof EventLoop\Timer\TimerInterface)
){
$this->loop->cancelTimer($data['timers'][$timerName]);
unset($data['timers'][$timerName]);
$this->connections->offsetSet($connection, $data);
}
}
/**
* cancels all previously set timers for a $connection
* @param Socket\ConnectionInterface $connection
*/
protected function cancelTimers(Socket\ConnectionInterface $connection){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers'])
){
foreach((array)$data['timers'] as $timerName => $timer){
$this->loop->cancelTimer($timer);
}
$data['timers'] = [];
$this->connections->offsetSet($connection, $data);
}
}
/**
* @param Socket\ConnectionInterface $connection
* @param string $timerName
* @param float $interval
* @param callable $timerCallback
*/
protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers'])
){
$data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) {
$timerCallback($connection);
});
// store new timer to $connection
$this->connections->offsetSet($connection, $data);
}
}
/**
* cancels and removes previous connection timeout timers
* -> set new connection timeout
* @param Socket\ConnectionInterface $connection
* @param float $waitTimeout
*/
protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){
$this->cancelTimer($connection, 'disconnectTimer');
$this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) {
$errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout);
$this->connectionError(
$connection,
new Promise\Timer\TimeoutException($waitTimeout, $errorMessage)
);
});
}
/**
* add new connection to global pool
* @param Socket\ConnectionInterface $connection
*/
protected function addConnection(Socket\ConnectionInterface $connection){
if(!$this->hasConnection($connection)){
$this->connections->attach($connection, [
'remoteAddress' => $connection->getRemoteAddress(),
'timers' => []
]);
// update maxConnections count
$this->maxConnections = max($this->connections->count(), $this->maxConnections);
$this->debug($connection, __FUNCTION__);
}
}
/**
* remove $connection from global connection pool
* @param Socket\ConnectionInterface $connection
*/
protected function removeConnection(Socket\ConnectionInterface $connection){
if($this->hasConnection($connection)){
$this->debug($connection, __FUNCTION__);
$this->cancelTimers($connection);
$this->connections->detach($connection);
}
}
/**
* get new payload
* @param string $task
* @param null $load
* @return array
*/
protected function newPayload(string $task, $load = null) : array {
$payload = [
'task' => $task,
'load' => $load
];
if($this->stats){
// add socket statistics
$payload['stats'] = $this->getStats();
}
return $payload;
}
/**
* check if connection is "valid" and can be used for data transfer
* @param Socket\ConnectionInterface $connection
* @return bool
*/
protected function isValidConnection(Socket\ConnectionInterface $connection) : bool {
return $connection->isReadable() || $connection->isWritable();
}
/**
* get socket server statistics
* -> e.g. connected clients count
* @return array
*/
protected function getStats() : array {
return [
'startup' => time() - $this->startupTime,
'connections' => $this->connections->count(),
'maxConnections' => $this->maxConnections
];
}
/**
* echo debug messages
* @param Socket\ConnectionInterface $connection
* @param string $method
* @param string $message
*/
protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){
if(self::DEFAULT_DEBUG){
$data = [
date('Y-m-d H:i:s'),
__CLASS__ . '()',
$connection->getRemoteAddress(),
$method . '()',
$message
];
echo implode(array_filter($data), ' | ') . PHP_EOL;
}
}
}

View File

@@ -8,52 +8,77 @@
namespace Exodus4D\Socket;
use Exodus4D\Socket\Socket\TcpSocket;
use React\EventLoop;
use React\Socket;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
use React;
class WebSockets {
protected $dns;
/**
* @var string
*/
protected $dsn;
/**
* @var int
*/
protected $wsListenPort;
/**
* @var string
*/
protected $wsListenHost;
function __construct($dns, $wsListenPort, $wsListenHost){
$this->dns = $dns;
$this->wsListenPort = (int)$wsListenPort;
/**
* WebSockets constructor.
* @param string $dsn
* @param int $wsListenPort
* @param string $wsListenHost
*/
function __construct(string $dsn, int $wsListenPort, string $wsListenHost){
$this->dsn = $dsn;
$this->wsListenPort = $wsListenPort;
$this->wsListenHost = $wsListenHost;
$this->startMapSocket();
}
private function startMapSocket(){
$loop = React\EventLoop\Factory::create();
// global EventLoop
$loop = EventLoop\Factory::create();
// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
// Socket for map update data -------------------------------------------------------------
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
// Binding to 127.0.0.1 means, the only client that can connect is itself
$pull->bind( $this->dns );
// main app -> inject socket for response
// global MessageComponent (main app) (handles all business logic)
$mapUpdate = new Main\MapUpdate();
// "onMessage" listener
$pull->on('message', [$mapUpdate, 'receiveData']);
// Socket for log data --------------------------------------------------------------------
//$pullSocketLogs = $context->getSocket(\ZMQ::SOCKET_PULL);
//$pullSocketLogs->bind( "tcp://127.0.0.1:5555" );
//$pullSocketLogs->on('message', [$mapUpdate, 'receiveLogData']) ;
// TCP Socket -------------------------------------------------------------------------------------------------
$tcpSocket = new TcpSocket($loop, $mapUpdate);
// TCP Server (WebServer <-> TCPServer <-> TCPSocket communication)
$server = new Socket\Server($this->dsn, $loop, [
'tcp' => [
'backlog' => 20,
'so_reuseport' => true
]
]);
$server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){
$tcpSocket->onConnect($connection);
});
$server->on('error', function(\Exception $e){
echo 'error: ' . $e->getMessage() . PHP_EOL;
});
// WebSocketServer --------------------------------------------------------------------------------------------
// Binding to 0.0.0.0 means remotes can connect (Web Clients)
$webSocketURI = $this->wsListenHost . ':' . $this->wsListenPort;
// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($webSocketURI, $loop);
// Set up our WebSocket server for clients subscriptions
$webSock = new Socket\TcpServer($webSocketURI, $loop);
new IoServer(
new HttpServer(
new WsServer(

View File

@@ -25,9 +25,9 @@ if(PHP_SAPI === 'cli'){
$host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ;
$port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ;
$dns = 'tcp://' . $host . ':' . $port;
$dsn = 'tcp://' . $host . ':' . $port;
new Socket\WebSockets($dns, $wsListenPort, $wsListenHost);
new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost);
}else{
echo "Script need to be called by CLI!";
}

View File

@@ -15,9 +15,13 @@
}
},
"require": {
"php-64bit": ">=7.0",
"ext-zmq": ">=1.1.3",
"php-64bit": ">=7.1",
"ext-json": "*",
"cboden/ratchet": "0.4.x",
"react/zmq": "0.3.*"
"react/promise-stream": "1.1.*",
"clue/ndjson-react": "1.0.*"
},
"suggest": {
"ext-event": "If installed, 'ExtEventLoop' class will get used as default event loop. Better performance. https://pecl.php.net/package/event"
}
}