Implement websocket server

This commit is contained in:
David Majdandžić
2023-03-28 21:53:07 +02:00
parent c4d5a62a46
commit 44348e7a85
10 changed files with 493 additions and 32 deletions

View File

@@ -1,10 +1,10 @@
import EventEmitter from "events"; import EventEmitter from "events";
import {ClientEvents} from "../Client/ClientEvents"; import {ClientEvent} from "../Client/ClientEvent";
import {Job} from "../Job/Job"; import {Job} from "../Job/Job";
import {JobEvents} from "../Job/JobEvents"; import {JobEvents} from "../Job/JobEvents";
import Logger from "../Logger"; import Logger from "../Logger";
import {SmppSession} from "../SmppSession"; import {SmppSession} from "../SmppSession";
import {CenterEvents} from "./CenterEvents"; import {CenterEvent} from "./CenterEvent";
import CenterStatus from "./CenterStatus"; import CenterStatus from "./CenterStatus";
import {CenterPDUProcessor} from "./PDUProcessors/CenterPDUProcessor"; import {CenterPDUProcessor} from "./PDUProcessors/CenterPDUProcessor";
import {DebugProcessor} from "./PDUProcessors/DebugProcessor"; import {DebugProcessor} from "./PDUProcessors/DebugProcessor";
@@ -15,6 +15,7 @@ const smpp = require("smpp");
const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500;
export class Center implements SmppSession { export class Center implements SmppSession {
UPDATE_WS: string = "UPDATE_WS";
private pendingSessions: any[] = []; private pendingSessions: any[] = [];
private sessions: any[] = []; private sessions: any[] = [];
private nextSession: number = 0; private nextSession: number = 0;
@@ -33,6 +34,17 @@ export class Center implements SmppSession {
this.logger = new Logger(`Center-${id}`); this.logger = new Logger(`Center-${id}`);
// This does not work and I have not a clue as to why
// CenterEvent is undefined in fucking everywhere ever
// I don't get it at all
// I tried to bind the functions to this
// I tried to bind them to the event
// TODO: Fix this shit...
this.eventEmitter.on(CenterEvent.STATE_CHANGED, () => this.updateWs(CenterEvent.STATE_CHANGED));
this.eventEmitter.on(CenterEvent.STATUS_CHANGED, () => this.updateWs(CenterEvent.STATUS_CHANGED));
this.eventEmitter.on(CenterEvent.ANY_PDU, (pdu: any) => this.updateWs(CenterEvent.ANY_PDU, [pdu]));
this.eventEmitter.on(CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count]));
this.initialize(); this.initialize();
} }
@@ -55,7 +67,7 @@ export class Center implements SmppSession {
set processor(value: CenterPDUProcessor) { set processor(value: CenterPDUProcessor) {
this._processor = value; this._processor = value;
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private _defaultMultipleJob!: Job; private _defaultMultipleJob!: Job;
@@ -66,7 +78,7 @@ export class Center implements SmppSession {
set defaultMultipleJob(value: Job) { set defaultMultipleJob(value: Job) {
this._defaultMultipleJob = value; this._defaultMultipleJob = value;
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private _defaultSingleJob!: Job; private _defaultSingleJob!: Job;
@@ -77,7 +89,7 @@ export class Center implements SmppSession {
set defaultSingleJob(value: Job) { set defaultSingleJob(value: Job) {
this._defaultSingleJob = value; this._defaultSingleJob = value;
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private _password: string; private _password: string;
@@ -88,7 +100,7 @@ export class Center implements SmppSession {
set password(value: string) { set password(value: string) {
this._password = value; this._password = value;
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private _username: string; private _username: string;
@@ -99,7 +111,7 @@ export class Center implements SmppSession {
set username(value: string) { set username(value: string) {
this._username = value; this._username = value;
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private _status: CenterStatus = CenterStatus.WAITING_CONNECTION; private _status: CenterStatus = CenterStatus.WAITING_CONNECTION;
@@ -110,15 +122,44 @@ export class Center implements SmppSession {
set status(value: CenterStatus) { set status(value: CenterStatus) {
this._status = value; this._status = value;
this.eventEmitter.emit(CenterEvents.STATUS_CHANGED, this._status); this.eventEmitter.emit(CenterEvent.STATUS_CHANGED, this._status);
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
}
on(event: string, callback: (...args: any[]) => void): void {
this.eventEmitter.on(event, callback);
}
updateWs(event: string, args?: any[]): void {
this.logger.log1(`Update WS: ${event}`);
let message: {
type: string,
data?: string
} = {
type: event,
};
switch (event) {
case CenterEvent.STATE_CHANGED:
message.data = JSON.stringify(this.serialize());
break;
case CenterEvent.STATUS_CHANGED:
message.data = JSON.stringify(this._status);
break;
case CenterEvent.ANY_PDU:
message.data = JSON.stringify(args![0]);
break;
case CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT:
message.data = JSON.stringify(args![0]);
break;
}
this.eventEmitter.emit(this.UPDATE_WS, message);
} }
initialize(): void { initialize(): void {
this._defaultSingleJob = Job.createEmptySingle(); this._defaultSingleJob = Job.createEmptySingle();
this._defaultMultipleJob = Job.createEmptyMultiple(); this._defaultMultipleJob = Job.createEmptyMultiple();
this._defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); this._defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()));
this._defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); this._defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()));
this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); this.server = smpp.createServer({}, this.eventSessionConnected.bind(this));
this.server.listen(this._port); this.server.listen(this._port);
@@ -169,7 +210,7 @@ export class Center implements SmppSession {
this.counterUpdateTimer = new NanoTimer(); this.counterUpdateTimer = new NanoTimer();
this.counterUpdateTimer.setInterval(() => { this.counterUpdateTimer.setInterval(() => {
if (previousUpdateCounter !== counter) { if (previousUpdateCounter !== counter) {
this.eventEmitter.emit(ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); this.eventEmitter.emit(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter);
previousUpdateCounter = counter; previousUpdateCounter = counter;
} }
}, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`);
@@ -277,7 +318,7 @@ export class Center implements SmppSession {
session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session)); session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session));
session.on('pdu', this.eventAnyPdu.bind(this, session)); session.on('pdu', this.eventAnyPdu.bind(this, session));
this.updateStatus(); this.updateStatus();
this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize());
} }
private eventSessionError(session: any): void { private eventSessionError(session: any): void {
@@ -303,7 +344,7 @@ export class Center implements SmppSession {
} }
private eventAnyPdu(session: any, pdu: any): void { private eventAnyPdu(session: any, pdu: any): void {
this.eventEmitter.emit(CenterEvents.ANY_PDU, pdu); this.eventEmitter.emit(CenterEvent.ANY_PDU, pdu);
this.processor.processPdu(session, pdu).then(() => { this.processor.processPdu(session, pdu).then(() => {
}, () => { }, () => {
}); });

View File

@@ -1,4 +1,4 @@
export class CenterEvents { export class CenterEvent {
static STATUS_CHANGED: "STATUS_CHANGED"; static STATUS_CHANGED: "STATUS_CHANGED";
static STATE_CHANGED: "STATE_CHANGED"; static STATE_CHANGED: "STATE_CHANGED";
static ANY_PDU: "ANY_PDU"; static ANY_PDU: "ANY_PDU";

View File

@@ -4,7 +4,7 @@ import {JobEvents} from "../Job/JobEvents";
import Logger from "../Logger"; import Logger from "../Logger";
import PersistentPromise from "../PersistentPromise"; import PersistentPromise from "../PersistentPromise";
import {SmppSession} from "../SmppSession"; import {SmppSession} from "../SmppSession";
import {ClientEvents} from "./ClientEvents"; import {ClientEvent} from "./ClientEvent";
import ClientStatus from "./ClientStatus"; import ClientStatus from "./ClientStatus";
const NanoTimer = require('nanotimer'); const NanoTimer = require('nanotimer');
@@ -16,13 +16,14 @@ const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE
export class Client implements SmppSession { export class Client implements SmppSession {
defaultSingleJob!: Job; defaultSingleJob!: Job;
defaultMultipleJob!: Job; defaultMultipleJob!: Job;
private readonly eventEmitter: EventEmitter; UPDATE_WS: string = "UPDATE_WS";
private readonly eventEmitter: EventEmitter = new EventEmitter();
private readonly logger: Logger; private readonly logger: Logger;
private readonly _id: number; private readonly _id: number;
private session?: any; private session?: any;
private connectPromise: PersistentPromise | null = null; private connectPromise: PersistentPromise | null = null;
private bindPromise: PersistentPromise | null = null;
// TODO: Implement close promise // TODO: Implement close promise
private bindPromise: PersistentPromise | null = null;
// Apparently the sessions are not closed on a dime but instead a .close() call causes eventSessionClose // Apparently the sessions are not closed on a dime but instead a .close() call causes eventSessionClose
private sendTimer: any | null = null; private sendTimer: any | null = null;
private counterUpdateTimer: any | null = null; private counterUpdateTimer: any | null = null;
@@ -33,10 +34,14 @@ export class Client implements SmppSession {
this._username = username; this._username = username;
this._password = password; this._password = password;
this.eventEmitter = new EventEmitter();
this.logger = new Logger(`Client-${id}`); this.logger = new Logger(`Client-${id}`);
this.status = ClientStatus.NOT_CONNECTED; this.status = ClientStatus.NOT_CONNECTED;
this.eventEmitter.on(ClientEvent.STATE_CHANGED, () => this.updateWs(ClientEvent.STATE_CHANGED));
this.eventEmitter.on(ClientEvent.STATUS_CHANGED, () => this.updateWs(ClientEvent.STATUS_CHANGED));
this.eventEmitter.on(ClientEvent.ANY_PDU, (pdu: any) => this.updateWs(ClientEvent.ANY_PDU, [pdu]));
this.eventEmitter.on(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count]));
this.initialize(); this.initialize();
} }
@@ -62,8 +67,33 @@ export class Client implements SmppSession {
set status(value: ClientStatus) { set status(value: ClientStatus) {
this._status = value; this._status = value;
this.eventEmitter.emit(ClientEvents.STATUS_CHANGED, this._status); this.eventEmitter.emit(ClientEvent.STATUS_CHANGED, this._status);
this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize());
}
updateWs(event: string, args?: any[]): void {
this.logger.log1(`Update WS: ${event}`);
let message: {
type: string,
data?: string
} = {
type: event,
};
switch (event) {
case ClientEvent.STATE_CHANGED:
message.data = JSON.stringify(this.serialize());
break;
case ClientEvent.STATUS_CHANGED:
message.data = JSON.stringify(this._status);
break;
case ClientEvent.ANY_PDU:
message.data = JSON.stringify(args![0]);
break;
case ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT:
message.data = JSON.stringify(args![0]);
break;
}
this.eventEmitter.emit(this.UPDATE_WS, message);
} }
getUrl(): string { getUrl(): string {
@@ -72,14 +102,14 @@ export class Client implements SmppSession {
setDefaultSingleJob(job: Job): void { setDefaultSingleJob(job: Job): void {
this.defaultSingleJob = job; this.defaultSingleJob = job;
this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize());
job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()));
} }
setDefaultMultipleJob(job: Job): void { setDefaultMultipleJob(job: Job): void {
this.defaultMultipleJob = job; this.defaultMultipleJob = job;
this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize());
job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()));
} }
getDefaultSingleJob(): Job { getDefaultSingleJob(): Job {
@@ -93,8 +123,8 @@ export class Client implements SmppSession {
initialize(): void { initialize(): void {
this.defaultSingleJob = Job.createEmptySingle(); this.defaultSingleJob = Job.createEmptySingle();
this.defaultMultipleJob = Job.createEmptyMultiple(); this.defaultMultipleJob = Job.createEmptyMultiple();
this.defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); this.defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()));
this.defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); this.defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()));
} }
doConnect(): PersistentPromise { doConnect(): PersistentPromise {
@@ -180,7 +210,7 @@ export class Client implements SmppSession {
this.counterUpdateTimer = new NanoTimer(); this.counterUpdateTimer = new NanoTimer();
this.counterUpdateTimer.setInterval(() => { this.counterUpdateTimer.setInterval(() => {
if (previousUpdateCounter !== counter) { if (previousUpdateCounter !== counter) {
this.eventEmitter.emit(ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); this.eventEmitter.emit(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter);
previousUpdateCounter = counter; previousUpdateCounter = counter;
} }
}, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`);
@@ -255,7 +285,7 @@ export class Client implements SmppSession {
private eventAnyPdu(pdu: any): void { private eventAnyPdu(pdu: any): void {
this.logger.log6(`Client-${this._id} received PDU: ${JSON.stringify(pdu)}`); this.logger.log6(`Client-${this._id} received PDU: ${JSON.stringify(pdu)}`);
this.eventEmitter.emit(ClientEvents.ANY_PDU, pdu); this.eventEmitter.emit(ClientEvent.ANY_PDU, pdu);
} }
private eventSessionError(pdu: any): void { private eventSessionError(pdu: any): void {

View File

@@ -1,4 +1,4 @@
export class ClientEvents { export class ClientEvent {
static STATUS_CHANGED: "STATUS_CHANGED"; static STATUS_CHANGED: "STATUS_CHANGED";
static STATE_CHANGED: "STATE_CHANGED"; static STATE_CHANGED: "STATE_CHANGED";
static ANY_PDU: "ANY_PDU"; static ANY_PDU: "ANY_PDU";

View File

@@ -1,3 +1,4 @@
import EventEmitter from "events";
import fs from "fs"; import fs from "fs";
import {Client} from "./Client"; import {Client} from "./Client";
import {Job} from "../Job/Job"; import {Job} from "../Job/Job";
@@ -10,7 +11,10 @@ const CLIENT_SESSIONS_FILE: string = process.env.CLIENT_SESSIONS_FILE || "client
export default class ClientSessionManager implements SessionManager { export default class ClientSessionManager implements SessionManager {
sessionId: number; sessionId: number;
sessions: Client[]; sessions: Client[];
identifier: string = "client";
private readonly logger: any; private readonly logger: any;
readonly SESSION_ADDED_EVENT: string = "SESSION ADDED";
private readonly eventEmitter: EventEmitter = new EventEmitter();
constructor() { constructor() {
this.sessionId = 0; this.sessionId = 0;
@@ -18,10 +22,21 @@ export default class ClientSessionManager implements SessionManager {
this.logger = new Logger("ClientSessionManager"); this.logger = new Logger("ClientSessionManager");
} }
on(event: string, listener: (...args: any[]) => void): void {
this.eventEmitter.on(event, listener);
}
getSessions(): Promise<SmppSession[]> {
return new Promise<SmppSession[]>(resolve => {
resolve(this.sessions);
});
}
addSession(session: SmppSession): Promise<void> { addSession(session: SmppSession): Promise<void> {
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this.logger.log1(`Adding session with id ${session.getId()}`); this.logger.log1(`Adding session with id ${session.getId()}`);
this.sessions.push(session as Client); this.sessions.push(session as Client);
this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session.getId());
resolve(); resolve();
}); });
} }

View File

@@ -3,6 +3,8 @@ import {SmppSession} from "./SmppSession";
export default interface SessionManager { export default interface SessionManager {
sessions: SmppSession[]; sessions: SmppSession[];
sessionId: number; sessionId: number;
identifier: string;
readonly SESSION_ADDED_EVENT: string;
addSession(session: SmppSession): Promise<void>; addSession(session: SmppSession): Promise<void>;
@@ -12,9 +14,13 @@ export default interface SessionManager {
getSession(id: number): Promise<SmppSession>; getSession(id: number): Promise<SmppSession>;
getSessions(): Promise<SmppSession[]>;
serialize(): object; serialize(): object;
cleanup(): void; cleanup(): void;
setup(): void; setup(): void;
on(event: string, listener: (...args: any[]) => void): void;
} }

View File

@@ -8,10 +8,14 @@ export interface SmppSession {
password: string, password: string,
defaultSingleJob: Job; defaultSingleJob: Job;
defaultMultipleJob: Job; defaultMultipleJob: Job;
readonly UPDATE_WS: string;
getDefaultSingleJob(): Job; getDefaultSingleJob(): Job;
setDefaultSingleJob(job: Job): void; setDefaultSingleJob(job: Job): void;
getDefaultMultipleJob(): Job; getDefaultMultipleJob(): Job;
setDefaultMultipleJob(job: Job): void; setDefaultMultipleJob(job: Job): void;
getId(): number; getId(): number;
@@ -19,9 +23,11 @@ export interface SmppSession {
sendPdu(pdu: object, force?: boolean): Promise<object>; sendPdu(pdu: object, force?: boolean): Promise<object>;
sendSingle(job: Job): Promise<object>; sendSingle(job: Job): Promise<object>;
sendSingleDefault(): Promise<object>; sendSingleDefault(): Promise<object>;
sendMultiple(job: Job): Promise<void>; sendMultiple(job: Job): Promise<void>;
sendMultipleDefault(): Promise<void>; sendMultipleDefault(): Promise<void>;
cancelSendInterval(): void; cancelSendInterval(): void;
@@ -31,4 +37,8 @@ export interface SmppSession {
initialize(): void; initialize(): void;
serialize(): object; serialize(): object;
on(event: string, callback: (...args: any[]) => void): void;
updateWs(event: string, args?: any[]): void;
} }

66
src/WS/ClientSet.ts Normal file
View File

@@ -0,0 +1,66 @@
import Logger from "../Logger";
import SessionManager from "../SessionManager";
import {SmppSession} from "../SmppSession";
export class ClientSet {
identifier: string;
private clients: any[];
private readonly type: string;
private readonly sessionId: number;
private readonly logger: Logger;
private readonly relevantSessionManager: SessionManager | undefined;
constructor(identifier: string, sessionManagers: SessionManager[]) {
this.clients = [];
this.identifier = identifier;
let data: string[] = identifier.split(':');
this.type = data[0];
this.sessionId = parseInt(data[1]);
this.logger = new Logger(`ClientSet-${this.type}-${this.sessionId}`);
this.relevantSessionManager = sessionManagers.find(sm => sm.identifier === this.type);
if (!this.relevantSessionManager) {
this.logger.log1(`No session manager found for type ${this.type}`);
return;
}
if (this.relevantSessionManager) {
this.relevantSessionManager.getSessions().then((sessions) => {
sessions.forEach((session) => {
this.attachListener(session);
});
});
}
this.relevantSessionManager.on(this.relevantSessionManager.SESSION_ADDED_EVENT, this.eventOnSessionAdded.bind(this));
}
eventOnSessionAdded(sessionId: number): void {
this.logger.log2(`Session added: ${sessionId}`);
this.relevantSessionManager?.getSession(sessionId).then((session) => {
this.attachListener(session);
})
}
add(ws: any): void {
this.logger.log2(`Added client`);
this.clients.push(ws);
ws.on('close', this.eventOnClose.bind(this));
}
eventOnClose(ws: any): void {
this.logger.log2(`Removed client`);
this.clients.splice(this.clients.indexOf(ws), 1);
}
notifyClients(message: string) {
this.logger.log2(`Notifying clients: ${message}`);
this.clients.forEach((ws) => {
ws.send(message);
});
}
private attachListener(session: SmppSession) {
session.on(session.UPDATE_WS, (message: string) => this.notifyClients(message));
}
}

291
src/WS/WSServer.ts Normal file
View File

@@ -0,0 +1,291 @@
import Logger from "../Logger";
import SessionManager from "../SessionManager";
import {ClientSet} from "./ClientSet";
const WebSocket = require("ws");
const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191;
export class WSServer {
private readonly clients: ClientSet[];
private readonly unknownClients: any[];
private readonly server: any;
private readonly logger: Logger;
private readonly sessionManagers: SessionManager[];
constructor(sessionManagers: SessionManager[]) {
this.clients = [];
this.unknownClients = [];
this.server = new WebSocket.Server({port: WS_SERVER_PORT});
this.sessionManagers = sessionManagers;
this.logger = new Logger("WSServer");
this.server.on('connection', this.eventOnConnection.bind(this));
this.logger.log1(`WSServer listening atws://localhost:${WS_SERVER_PORT}`);
}
private eventOnConnection(ws: WebSocket): void {
this.logger.log1("New connection");
this.unknownClients.push(ws);
// @ts-ignore
ws.on('message', this.eventOnMessage.bind(this, ws));
// @ts-ignore
ws.on('close', this.eventOnClose.bind(this, ws));
}
private eventOnMessage(ws: any, message: string): void {
this.logger.log1("New message");
message = String(message);
this.unknownClients.splice(this.unknownClients.indexOf(ws), 1);
let clientSet: ClientSet | undefined = this.clients.find((clientSet: ClientSet) => clientSet.identifier === message);
if (!clientSet) {
clientSet = new ClientSet(message, this.sessionManagers);
}
clientSet.add(ws);
}
private eventOnClose(ws: any): void {
this.logger.log1("Connection closed");
this.unknownClients.splice(this.unknownClients.indexOf(ws), 1);
}
// constructor() {
// // @ts-ignore
// this.server = new WebSocket.Server({port: WS_SERVER_PORT});
// this.logger = new Logger("WSServer");
// this.server.on('connection', this.onConnection.bind(this));
// this.logger.log1(`WSServer listening at ws://localhost:${WS_SERVER_PORT}`);
// }
// onConnection(ws: WebSocket) {
// this.logger.log1("New connection");
// this.unknownClients.push(ws);
// ws.on('message', this.onMessage.bind(this, ws));
// ws.on('close', this.onClose.bind(this, ws));
// }
//
// addClient(ws, type, sessionId) {
// if (!this.clients[type]) {
// this.clients[type] = {};
// }
// if (!this.clients[type][sessionId]) {
// this.clients[type][sessionId] = [];
// }
// this.logger.log1(`Adding client ${ws.id} to ${type} session ${sessionId}`);
//
// if (type === "client") {
// if (this.listenersAlreadySetup.indexOf(`client-${sessionId}`) === -1) {
// let session = clientSessionManager.getSession(sessionId);
// if (!!session) {
// this.logger.log1(`Setting up listeners for client session ${sessionId}`);
// session.on(ClientSession.STATUS_CHANGED_EVENT, this.onClientSessionStatusChange.bind(this, sessionId));
// session.on(ClientSession.ANY_PDU_EVENT, this.onClientSessionPdu.bind(this, sessionId));
// session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onClientMessageCounterUpdate.bind(this, sessionId));
// }
// this.listenersAlreadySetup.push(`client-${sessionId}`);
// } else {
// this.logger.log1(`Listeners for client session ${sessionId} already set up`);
// }
// } else if (type === "center") {
// if (this.listenersAlreadySetup.indexOf(`center-${sessionId}`) === -1) {
// let session = centerSessionManager.getSession(sessionId);
// if (!!session) {
// this.logger.log1(`Setting up listeners for center session ${sessionId}`);
// session.on(CenterSession.STATUS_CHANGED_EVENT, this.onCenterStatusChange.bind(this, sessionId));
// session.on(CenterSession.ANY_PDU_EVENT, this.onCenterServerPdu.bind(this, sessionId));
// session.on(CenterSession.MODE_CHANGED_EVENT, this.onCenterModeChanged.bind(this, sessionId));
// session.on(CenterSession.SESSION_CHANGED_EVENT, this.onCenterSessionsChanged.bind(this, sessionId));
// session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onCenterMessageCounterUpdate.bind(this, sessionId));
// }
// this.listenersAlreadySetup.push(`center-${sessionId}`);
// } else {
// this.logger.log1(`Listeners for center session ${sessionId} already set up`);
// }
// }
//
// this.clients[type][sessionId].push(ws);
// this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`);
// }
//
// onMessage(ws, message) {
// this.logger.log1("New message");
// message = String(message);
// let data = message.split(":");
// let type = data[0];
// let sessionId = data[1];
//
// this.logger.log1(`Moving client to session ID: ${sessionId} of type ${type}`);
// delete this.unknownClients[ws];
// this.unknownClients = this.unknownClients.filter(Boolean);
//
// this.addClient(ws, type, sessionId);
// this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`);
// }
//
// onClose(ws) {
// this.removeClient(ws);
// // this.logger.log6(this.clients);
// this.logger.log1("Connection closed");
// }
//
// removeClient(ws) {
// this.clients.client = this.removeFromArray(this.clients.client, ws);
// this.clients.center = this.removeFromArray(this.clients.center, ws);
// }
//
// removeFromArray(array, element) {
// for (let sessionId in array) {
// let index = array[sessionId].indexOf(element);
// if (index > -1) {
// delete array[sessionId][index];
// }
// array[sessionId] = array[sessionId].filter(Boolean);
// if (array[sessionId].length === 0) {
// delete array[sessionId];
// }
// }
// return array;
// }
//
// onClientSessionStatusChange(sessionId, newStatus) {
// this.logger.log1(`Session with ID ${sessionId} changed`);
// let payload = {
// objectType: "client",
// type: 'status',
// sessionId: sessionId,
// value: newStatus
// }
// let clients = this.clients["client"][sessionId];
// if (!!clients) {
// this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onClientSessionPdu(sessionId, pdu) {
// // TODO: Maybe move this to an "ignored" array against who the pdu.command is compared
// if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') {
// return;
// }
// let clients = this.clients["client"][sessionId];
// if (!!clients) {
// this.logger.log2(`Session with ID ${sessionId} fired PDU`);
// let payload = {
// objectType: "client",
// type: 'pdu',
// sessionId: sessionId,
// value: pdu
// }
// this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onClientMessageCounterUpdate(sessionId, counter) {
// this.logger.log2(`Session with ID ${sessionId} updating message send counter`);
// let payload = {
// objectType: "client",
// type: 'counterUpdate',
// sessionId: sessionId,
// value: counter
// }
// let clients = this.clients["client"][sessionId];
// if (!!clients) {
// this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onCenterStatusChange(sessionId, newStatus) {
// this.logger.log1(`Session with ID ${sessionId} changed`);
// let payload = {
// objectType: "center",
// type: 'status',
// sessionId: sessionId,
// value: newStatus
// }
// let clients = this.clients["center"][sessionId];
// if (!!clients) {
// this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onCenterServerPdu(sessionId, pdu) {
// if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') {
// return;
// }
// let clients = this.clients["center"][sessionId];
// if (!!clients) {
// this.logger.log2(`Session with ID ${sessionId} fired PDU`);
// let payload = {
// objectType: "center",
// type: 'pdu',
// sessionId: sessionId,
// value: pdu
// }
// this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onCenterModeChanged(sessionId, newMode) {
// this.logger.log1(`Session with ID ${sessionId} changed`);
// let payload = {
// objectType: "center",
// type: 'mode',
// sessionId: sessionId,
// value: newMode,
// text: CenterMode[newMode]
// }
// let clients = this.clients["center"][sessionId];
// if (!!clients) {
// this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onCenterSessionsChanged(sessionId, newSession) {
// this.logger.log1(`Session with ID ${sessionId} changed`);
// let payload = {
// objectType: "center",
// type: 'sessions',
// sessionId: sessionId,
// value: newSession
// }
// let clients = this.clients["center"][sessionId];
// if (!!clients) {
// this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
//
// onCenterMessageCounterUpdate(sessionId, counter) {
// this.logger.log2(`Session with ID ${sessionId} updating message send counter`);
// let payload = {
// objectType: "center",
// type: 'counterUpdate',
// sessionId: sessionId,
// value: counter
// }
// let clients = this.clients["center"][sessionId];
// if (!!clients) {
// this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
// clients.forEach(client => {
// client.send(JSON.stringify(payload));
// });
// }
// }
}

View File

@@ -3,6 +3,7 @@ import {Client} from "./Client/Client";
import ClientSessionManager from "./Client/ClientSessionManager"; import ClientSessionManager from "./Client/ClientSessionManager";
import {Job} from "./Job/Job"; import {Job} from "./Job/Job";
import Logger from "./Logger"; import Logger from "./Logger";
import {WSServer} from "./WS/WSServer";
const smpp = require("smpp"); const smpp = require("smpp");
const fs = require("fs"); const fs = require("fs");
@@ -18,7 +19,6 @@ const {PDU} = require("smpp");
const app = express(); const app = express();
const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190; const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190;
const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191;
const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json";
const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500;
@@ -30,6 +30,8 @@ let logger = new Logger("main");
let clientManager: ClientSessionManager = new ClientSessionManager(); let clientManager: ClientSessionManager = new ClientSessionManager();
clientManager.setup(); clientManager.setup();
let wss: WSServer = new WSServer([clientManager]);
async function main() { async function main() {
// let client: Client = await clientManager.createSession("smpp://localhost:7000", "test", "test") as Client; // let client: Client = await clientManager.createSession("smpp://localhost:7000", "test", "test") as Client;
let client: Client = await clientManager.getSession(0) as Client; let client: Client = await clientManager.getSession(0) as Client;
@@ -65,7 +67,7 @@ async function main() {
// destination_addr: "1234567890", // destination_addr: "1234567890",
// short_message: "Hello World" // short_message: "Hello World"
// }), 100, 100)); // }), 100, 100));
center.close(); // center.close();
}, 1000); }, 1000);
} }