diff --git a/src/Client.ts b/src/Client.ts index 47822a1..b64505c 100644 --- a/src/Client.ts +++ b/src/Client.ts @@ -1,26 +1,33 @@ import EventEmitter from "events"; -import ClientStatus from "./clientStatus"; -import {Job} from "./job"; -import Logger from "./logger"; +import ClientStatus from "./ClientStatus"; +import {Job} from "./Job"; +import Logger from "./Logger"; import PersistentPromise from "./PersistentPromise"; -import {SmppSession} from "./smppSession"; +import {SmppSession} from "./SmppSession"; +const NanoTimer = require('nanotimer'); const smpp = require("smpp"); const AUTO_ENQUIRE_LINK_PERIOD: number = Number(process.env.AUTO_ENQUIRE_LINK_PERIOD) || 500; +const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; export class Client implements SmppSession { public static ClientEvents = { - STATUS_CHANGED: "STATUS_CHANGED", STATE_CHANGED: "STATE_CHANGED", ANY_PDU: "ANY_PDU", + STATUS_CHANGED: "STATUS_CHANGED", + STATE_CHANGED: "STATE_CHANGED", + ANY_PDU: "ANY_PDU", + MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT" } - defaultSingleJob?: Job; - defaultMultipleJob?: Job; + defaultSingleJob: Job = Job.createEmptySingle(); + defaultMultipleJob: Job = Job.createEmptyMultiple(); private readonly eventEmitter: EventEmitter; private readonly logger: Logger; private readonly id: number; private session?: any; private connectPromise: PersistentPromise | null = null; private bindPromise: PersistentPromise | null = null; + private sendTimer: any | null = null; + private counterUpdateTimer: any | null = null; constructor(id: number, url: string, username: string, password: string) { this.id = id; @@ -105,6 +112,7 @@ export class Client implements SmppSession { this.session.bind_transceiver({ system_id: this._username, password: this._password, }, this.eventBindReply.bind(this)); + this.setStatus(ClientStatus.BINDING); }); return this.bindPromise; } @@ -116,23 +124,81 @@ export class Client implements SmppSession { } serialize(): string { - throw new Error("Method not implemented."); + return JSON.stringify({ + id: this.id, url: this._url, username: this._username, password: this._password, status: this._status, + defaultSingleJob: this.defaultSingleJob, defaultMultipleJob: this.defaultMultipleJob, + }); } close(): Promise { - throw new Error("Method not implemented."); + return new Promise((resolve, reject) => { + this.session.close(); + resolve(); + }); } - sendPdu(pdu: object): Promise { - throw new Error("Method not implemented."); + sendPdu(pdu: object, force?: boolean): Promise { + return new Promise((resolve, reject) => { + if (!force) { + this.validateSession(reject); + this.validateBound(reject); + } + this.logger.log5(`Client-${this.id} sending PDU: ${JSON.stringify(pdu)}`); + this.session.send(pdu); + resolve(pdu); + }); } - sendMultiple(Job: object): Promise { - throw new Error("Method not implemented."); + sendMultiple(job: Job): Promise { + return new Promise((resolve, reject) => { + this.validateSession(reject); + this.validateBound(reject); + if (!job.count || !job.perSecond) { + reject(`Client-${this.id} sendMultiple failed: invalid job, missing fields`); + } + this.logger.log1(`Client-${this.id} sending multiple messages: ${JSON.stringify(job)}`); + + this.setStatus(ClientStatus.BUSY); + + let counter = 0; + let previousUpdateCounter = 0; + + this.counterUpdateTimer = new NanoTimer(); + this.counterUpdateTimer.setInterval(() => { + if (previousUpdateCounter !== counter) { + this.eventEmitter.emit(Client.ClientEvents.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + previousUpdateCounter = counter; + } + }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); + + let count = job.count || 1; + let interval = 1 / (job.perSecond || 1); + this.sendTimer = new NanoTimer(); + this.sendTimer.setInterval(() => { + if (count > 0 && counter >= count) { + this.cancelSendInterval(); + } else { + this.sendPdu(job.pdu, true) + .catch(e => this.logger.log1(`Error sending message: ${e}`)); + counter++; + } + }, '', `${interval} s`); + resolve(); + }); } - sendSingle(Job: object): Promise { - throw new Error("Method not implemented."); + sendSingle(job: Job): Promise { + return this.sendPdu(job.pdu); + } + + cancelSendInterval(): void { + if (this.sendTimer) { + this.sendTimer.clearInterval(); + this.counterUpdateTimer.clearInterval(); + this.sendTimer = null; + this.counterUpdateTimer = null; + } + this.setStatus(ClientStatus.BOUND); } private connectSession(): void { @@ -214,4 +280,20 @@ export class Client implements SmppSession { } return true; } + + private validateSession(reject: (reason?: any) => void) { + if (!this.session) { + let errorMessage = `Client-${this.id} session is not defined`; + this.logger.log1(errorMessage); + reject(errorMessage); + } + } + + private validateBound(reject: (reason?: any) => void) { + if (this._status !== ClientStatus.BOUND) { + let errorMessage = `Client-${this.id} is not bound`; + this.logger.log1(errorMessage); + reject(errorMessage); + } + } } \ No newline at end of file diff --git a/src/Job.ts b/src/Job.ts index 93eb79f..ba1737b 100644 --- a/src/Job.ts +++ b/src/Job.ts @@ -1,9 +1,11 @@ +const smpp = require("smpp"); + export class Job { - pdu: object; + pdu: any; perSecond?: number; count?: number; - constructor(pdu: object, perSecond?: number, count?: number) { + constructor(pdu: any, perSecond?: number, count?: number) { this.pdu = pdu; this.perSecond = perSecond; this.count = count; @@ -12,4 +14,12 @@ export class Job { serialize(): string { return JSON.stringify(this); } + + static createEmptySingle(): Job { + return new Job({}); + } + + static createEmptyMultiple(): Job { + return new Job({}, 1, 1); + } } \ No newline at end of file diff --git a/src/SmppSession.ts b/src/SmppSession.ts index 646e564..b7cc9be 100644 --- a/src/SmppSession.ts +++ b/src/SmppSession.ts @@ -1,16 +1,20 @@ +import {Job} from "./Job"; + export interface SmppSession { - username: string, - password: string, + username: string, + password: string, - sendPdu(pdu: object): Promise; + sendPdu(pdu: object): Promise; - sendSingle(Job: object): Promise; + sendSingle(job: Job): Promise; - sendMultiple(Job: object): Promise; + sendMultiple(job: Job): Promise; - close(): Promise; + cancelSendInterval(): void; - initialize(): void; + close(): Promise; - serialize(): string; + initialize(): void; + + serialize(): string; } \ No newline at end of file diff --git a/src/main.ts b/src/main.ts index fb432f6..13508e1 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,6 @@ -import Logger from "./logger"; +import {Client} from "./Client"; +import {Job} from "./Job"; +import Logger from "./Logger"; const smpp = require("smpp"); const fs = require("fs"); @@ -17,18 +19,20 @@ const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190; const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191; const CLIENT_SESSIONS_FILE: string = process.env.CLIENT_SESSIONS_FILE || "client_sessions.json"; const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; -const MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; // TODO: Add support for encodings // TODO: Implement some sort of metrics on frontend by counting the pdus let logger = new Logger("main"); -import {Client} from "./client"; - let client: Client = new Client(0, "smpp://localhost:7000", "test", "test"); client.connectAndBind().then(() => { - console.log("POGGIES"); + console.log("POGGIES"); + client.sendMultiple(new Job(new smpp.PDU('submit_sm', { + source_addr: "1234567890", + destination_addr: "1234567890", + short_message: "Hello World" + }), 100, 100)) }); // class ClientSession {