diff --git a/src/Center/Center.ts b/src/Center/Center.ts index 67ae0ea..564cd41 100644 --- a/src/Center/Center.ts +++ b/src/Center/Center.ts @@ -1,10 +1,10 @@ import EventEmitter from "events"; -import {ClientEvents} from "../Client/ClientEvents"; +import {ClientEvent} from "../Client/ClientEvent"; import {Job} from "../Job/Job"; import {JobEvents} from "../Job/JobEvents"; import Logger from "../Logger"; import {SmppSession} from "../SmppSession"; -import {CenterEvents} from "./CenterEvents"; +import {CenterEvent} from "./CenterEvent"; import CenterStatus from "./CenterStatus"; import {CenterPDUProcessor} from "./PDUProcessors/CenterPDUProcessor"; import {DebugProcessor} from "./PDUProcessors/DebugProcessor"; @@ -15,6 +15,7 @@ const smpp = require("smpp"); const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; export class Center implements SmppSession { + UPDATE_WS: string = "UPDATE_WS"; private pendingSessions: any[] = []; private sessions: any[] = []; private nextSession: number = 0; @@ -33,6 +34,17 @@ export class Center implements SmppSession { this.logger = new Logger(`Center-${id}`); + // This does not work and I have not a clue as to why + // CenterEvent is undefined in fucking everywhere ever + // I don't get it at all + // I tried to bind the functions to this + // I tried to bind them to the event + // TODO: Fix this shit... + this.eventEmitter.on(CenterEvent.STATE_CHANGED, () => this.updateWs(CenterEvent.STATE_CHANGED)); + this.eventEmitter.on(CenterEvent.STATUS_CHANGED, () => this.updateWs(CenterEvent.STATUS_CHANGED)); + this.eventEmitter.on(CenterEvent.ANY_PDU, (pdu: any) => this.updateWs(CenterEvent.ANY_PDU, [pdu])); + this.eventEmitter.on(CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); + this.initialize(); } @@ -55,7 +67,7 @@ export class Center implements SmppSession { set processor(value: CenterPDUProcessor) { this._processor = value; - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private _defaultMultipleJob!: Job; @@ -66,7 +78,7 @@ export class Center implements SmppSession { set defaultMultipleJob(value: Job) { this._defaultMultipleJob = value; - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private _defaultSingleJob!: Job; @@ -77,7 +89,7 @@ export class Center implements SmppSession { set defaultSingleJob(value: Job) { this._defaultSingleJob = value; - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private _password: string; @@ -88,7 +100,7 @@ export class Center implements SmppSession { set password(value: string) { this._password = value; - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private _username: string; @@ -99,7 +111,7 @@ export class Center implements SmppSession { set username(value: string) { this._username = value; - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private _status: CenterStatus = CenterStatus.WAITING_CONNECTION; @@ -110,15 +122,44 @@ export class Center implements SmppSession { set status(value: CenterStatus) { this._status = value; - this.eventEmitter.emit(CenterEvents.STATUS_CHANGED, this._status); - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATUS_CHANGED, this._status); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); + } + + on(event: string, callback: (...args: any[]) => void): void { + this.eventEmitter.on(event, callback); + } + + updateWs(event: string, args?: any[]): void { + this.logger.log1(`Update WS: ${event}`); + let message: { + type: string, + data?: string + } = { + type: event, + }; + switch (event) { + case CenterEvent.STATE_CHANGED: + message.data = JSON.stringify(this.serialize()); + break; + case CenterEvent.STATUS_CHANGED: + message.data = JSON.stringify(this._status); + break; + case CenterEvent.ANY_PDU: + message.data = JSON.stringify(args![0]); + break; + case CenterEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT: + message.data = JSON.stringify(args![0]); + break; + } + this.eventEmitter.emit(this.UPDATE_WS, message); } initialize(): void { this._defaultSingleJob = Job.createEmptySingle(); this._defaultMultipleJob = Job.createEmptyMultiple(); - this._defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); - this._defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); + this._defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize())); + this._defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize())); this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); this.server.listen(this._port); @@ -169,7 +210,7 @@ export class Center implements SmppSession { this.counterUpdateTimer = new NanoTimer(); this.counterUpdateTimer.setInterval(() => { if (previousUpdateCounter !== counter) { - this.eventEmitter.emit(ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + this.eventEmitter.emit(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); previousUpdateCounter = counter; } }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); @@ -277,7 +318,7 @@ export class Center implements SmppSession { session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session)); session.on('pdu', this.eventAnyPdu.bind(this, session)); this.updateStatus(); - this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(CenterEvent.STATE_CHANGED, this.serialize()); } private eventSessionError(session: any): void { @@ -303,7 +344,7 @@ export class Center implements SmppSession { } private eventAnyPdu(session: any, pdu: any): void { - this.eventEmitter.emit(CenterEvents.ANY_PDU, pdu); + this.eventEmitter.emit(CenterEvent.ANY_PDU, pdu); this.processor.processPdu(session, pdu).then(() => { }, () => { }); diff --git a/src/Center/CenterEvents.ts b/src/Center/CenterEvent.ts similarity index 87% rename from src/Center/CenterEvents.ts rename to src/Center/CenterEvent.ts index a47f8bc..8d3f870 100644 --- a/src/Center/CenterEvents.ts +++ b/src/Center/CenterEvent.ts @@ -1,4 +1,4 @@ -export class CenterEvents { +export class CenterEvent { static STATUS_CHANGED: "STATUS_CHANGED"; static STATE_CHANGED: "STATE_CHANGED"; static ANY_PDU: "ANY_PDU"; diff --git a/src/Client/Client.ts b/src/Client/Client.ts index 747706a..67b03bd 100644 --- a/src/Client/Client.ts +++ b/src/Client/Client.ts @@ -4,7 +4,7 @@ import {JobEvents} from "../Job/JobEvents"; import Logger from "../Logger"; import PersistentPromise from "../PersistentPromise"; import {SmppSession} from "../SmppSession"; -import {ClientEvents} from "./ClientEvents"; +import {ClientEvent} from "./ClientEvent"; import ClientStatus from "./ClientStatus"; const NanoTimer = require('nanotimer'); @@ -16,13 +16,14 @@ const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE export class Client implements SmppSession { defaultSingleJob!: Job; defaultMultipleJob!: Job; - private readonly eventEmitter: EventEmitter; + UPDATE_WS: string = "UPDATE_WS"; + private readonly eventEmitter: EventEmitter = new EventEmitter(); private readonly logger: Logger; private readonly _id: number; private session?: any; private connectPromise: PersistentPromise | null = null; - private bindPromise: PersistentPromise | null = null; // TODO: Implement close promise + private bindPromise: PersistentPromise | null = null; // Apparently the sessions are not closed on a dime but instead a .close() call causes eventSessionClose private sendTimer: any | null = null; private counterUpdateTimer: any | null = null; @@ -33,10 +34,14 @@ export class Client implements SmppSession { this._username = username; this._password = password; - this.eventEmitter = new EventEmitter(); this.logger = new Logger(`Client-${id}`); this.status = ClientStatus.NOT_CONNECTED; + this.eventEmitter.on(ClientEvent.STATE_CHANGED, () => this.updateWs(ClientEvent.STATE_CHANGED)); + this.eventEmitter.on(ClientEvent.STATUS_CHANGED, () => this.updateWs(ClientEvent.STATUS_CHANGED)); + this.eventEmitter.on(ClientEvent.ANY_PDU, (pdu: any) => this.updateWs(ClientEvent.ANY_PDU, [pdu])); + this.eventEmitter.on(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); + this.initialize(); } @@ -62,8 +67,33 @@ export class Client implements SmppSession { set status(value: ClientStatus) { this._status = value; - this.eventEmitter.emit(ClientEvents.STATUS_CHANGED, this._status); - this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(ClientEvent.STATUS_CHANGED, this._status); + this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()); + } + + updateWs(event: string, args?: any[]): void { + this.logger.log1(`Update WS: ${event}`); + let message: { + type: string, + data?: string + } = { + type: event, + }; + switch (event) { + case ClientEvent.STATE_CHANGED: + message.data = JSON.stringify(this.serialize()); + break; + case ClientEvent.STATUS_CHANGED: + message.data = JSON.stringify(this._status); + break; + case ClientEvent.ANY_PDU: + message.data = JSON.stringify(args![0]); + break; + case ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT: + message.data = JSON.stringify(args![0]); + break; + } + this.eventEmitter.emit(this.UPDATE_WS, message); } getUrl(): string { @@ -72,14 +102,14 @@ export class Client implements SmppSession { setDefaultSingleJob(job: Job): void { this.defaultSingleJob = job; - this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); - job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); + this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()); + job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize())); } setDefaultMultipleJob(job: Job): void { this.defaultMultipleJob = job; - this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize()); - job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); + this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize()); + job.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize())); } getDefaultSingleJob(): Job { @@ -93,8 +123,8 @@ export class Client implements SmppSession { initialize(): void { this.defaultSingleJob = Job.createEmptySingle(); this.defaultMultipleJob = Job.createEmptyMultiple(); - this.defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); - this.defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvents.STATE_CHANGED, this.serialize())); + this.defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize())); + this.defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(ClientEvent.STATE_CHANGED, this.serialize())); } doConnect(): PersistentPromise { @@ -180,7 +210,7 @@ export class Client implements SmppSession { this.counterUpdateTimer = new NanoTimer(); this.counterUpdateTimer.setInterval(() => { if (previousUpdateCounter !== counter) { - this.eventEmitter.emit(ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + this.eventEmitter.emit(ClientEvent.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); previousUpdateCounter = counter; } }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); @@ -255,7 +285,7 @@ export class Client implements SmppSession { private eventAnyPdu(pdu: any): void { this.logger.log6(`Client-${this._id} received PDU: ${JSON.stringify(pdu)}`); - this.eventEmitter.emit(ClientEvents.ANY_PDU, pdu); + this.eventEmitter.emit(ClientEvent.ANY_PDU, pdu); } private eventSessionError(pdu: any): void { diff --git a/src/Client/ClientEvents.ts b/src/Client/ClientEvent.ts similarity index 87% rename from src/Client/ClientEvents.ts rename to src/Client/ClientEvent.ts index 68b9234..7fcc5ed 100644 --- a/src/Client/ClientEvents.ts +++ b/src/Client/ClientEvent.ts @@ -1,4 +1,4 @@ -export class ClientEvents { +export class ClientEvent { static STATUS_CHANGED: "STATUS_CHANGED"; static STATE_CHANGED: "STATE_CHANGED"; static ANY_PDU: "ANY_PDU"; diff --git a/src/Client/ClientSessionManager.ts b/src/Client/ClientSessionManager.ts index 9779122..38fc2e6 100644 --- a/src/Client/ClientSessionManager.ts +++ b/src/Client/ClientSessionManager.ts @@ -1,3 +1,4 @@ +import EventEmitter from "events"; import fs from "fs"; import {Client} from "./Client"; import {Job} from "../Job/Job"; @@ -10,7 +11,10 @@ const CLIENT_SESSIONS_FILE: string = process.env.CLIENT_SESSIONS_FILE || "client export default class ClientSessionManager implements SessionManager { sessionId: number; sessions: Client[]; + identifier: string = "client"; private readonly logger: any; + readonly SESSION_ADDED_EVENT: string = "SESSION ADDED"; + private readonly eventEmitter: EventEmitter = new EventEmitter(); constructor() { this.sessionId = 0; @@ -18,10 +22,21 @@ export default class ClientSessionManager implements SessionManager { this.logger = new Logger("ClientSessionManager"); } + on(event: string, listener: (...args: any[]) => void): void { + this.eventEmitter.on(event, listener); + } + + getSessions(): Promise { + return new Promise(resolve => { + resolve(this.sessions); + }); + } + addSession(session: SmppSession): Promise { return new Promise((resolve, reject) => { this.logger.log1(`Adding session with id ${session.getId()}`); this.sessions.push(session as Client); + this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session.getId()); resolve(); }); } diff --git a/src/SessionManager.ts b/src/SessionManager.ts index ab64687..df838a4 100644 --- a/src/SessionManager.ts +++ b/src/SessionManager.ts @@ -3,6 +3,8 @@ import {SmppSession} from "./SmppSession"; export default interface SessionManager { sessions: SmppSession[]; sessionId: number; + identifier: string; + readonly SESSION_ADDED_EVENT: string; addSession(session: SmppSession): Promise; @@ -12,9 +14,13 @@ export default interface SessionManager { getSession(id: number): Promise; + getSessions(): Promise; + serialize(): object; cleanup(): void; setup(): void; + + on(event: string, listener: (...args: any[]) => void): void; } \ No newline at end of file diff --git a/src/SmppSession.ts b/src/SmppSession.ts index d245474..41696e6 100644 --- a/src/SmppSession.ts +++ b/src/SmppSession.ts @@ -8,10 +8,14 @@ export interface SmppSession { password: string, defaultSingleJob: Job; defaultMultipleJob: Job; + readonly UPDATE_WS: string; getDefaultSingleJob(): Job; + setDefaultSingleJob(job: Job): void; + getDefaultMultipleJob(): Job; + setDefaultMultipleJob(job: Job): void; getId(): number; @@ -19,9 +23,11 @@ export interface SmppSession { sendPdu(pdu: object, force?: boolean): Promise; sendSingle(job: Job): Promise; + sendSingleDefault(): Promise; sendMultiple(job: Job): Promise; + sendMultipleDefault(): Promise; cancelSendInterval(): void; @@ -31,4 +37,8 @@ export interface SmppSession { initialize(): void; serialize(): object; + + on(event: string, callback: (...args: any[]) => void): void; + + updateWs(event: string, args?: any[]): void; } \ No newline at end of file diff --git a/src/WS/ClientSet.ts b/src/WS/ClientSet.ts new file mode 100644 index 0000000..c68208b --- /dev/null +++ b/src/WS/ClientSet.ts @@ -0,0 +1,66 @@ +import Logger from "../Logger"; +import SessionManager from "../SessionManager"; +import {SmppSession} from "../SmppSession"; + +export class ClientSet { + identifier: string; + private clients: any[]; + private readonly type: string; + private readonly sessionId: number; + private readonly logger: Logger; + private readonly relevantSessionManager: SessionManager | undefined; + + constructor(identifier: string, sessionManagers: SessionManager[]) { + this.clients = []; + this.identifier = identifier; + + let data: string[] = identifier.split(':'); + this.type = data[0]; + this.sessionId = parseInt(data[1]); + + this.logger = new Logger(`ClientSet-${this.type}-${this.sessionId}`); + + this.relevantSessionManager = sessionManagers.find(sm => sm.identifier === this.type); + if (!this.relevantSessionManager) { + this.logger.log1(`No session manager found for type ${this.type}`); + return; + } + if (this.relevantSessionManager) { + this.relevantSessionManager.getSessions().then((sessions) => { + sessions.forEach((session) => { + this.attachListener(session); + }); + }); + } + this.relevantSessionManager.on(this.relevantSessionManager.SESSION_ADDED_EVENT, this.eventOnSessionAdded.bind(this)); + } + + eventOnSessionAdded(sessionId: number): void { + this.logger.log2(`Session added: ${sessionId}`); + this.relevantSessionManager?.getSession(sessionId).then((session) => { + this.attachListener(session); + }) + } + + add(ws: any): void { + this.logger.log2(`Added client`); + this.clients.push(ws); + ws.on('close', this.eventOnClose.bind(this)); + } + + eventOnClose(ws: any): void { + this.logger.log2(`Removed client`); + this.clients.splice(this.clients.indexOf(ws), 1); + } + + notifyClients(message: string) { + this.logger.log2(`Notifying clients: ${message}`); + this.clients.forEach((ws) => { + ws.send(message); + }); + } + + private attachListener(session: SmppSession) { + session.on(session.UPDATE_WS, (message: string) => this.notifyClients(message)); + } +} \ No newline at end of file diff --git a/src/WS/WSServer.ts b/src/WS/WSServer.ts new file mode 100644 index 0000000..2d0498e --- /dev/null +++ b/src/WS/WSServer.ts @@ -0,0 +1,291 @@ +import Logger from "../Logger"; +import SessionManager from "../SessionManager"; +import {ClientSet} from "./ClientSet"; +const WebSocket = require("ws"); + +const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191; + +export class WSServer { + private readonly clients: ClientSet[]; + private readonly unknownClients: any[]; + private readonly server: any; + private readonly logger: Logger; + private readonly sessionManagers: SessionManager[]; + + constructor(sessionManagers: SessionManager[]) { + this.clients = []; + this.unknownClients = []; + this.server = new WebSocket.Server({port: WS_SERVER_PORT}); + this.sessionManagers = sessionManagers; + this.logger = new Logger("WSServer"); + this.server.on('connection', this.eventOnConnection.bind(this)); + this.logger.log1(`WSServer listening atws://localhost:${WS_SERVER_PORT}`); + } + + private eventOnConnection(ws: WebSocket): void { + this.logger.log1("New connection"); + this.unknownClients.push(ws); + // @ts-ignore + ws.on('message', this.eventOnMessage.bind(this, ws)); + // @ts-ignore + ws.on('close', this.eventOnClose.bind(this, ws)); + } + + private eventOnMessage(ws: any, message: string): void { + this.logger.log1("New message"); + message = String(message); + this.unknownClients.splice(this.unknownClients.indexOf(ws), 1); + let clientSet: ClientSet | undefined = this.clients.find((clientSet: ClientSet) => clientSet.identifier === message); + if (!clientSet) { + clientSet = new ClientSet(message, this.sessionManagers); + } + clientSet.add(ws); + } + + private eventOnClose(ws: any): void { + this.logger.log1("Connection closed"); + this.unknownClients.splice(this.unknownClients.indexOf(ws), 1); + } + + // constructor() { + // // @ts-ignore + // this.server = new WebSocket.Server({port: WS_SERVER_PORT}); + // this.logger = new Logger("WSServer"); + // this.server.on('connection', this.onConnection.bind(this)); + // this.logger.log1(`WSServer listening at ws://localhost:${WS_SERVER_PORT}`); + // } + + // onConnection(ws: WebSocket) { + // this.logger.log1("New connection"); + // this.unknownClients.push(ws); + // ws.on('message', this.onMessage.bind(this, ws)); + // ws.on('close', this.onClose.bind(this, ws)); + // } + // + // addClient(ws, type, sessionId) { + // if (!this.clients[type]) { + // this.clients[type] = {}; + // } + // if (!this.clients[type][sessionId]) { + // this.clients[type][sessionId] = []; + // } + // this.logger.log1(`Adding client ${ws.id} to ${type} session ${sessionId}`); + // + // if (type === "client") { + // if (this.listenersAlreadySetup.indexOf(`client-${sessionId}`) === -1) { + // let session = clientSessionManager.getSession(sessionId); + // if (!!session) { + // this.logger.log1(`Setting up listeners for client session ${sessionId}`); + // session.on(ClientSession.STATUS_CHANGED_EVENT, this.onClientSessionStatusChange.bind(this, sessionId)); + // session.on(ClientSession.ANY_PDU_EVENT, this.onClientSessionPdu.bind(this, sessionId)); + // session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onClientMessageCounterUpdate.bind(this, sessionId)); + // } + // this.listenersAlreadySetup.push(`client-${sessionId}`); + // } else { + // this.logger.log1(`Listeners for client session ${sessionId} already set up`); + // } + // } else if (type === "center") { + // if (this.listenersAlreadySetup.indexOf(`center-${sessionId}`) === -1) { + // let session = centerSessionManager.getSession(sessionId); + // if (!!session) { + // this.logger.log1(`Setting up listeners for center session ${sessionId}`); + // session.on(CenterSession.STATUS_CHANGED_EVENT, this.onCenterStatusChange.bind(this, sessionId)); + // session.on(CenterSession.ANY_PDU_EVENT, this.onCenterServerPdu.bind(this, sessionId)); + // session.on(CenterSession.MODE_CHANGED_EVENT, this.onCenterModeChanged.bind(this, sessionId)); + // session.on(CenterSession.SESSION_CHANGED_EVENT, this.onCenterSessionsChanged.bind(this, sessionId)); + // session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onCenterMessageCounterUpdate.bind(this, sessionId)); + // } + // this.listenersAlreadySetup.push(`center-${sessionId}`); + // } else { + // this.logger.log1(`Listeners for center session ${sessionId} already set up`); + // } + // } + // + // this.clients[type][sessionId].push(ws); + // this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); + // } + // + // onMessage(ws, message) { + // this.logger.log1("New message"); + // message = String(message); + // let data = message.split(":"); + // let type = data[0]; + // let sessionId = data[1]; + // + // this.logger.log1(`Moving client to session ID: ${sessionId} of type ${type}`); + // delete this.unknownClients[ws]; + // this.unknownClients = this.unknownClients.filter(Boolean); + // + // this.addClient(ws, type, sessionId); + // this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); + // } + // + // onClose(ws) { + // this.removeClient(ws); + // // this.logger.log6(this.clients); + // this.logger.log1("Connection closed"); + // } + // + // removeClient(ws) { + // this.clients.client = this.removeFromArray(this.clients.client, ws); + // this.clients.center = this.removeFromArray(this.clients.center, ws); + // } + // + // removeFromArray(array, element) { + // for (let sessionId in array) { + // let index = array[sessionId].indexOf(element); + // if (index > -1) { + // delete array[sessionId][index]; + // } + // array[sessionId] = array[sessionId].filter(Boolean); + // if (array[sessionId].length === 0) { + // delete array[sessionId]; + // } + // } + // return array; + // } + // + // onClientSessionStatusChange(sessionId, newStatus) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "client", + // type: 'status', + // sessionId: sessionId, + // value: newStatus + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onClientSessionPdu(sessionId, pdu) { + // // TODO: Maybe move this to an "ignored" array against who the pdu.command is compared + // if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { + // return; + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Session with ID ${sessionId} fired PDU`); + // let payload = { + // objectType: "client", + // type: 'pdu', + // sessionId: sessionId, + // value: pdu + // } + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onClientMessageCounterUpdate(sessionId, counter) { + // this.logger.log2(`Session with ID ${sessionId} updating message send counter`); + // let payload = { + // objectType: "client", + // type: 'counterUpdate', + // sessionId: sessionId, + // value: counter + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterStatusChange(sessionId, newStatus) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'status', + // sessionId: sessionId, + // value: newStatus + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterServerPdu(sessionId, pdu) { + // if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { + // return; + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Session with ID ${sessionId} fired PDU`); + // let payload = { + // objectType: "center", + // type: 'pdu', + // sessionId: sessionId, + // value: pdu + // } + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterModeChanged(sessionId, newMode) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'mode', + // sessionId: sessionId, + // value: newMode, + // text: CenterMode[newMode] + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterSessionsChanged(sessionId, newSession) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'sessions', + // sessionId: sessionId, + // value: newSession + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterMessageCounterUpdate(sessionId, counter) { + // this.logger.log2(`Session with ID ${sessionId} updating message send counter`); + // let payload = { + // objectType: "center", + // type: 'counterUpdate', + // sessionId: sessionId, + // value: counter + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } +} \ No newline at end of file diff --git a/src/main.ts b/src/main.ts index 6b6b102..5484911 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,6 +3,7 @@ import {Client} from "./Client/Client"; import ClientSessionManager from "./Client/ClientSessionManager"; import {Job} from "./Job/Job"; import Logger from "./Logger"; +import {WSServer} from "./WS/WSServer"; const smpp = require("smpp"); const fs = require("fs"); @@ -18,7 +19,6 @@ const {PDU} = require("smpp"); const app = express(); const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190; -const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191; const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; @@ -30,6 +30,8 @@ let logger = new Logger("main"); let clientManager: ClientSessionManager = new ClientSessionManager(); clientManager.setup(); +let wss: WSServer = new WSServer([clientManager]); + async function main() { // let client: Client = await clientManager.createSession("smpp://localhost:7000", "test", "test") as Client; let client: Client = await clientManager.getSession(0) as Client; @@ -65,7 +67,7 @@ async function main() { // destination_addr: "1234567890", // short_message: "Hello World" // }), 100, 100)); - center.close(); + // center.close(); }, 1000); }