From 330051382e094a13fe2fa1a2ce60e7a94e26cae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Wed, 29 Mar 2023 18:25:57 +0200 Subject: [PATCH] Generify smppSession --- src/Center/Center.ts | 276 ++++-------------- src/Center/CenterSessionManager.ts | 42 --- .../PDUProcessors/CenterPDUProcessor.ts | 3 - src/Client/ClientSessionManager.ts | 1 - src/Job/JobEvents.ts | 3 - src/PDUProcessor/PduProcessor.ts | 5 + src/SmppSession.ts | 182 ++++++++++-- 7 files changed, 215 insertions(+), 297 deletions(-) delete mode 100644 src/Center/PDUProcessors/CenterPDUProcessor.ts delete mode 100644 src/Job/JobEvents.ts create mode 100644 src/PDUProcessor/PduProcessor.ts diff --git a/src/Center/Center.ts b/src/Center/Center.ts index c76b462..871ac1b 100644 --- a/src/Center/Center.ts +++ b/src/Center/Center.ts @@ -1,221 +1,67 @@ -import EventEmitter from "events"; import {Job} from "../Job/Job"; -import {JobEvents} from "../Job/JobEvents"; import Logger from "../Logger"; +import {PduProcessor} from "../PDUProcessor/PduProcessor"; import {SmppSession} from "../SmppSession"; import CenterStatus from "./CenterStatus"; -import {CenterPDUProcessor} from "./PDUProcessors/CenterPDUProcessor"; -import {DebugProcessor} from "./PDUProcessors/DebugProcessor"; const NanoTimer = require('nanotimer'); const smpp = require("smpp"); -const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; +export class Center extends SmppSession { + readonly STATUS: string[] = [ + "WAITING CONNECTION", + "CONNECTING", + "CONNECTED", + ]; + + id: number; + username: string; + password: string; + status: string = this.STATUS[0]; + port: number; + + pduProcessors: PduProcessor[] = []; + defaultSingleJob: Job = Job.createEmptySingle(); + defaultMultipleJob: Job = Job.createEmptyMultiple(); -export class Center implements SmppSession { - static EVENTS: any = { - STATUS_CHANGED: "STATUS_CHANGED", - STATE_CHANGED: "STATE_CHANGED", - ANY_PDU: "ANY_PDU", - MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", - } - UPDATE_WS: string = "UPDATE_WS"; private pendingSessions: any[] = []; private sessions: any[] = []; private nextSession: number = 0; private server: any; - private eventEmitter: EventEmitter = new EventEmitter(); - private readonly logger: Logger; - private readonly _id: number; - private sendTimer: any | null = null; - private counterUpdateTimer: any | null = null; + readonly logger: Logger; constructor(id: number, port: number, username: string, password: string) { - this._id = id; - this._port = port; - this._username = username; - this._password = password; + super(); + this.id = id; + this.username = username; + this.password = password; + this.port = port; this.logger = new Logger(`Center-${id}`); - this.eventEmitter.on(Center.EVENTS.STATE_CHANGED, () => this.updateWs(Center.EVENTS.STATE_CHANGED)); - this.eventEmitter.on(Center.EVENTS.STATUS_CHANGED, () => this.updateWs(Center.EVENTS.STATUS_CHANGED)); - this.eventEmitter.on(Center.EVENTS.ANY_PDU, (pdu: any) => this.updateWs(Center.EVENTS.ANY_PDU, [pdu])); - this.eventEmitter.on(Center.EVENTS.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(Center.EVENTS.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); - this.initialize(); } - get id(): number { - return this._id; - } - - private _port: number; - - get port(): number { - return this._port; - } - - // TODO: Implement processor switching - private _processor: CenterPDUProcessor = new DebugProcessor(); - - get processor(): CenterPDUProcessor { - return this._processor; - } - - set processor(value: CenterPDUProcessor) { - this._processor = value; - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - private _defaultMultipleJob!: Job; - - get defaultMultipleJob(): Job { - return this._defaultMultipleJob; - } - - set defaultMultipleJob(value: Job) { - this._defaultMultipleJob = value; - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - private _defaultSingleJob!: Job; - - get defaultSingleJob(): Job { - return this._defaultSingleJob; - } - - set defaultSingleJob(value: Job) { - this._defaultSingleJob = value; - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - private _password: string; - - get password(): string { - return this._password; - } - - set password(value: string) { - this._password = value; - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - private _username: string; - - get username(): string { - return this._username; - } - - set username(value: string) { - this._username = value; - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - private _status: CenterStatus = CenterStatus.WAITING_CONNECTION; - - get status(): CenterStatus { - return this._status; - } - - set status(value: CenterStatus) { - this._status = value; - this.eventEmitter.emit(Center.EVENTS.STATUS_CHANGED, this._status); - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); - } - - on(event: string, callback: (...args: any[]) => void): void { - this.eventEmitter.on(event, callback); - } - - updateWs(event: string, args?: any[]): void { - this.logger.log1(`Update WS: ${event}`); - let message: { - type: string, - data?: string - } = { - type: event, - }; - switch (event) { - case Center.EVENTS.STATE_CHANGED: - message.data = JSON.stringify(this.serialize()); - break; - case Center.EVENTS.STATUS_CHANGED: - message.data = JSON.stringify(this._status); - break; - case Center.EVENTS.ANY_PDU: - message.data = JSON.stringify(args![0]); - break; - case Center.EVENTS.MESSAGE_SEND_COUNTER_UPDATE_EVENT: - message.data = JSON.stringify(args![0]); - break; - } - this.eventEmitter.emit(this.UPDATE_WS, message); - } - - initialize(): void { - this._defaultSingleJob = Job.createEmptySingle(); - this._defaultMultipleJob = Job.createEmptyMultiple(); - this._defaultSingleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize())); - this._defaultMultipleJob.on(JobEvents.STATE_CHANGED, () => this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize())); - - this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); - this.server.listen(this._port); - this.status = CenterStatus.WAITING_CONNECTION; - } - - cancelSendInterval(): void { - if (this.sendTimer) { - this.sendTimer.clearInterval(); - this.counterUpdateTimer.clearInterval(); - this.sendTimer = null; - this.counterUpdateTimer = null; - } - } - - close(): Promise { - return new Promise((resolve, reject) => { - this.logger.log1(`Center-${this._id} closing...`); - this.server.close(); - this.status = CenterStatus.WAITING_CONNECTION; - resolve(); - }); - } - - getDefaultMultipleJob(): Job { - return this.getDefaultSingleJob(); - } - - getDefaultSingleJob(): Job { - return this.defaultSingleJob; - } - - getId(): number { - return this.id; - } - sendMultiple(job: Job): Promise { return new Promise((resolve, reject) => { this.validateSessions(reject); if (!job.count || !job.perSecond) { - reject(`Center-${this._id} sendMultiple failed: invalid job, missing fields`); + reject(`Center-${this.getId()} sendMultiple failed: invalid job, missing fields`); } - this.logger.log1(`Center-${this._id} sending multiple messages: ${JSON.stringify(job)}`); + this.logger.log1(`Center-${this.getId()} sending multiple messages: ${JSON.stringify(job)}`); let counter = 0; let previousUpdateCounter = 0; - this.counterUpdateTimer = new NanoTimer(); this.counterUpdateTimer.setInterval(() => { if (previousUpdateCounter !== counter) { - this.eventEmitter.emit(Center.EVENTS.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + this.eventEmitter.emit(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); previousUpdateCounter = counter; } - }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); + }, '', `${this.MESSAGE_SEND_UPDATE_DELAY / 1000} s`); let count = job.count || 1; let interval = 1 / (job.perSecond || 1); - this.sendTimer = new NanoTimer(); this.sendTimer.setInterval(() => { if (count > 0 && counter >= count) { this.cancelSendInterval(); @@ -228,54 +74,45 @@ export class Center implements SmppSession { }); } - sendMultipleDefault(): Promise { - return this.sendMultiple(this.defaultMultipleJob); - } - sendPdu(pdu: object, force?: boolean): Promise { return new Promise((resolve, reject) => { if (!force) { this.validateSessions(reject); } - this.logger.log5(`Center-${this._id} sending PDU: ${JSON.stringify(pdu)}`); + this.logger.log5(`Center-${this.getId()} sending PDU: ${JSON.stringify(pdu)}`); this.getNextSession().send(pdu, (replyPdu: any) => { resolve(replyPdu); }); }); } - sendSingle(job: Job): Promise { - return this.sendPdu(job.pdu); + initialize(): void { + this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); + this.server.listen(this.port); + this.setStatus(0); } - sendSingleDefault(): Promise { - return this.sendPdu(this.defaultSingleJob.pdu); + close(): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Center-${this.getId()} closing active connections`); + this.server.close(); + this.setStatus(0); + resolve(); + }); } serialize(): object { return { - id: this._id, - port: this._port, - username: this._username, - password: this._password, - status: this._status, - defaultSingleJob: this._defaultSingleJob, - defaultMultipleJob: this._defaultMultipleJob, + id: this.id, + port: this.port, + username: this.username, + password: this.password, + status: this.status, + defaultSingleJob: this.defaultSingleJob, + defaultMultipleJob: this.defaultMultipleJob, }; } - setDefaultMultipleJob(job: Job): void { - this.defaultMultipleJob = job; - } - - setDefaultSingleJob(job: Job): void { - this.defaultSingleJob = job; - } - - getPort(): number { - return this.port; - } - private validateSessions(reject: (reason?: any) => void) { if (this.sessions.length === 0) { reject(`No clients connected`); @@ -292,17 +129,17 @@ export class Center implements SmppSession { } private eventBindTransceiver(session: any, pdu: any) { - this.logger.log1(`Center-${this._id} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`); + this.logger.log1(`Center-${this.getId()} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`); session.pause(); if (pdu.system_id === this.username && pdu.password === this.password) { - this.logger.log1(`Center-${this._id} client connection successful`); + this.logger.log1(`Center-${this.getId()} client connection successful`); session.send(pdu.response()); session.resume(); this.pendingSessions = this.pendingSessions.filter((s) => s !== session); this.sessions.push(session); this.updateStatus(); } else { - this.logger.log1(`Center-${this._id} client connection failed, invalid credentials (expected: ${this.username}, ${this.password})`); + this.logger.log1(`Center-${this.getId()} client connection failed, invalid credentials (expected: ${this.username}, ${this.password})`); session.send(pdu.response({ command_status: smpp.ESME_RBINDFAIL })); @@ -313,22 +150,22 @@ export class Center implements SmppSession { } private eventSessionConnected(session: any): void { - this.logger.log1(`A client connected to center-${this._id}`); + this.logger.log1(`A client connected to center-${this.getId()}`); this.pendingSessions.push(session); session.on('close', this.eventSessionClose.bind(this, session)); session.on('error', this.eventSessionError.bind(this, session)); session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session)); session.on('pdu', this.eventAnyPdu.bind(this, session)); this.updateStatus(); - this.eventEmitter.emit(Center.EVENTS.STATE_CHANGED, this.serialize()); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); } private eventSessionError(session: any): void { - this.logger.log1(`A client encountered an error on center-${this._id}`); + this.logger.log1(`A client encountered an error on center-${this.getId()}`); } private eventSessionClose(session: any): void { - this.logger.log1(`A client disconnected from center-${this._id}`); + this.logger.log1(`A client disconnected from center-${this.getId()}`); this.sessions = this.sessions.filter((s: any) => s !== session); this.nextSession = 0; this.pendingSessions = this.pendingSessions.filter((s: any) => s !== session); @@ -344,11 +181,4 @@ export class Center implements SmppSession { this.status = CenterStatus.WAITING_CONNECTION; } } - - private eventAnyPdu(session: any, pdu: any): void { - this.eventEmitter.emit(Center.EVENTS.ANY_PDU, pdu); - this.processor.processPdu(session, pdu).then(() => { - }, () => { - }); - } } \ No newline at end of file diff --git a/src/Center/CenterSessionManager.ts b/src/Center/CenterSessionManager.ts index 90a2d81..c8ca877 100644 --- a/src/Center/CenterSessionManager.ts +++ b/src/Center/CenterSessionManager.ts @@ -1,4 +1,3 @@ -import EventEmitter from "events"; import Logger from "../Logger"; import {SessionManager} from "../SessionManager"; import {SmppSession} from "../SmppSession"; @@ -13,7 +12,6 @@ export class CenterSessionManager extends SessionManager { sessions: Center[] = []; identifier: string = "center"; readonly logger: Logger = new Logger("CenterSessionManager"); - readonly eventEmitter: EventEmitter = new EventEmitter(); constructor() { super(); @@ -21,44 +19,4 @@ export class CenterSessionManager extends SessionManager { } comparatorFn: (arg: any, session: SmppSession) => boolean = (arg: any, session: SmppSession) => (session as Center).getPort() === arg; - - createSession(port: number, username: string, password: string): Promise { - return new Promise((resolve, reject) => { - this.logger.log1(`Creating session with port ${port}`); - this.getExisting(port).then(s => { - resolve(s); - }, err => { - }); - this.verifyPort(port, reject); - // this.verifyUsername(username, reject); - // this.verifyPassword(password, reject); - - let client = new Center(this.sessionId++, port, username, password); - this.addSession(client).then(() => { - resolve(client); - }); - }); - } - - getExisting(arg: any): Promise { - return new Promise((resolve, reject) => { - this.logger.log1(`Looking for session with port ${arg}...`); - let session: SmppSession | undefined = this.sessions.find((s: Center) => s.getPort() === arg); - if (session) { - this.logger.log1(`Found session with port ${arg}`); - resolve(session); - } else { - this.logger.log1(`Session with port ${arg} not found`); - reject(`Session with port ${arg} not found`); - } - }); - } - - private verifyPort(port: number, reject: (reason?: any) => void) { - if (!port) { - let error = `Request to make a new center failed because of missing port.`; - this.logger.log1(error); - reject(error); - } - } } \ No newline at end of file diff --git a/src/Center/PDUProcessors/CenterPDUProcessor.ts b/src/Center/PDUProcessors/CenterPDUProcessor.ts deleted file mode 100644 index 96ca270..0000000 --- a/src/Center/PDUProcessors/CenterPDUProcessor.ts +++ /dev/null @@ -1,3 +0,0 @@ -export interface CenterPDUProcessor { - processPdu(session: any, pdu: any): Promise; -} \ No newline at end of file diff --git a/src/Client/ClientSessionManager.ts b/src/Client/ClientSessionManager.ts index 6dd8613..7306681 100644 --- a/src/Client/ClientSessionManager.ts +++ b/src/Client/ClientSessionManager.ts @@ -14,7 +14,6 @@ export default class ClientSessionManager extends SessionManager { // Identifier is used in websockets to identify the type of session this manager manages identifier: string = "client"; readonly logger: Logger = new Logger("ClientSessionManager"); - readonly eventEmitter: EventEmitter = new EventEmitter(); constructor() { super(); diff --git a/src/Job/JobEvents.ts b/src/Job/JobEvents.ts deleted file mode 100644 index d205964..0000000 --- a/src/Job/JobEvents.ts +++ /dev/null @@ -1,3 +0,0 @@ -export class JobEvents { - static readonly STATE_CHANGED: string = "STATE_CHANGED"; -} \ No newline at end of file diff --git a/src/PDUProcessor/PduProcessor.ts b/src/PDUProcessor/PduProcessor.ts new file mode 100644 index 0000000..d60674d --- /dev/null +++ b/src/PDUProcessor/PduProcessor.ts @@ -0,0 +1,5 @@ +export interface PduProcessor { + processPdu(session: any, pdu: any, ...args: any[]): Promise; + + serialize(): object; +} \ No newline at end of file diff --git a/src/SmppSession.ts b/src/SmppSession.ts index 41696e6..716a759 100644 --- a/src/SmppSession.ts +++ b/src/SmppSession.ts @@ -1,44 +1,176 @@ +import EventEmitter from "events"; import {Job} from "./Job/Job"; +import Logger from "./Logger"; +import {PduProcessor} from "./PDUProcessor/PduProcessor"; -// TODO: Implement on change event and propagate it to sessions -// Do something like "onJobChange" here... -// Maybe even make it default -export interface SmppSession { - username: string, - password: string, - defaultSingleJob: Job; - defaultMultipleJob: Job; - readonly UPDATE_WS: string; +const NanoTimer = require("nanotimer"); +const smpp = require("smpp"); - getDefaultSingleJob(): Job; +export abstract class SmppSession { + readonly EVENT: any = { + STATUS_CHANGED: "STATUS_CHANGED", + STATE_CHANGED: "STATE_CHANGED", + ANY_PDU: "ANY_PDU", + MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", + }; + abstract STATUS: string[]; - setDefaultSingleJob(job: Job): void; + abstract id: number; + abstract username: string; + abstract password: string; + abstract status: string; + abstract pduProcessors: PduProcessor[]; - getDefaultMultipleJob(): Job; + abstract defaultSingleJob: Job; + abstract defaultMultipleJob: Job; - setDefaultMultipleJob(job: Job): void; + readonly UPDATE_WS: string = "UPDATE_WS"; + readonly eventEmitter: EventEmitter = new EventEmitter(); + readonly logger: Logger = new Logger(`SmppSession`); + readonly sendTimer: any = new NanoTimer(); + readonly counterUpdateTimer: any = new NanoTimer(); + readonly MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; - getId(): number; + constructor() { + this.eventEmitter.on(this.EVENT.STATE_CHANGED, () => this.updateWs(this.EVENT.STATE_CHANGED)); + this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED)); + this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu])); + this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); + } - sendPdu(pdu: object, force?: boolean): Promise; + abstract sendPdu(pdu: object, force?: boolean): Promise; - sendSingle(job: Job): Promise; + sendSingle(job: Job): Promise { + return this.sendPdu(job.pdu); + } - sendSingleDefault(): Promise; + sendSingleDefault(): Promise { + return this.sendSingle(this.defaultSingleJob); + } - sendMultiple(job: Job): Promise; + abstract sendMultiple(job: Job): Promise; - sendMultipleDefault(): Promise; + sendMultipleDefault(): Promise { + return this.sendMultiple(this.defaultMultipleJob); + } - cancelSendInterval(): void; + cancelSendInterval(): void { + this.sendTimer.clearInterval(); + this.counterUpdateTimer.clearInterval(); + } - close(): Promise; + abstract close(): Promise; - initialize(): void; + abstract initialize(): void; - serialize(): object; + abstract serialize(): object; - on(event: string, callback: (...args: any[]) => void): void; + on(event: string, callback: (...args: any[]) => void): void { + this.eventEmitter.on(event, callback); + } - updateWs(event: string, args?: any[]): void; + updateWs(event: string, args?: any[]): void { + this.logger.log1(`Update WS: ${event}`); + let message: { + type: string, + data?: string + } = { + type: event, + }; + switch (event) { + case this.EVENT.STATE_CHANGED: + message.data = JSON.stringify(this.serialize()); + break; + case this.EVENT.STATUS_CHANGED: + message.data = JSON.stringify(this.status); + break; + case this.EVENT.ANY_PDU: + message.data = JSON.stringify(args![0]); + break; + case this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT: + message.data = JSON.stringify(args![0]); + break; + } + this.eventEmitter.emit(this.UPDATE_WS, message); + } + + getDefaultSingleJob(): Job { + return this.defaultSingleJob; + } + + setDefaultSingleJob(job: Job): void { + this.defaultSingleJob = job; + job.on(Job.STATE_CHANGED, this.eventJobUpdated); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getDefaultMultipleJob(): Job { + return this.defaultMultipleJob; + } + + setDefaultMultipleJob(job: Job): void { + this.defaultMultipleJob = job; + job.on(Job.STATE_CHANGED, this.eventJobUpdated); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getId(): number { + return this.id; + } + + setStatus(statusIndex: number): void { + this.status = this.STATUS[statusIndex]; + this.eventEmitter.emit(this.EVENT.STATUS_CHANGED, this.status); + } + + setUsername(username: string): void { + this.username = username; + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + setPassword(password: string): void { + this.password = password; + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + eventJobUpdated(): void { + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + addPduProcessor(pduProcessor: PduProcessor): void { + this.pduProcessors.push(pduProcessor); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + removePduProcessor(pduProcessor: PduProcessor): void { + this.pduProcessors = this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getPduProcessors(): PduProcessor[] { + return this.pduProcessors; + } + + serializePduProcessors(): object { + this.logger.log1(`Serializing ${this.pduProcessors.length} clients`) + return this.pduProcessors.map((processor: PduProcessor) => { + return processor.serialize(); + }); + } + + eventAnyPdu(session: any, pdu: any): Promise { + this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu); + let successful: number = 0; + this.pduProcessors.forEach((pduProcessor: PduProcessor) => { + pduProcessor.processPdu(session, pdu).then((result: any) => { + successful++; + }, (error: any) => { + }); + }); + if (successful === 0) { + return Promise.reject("No PDU processor was able to process the PDU"); + } else { + return Promise.resolve(); + } + } } \ No newline at end of file