diff --git a/src/Center/Center.ts b/src/Center/Center.ts index 40de5e5..4e8d868 100644 --- a/src/Center/Center.ts +++ b/src/Center/Center.ts @@ -5,19 +5,25 @@ import {JobEvents} from "../Job/JobEvents"; import Logger from "../Logger"; import {SmppSession} from "../SmppSession"; import {CenterEvents} from "./CenterEvents"; +import {CenterPDUProcessor} from "./CenterPDUProcessor"; import CenterStatus from "./CenterStatus"; const NanoTimer = require('nanotimer'); const smpp = require("smpp"); +const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; + export class Center implements SmppSession { port: number; private pendingSessions: any[] = []; private sessions: any[] = []; + private nextSession: number = 0; private server: any; private eventEmitter: EventEmitter = new EventEmitter(); private readonly logger: Logger; private readonly _id: number; + private sendTimer: any | null = null; + private counterUpdateTimer: any | null = null; constructor(id: number, port: number, username: string, password: string) { this._id = id; @@ -30,8 +36,20 @@ export class Center implements SmppSession { this.initialize(); } + // TODO: Implement a few modes and set this to default DEBUG + private _processor: CenterPDUProcessor | undefined; + + set processor(value: CenterPDUProcessor) { + this._processor = value; + this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); + } + private _defaultMultipleJob!: Job; + get defaultMultipleJob(): Job { + return this._defaultMultipleJob; + } + set defaultMultipleJob(value: Job) { this._defaultMultipleJob = value; this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); @@ -39,6 +57,10 @@ export class Center implements SmppSession { private _defaultSingleJob!: Job; + get defaultSingleJob(): Job { + return this._defaultSingleJob; + } + set defaultSingleJob(value: Job) { this._defaultSingleJob = value; this.eventEmitter.emit(CenterEvents.STATE_CHANGED, this.serialize()); @@ -101,23 +123,60 @@ export class Center implements SmppSession { } sendMultiple(job: Job): Promise { - throw new Error("NEBI"); + return new Promise((resolve, reject) => { + this.validateSessions(reject); + if (!job.count || !job.perSecond) { + reject(`Center-${this._id} sendMultiple failed: invalid job, missing fields`); + } + this.logger.log1(`Center-${this._id} sending multiple messages: ${JSON.stringify(job)}`); + + let counter = 0; + let previousUpdateCounter = 0; + + this.counterUpdateTimer = new NanoTimer(); + this.counterUpdateTimer.setInterval(() => { + if (previousUpdateCounter !== counter) { + this.eventEmitter.emit(ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + previousUpdateCounter = counter; + } + }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); + + let count = job.count || 1; + let interval = 1 / (job.perSecond || 1); + this.sendTimer = new NanoTimer(); + this.sendTimer.setInterval(() => { + if (count > 0 && counter >= count) { + this.cancelSendInterval(); + } else { + this.sendPdu(job.pdu, true) + .catch(e => this.logger.log1(`Error sending message: ${e}`)); + counter++; + } + }, '', `${interval} s`); + resolve(); + }); } sendMultipleDefault(): Promise { - throw new Error("NEBI"); + return this.sendMultiple(this.defaultMultipleJob); } sendPdu(pdu: object, force?: boolean): Promise { - throw new Error("NEBI"); + return new Promise((resolve, reject) => { + if (!force) { + this.validateSessions(reject); + } + this.logger.log5(`Center-${this._id} sending PDU: ${JSON.stringify(pdu)}`); + this.getNextSession().send(pdu, (replyPdu: object) => resolve(replyPdu)); + }); } sendSingle(job: Job): Promise { - throw new Error("NEBI"); + return this.sendPdu(job.pdu); } sendSingleDefault(): Promise { - throw new Error("NEBI"); + return this.sendPdu(this.defaultSingleJob.pdu); } serialize(): object { @@ -135,6 +194,21 @@ export class Center implements SmppSession { throw new Error("NEBI"); } + private validateSessions(reject: (reason?: any) => void) { + if (this.sessions.length === 0) { + reject(`No clients connected`); + } + } + + private getNextSession(): any { + if (this.sessions.length === 0) { + return null; + } + let session = this.sessions[this.nextSession]; + this.nextSession = (this.nextSession + 1) % this.sessions.length; + return session; + } + private eventBindTransciever(session: any, pdu: any) { this.logger.log1(`Center-${this._id} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`); session.pause(); @@ -174,6 +248,7 @@ export class Center implements SmppSession { private eventSessionClose(session: any): void { this.logger.log1(`A client disconnected from center-${this._id}`); this.sessions = this.sessions.filter((s: any) => s !== session); + this.nextSession = 0; this.pendingSessions = this.pendingSessions.filter((s: any) => s !== session); this.updateStatus(); } diff --git a/src/Center/CenterPDUProcessor.ts b/src/Center/CenterPDUProcessor.ts new file mode 100644 index 0000000..c7e22a4 --- /dev/null +++ b/src/Center/CenterPDUProcessor.ts @@ -0,0 +1,3 @@ +export interface CenterPDUProcessor { + +} \ No newline at end of file diff --git a/src/Client/Client.ts b/src/Client/Client.ts index 3dd4f3c..747706a 100644 --- a/src/Client/Client.ts +++ b/src/Client/Client.ts @@ -35,7 +35,7 @@ export class Client implements SmppSession { this.eventEmitter = new EventEmitter(); this.logger = new Logger(`Client-${id}`); - this.setStatus(ClientStatus.NOT_CONNECTED) + this.status = ClientStatus.NOT_CONNECTED; this.initialize(); } @@ -90,11 +90,6 @@ export class Client implements SmppSession { return this.defaultMultipleJob; } - setStatus(status: ClientStatus): void { - this._status = status; - this.eventEmitter.emit("status", status); - } - initialize(): void { this.defaultSingleJob = Job.createEmptySingle(); this.defaultMultipleJob = Job.createEmptyMultiple(); @@ -112,10 +107,10 @@ export class Client implements SmppSession { } this.logger.log1(`Client-${this._id} connecting to ${this._url}`); - this.setStatus(ClientStatus.CONNECTING); + this.status = ClientStatus.CONNECTING; this.connectSession().then(resolve, ((err: any) => { this.logger.log1(`Client-${this._id} connection failed: ${err}`); - this.setStatus(ClientStatus.NOT_CONNECTED); + this.status = ClientStatus.NOT_CONNECTED; this.session.close(); reject(err); })); @@ -130,7 +125,7 @@ export class Client implements SmppSession { this.session.bind_transceiver({ system_id: this._username, password: this._password, }, this.eventBindReply.bind(this)); - this.setStatus(ClientStatus.BINDING); + this.status = ClientStatus.BINDING; }); return this.bindPromise; } @@ -152,7 +147,7 @@ export class Client implements SmppSession { return new Promise((resolve, reject) => { this.logger.log1(`Client-${this._id} closing connection`); this.session.close(); - this.setStatus(ClientStatus.NOT_CONNECTED); + this.status = ClientStatus.NOT_CONNECTED; resolve(); }); } @@ -164,8 +159,7 @@ export class Client implements SmppSession { this.validateBound(reject); } this.logger.log5(`Client-${this._id} sending PDU: ${JSON.stringify(pdu)}`); - this.session.send(pdu); - resolve(pdu); + this.session.send(pdu, (replyPdu: object) => resolve(replyPdu)); }); } @@ -178,7 +172,7 @@ export class Client implements SmppSession { } this.logger.log1(`Client-${this._id} sending multiple messages: ${JSON.stringify(job)}`); - this.setStatus(ClientStatus.BUSY); + this.status = ClientStatus.BUSY; let counter = 0; let previousUpdateCounter = 0; @@ -218,7 +212,7 @@ export class Client implements SmppSession { this.sendTimer = null; this.counterUpdateTimer = null; } - this.setStatus(ClientStatus.BOUND); + this.status = ClientStatus.BOUND; } on(event: string, callback: (...args: any[]) => void): void { @@ -253,7 +247,7 @@ export class Client implements SmppSession { private eventSessionConnected(): void { this.logger.log1(`Client-${this._id} connected to ${this._url}`); - this.setStatus(ClientStatus.CONNECTED); + this.status = ClientStatus.CONNECTED; if (this.connectPromise) { this.connectPromise.resolve(); } @@ -266,26 +260,26 @@ export class Client implements SmppSession { private eventSessionError(pdu: any): void { this.logger.log1(`Client-${this._id} error on ${this._url}`); - this.setStatus(ClientStatus.NOT_CONNECTED); + this.status = ClientStatus.NOT_CONNECTED; this.rejectPromises(pdu); } private eventSessionClose(): void { this.logger.log1(`Client-${this._id} closed on ${this._url}`); - this.setStatus(ClientStatus.NOT_CONNECTED); + this.status = ClientStatus.NOT_CONNECTED; this.rejectPromises(); } private eventBindReply(pdu: any): void { if (pdu.command_status === 0) { this.logger.log1(`Client-${this._id} bound to ${this._url}`); - this.setStatus(ClientStatus.BOUND); + this.status = ClientStatus.BOUND; if (this.bindPromise) { this.bindPromise.resolve(); } } else { this.logger.log1(`Client-${this._id} bind failed to ${this.url}`); - this.setStatus(ClientStatus.CONNECTED); + this.status = ClientStatus.CONNECTED; if (this.bindPromise) { this.bindPromise.reject(pdu); } diff --git a/src/main.ts b/src/main.ts index 8db138f..0319fc2 100644 --- a/src/main.ts +++ b/src/main.ts @@ -21,6 +21,7 @@ const app = express(); const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190; const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191; const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; +const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; // TODO: Add support for encodings // TODO: Implement some sort of metrics on frontend by counting the pdus