diff --git a/.run/main.js.run.xml b/.run/main.js.run.xml index 71c8e4f..de16bf6 100644 --- a/.run/main.js.run.xml +++ b/.run/main.js.run.xml @@ -1,5 +1,5 @@ - + \ No newline at end of file diff --git a/.run/main.ts.run.xml b/.run/main.ts.run.xml new file mode 100644 index 0000000..b4e9633 --- /dev/null +++ b/.run/main.ts.run.xml @@ -0,0 +1,18 @@ + + + + + + + + \ No newline at end of file diff --git a/WebsocketTest.ts b/WebsocketTest.ts new file mode 100644 index 0000000..7be3c06 --- /dev/null +++ b/WebsocketTest.ts @@ -0,0 +1,30 @@ +// @ts-ignore +const Websocket = require('ws'); + +const ws = new Websocket('ws://localhost:8191'); +ws.on('open', function open() { + ws.send('something'); +}); + +interface Animal { + doNoise(): void; +} + +class Dog implements Animal { + doNoise(): void { + console.log("woof"); + } +} + +class Cat implements Animal { + doNoise(): void { + console.log("meow"); + } +} + +const dog = new Dog(); +dog.doNoise(); +const cat = new Cat(); +cat.doNoise(); +let animals: Animal[] = [dog, cat]; +animals.forEach(animal => animal.doNoise()); \ No newline at end of file diff --git a/main.js b/main.js deleted file mode 100644 index 711d25f..0000000 --- a/main.js +++ /dev/null @@ -1,1738 +0,0 @@ -const smpp = require("smpp"); -const fs = require("fs"); -const path = require("path"); -const EventEmitter = require("events"); -const NanoTimer = require('nanotimer'); - -const express = require("express"); -const app = express(); -const bodyParser = require("body-parser"); -const WebSocket = require("ws"); -const {PDU} = require("smpp"); - -const SERVER_PORT = process.env.SERVER_PORT || 8190; -const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; -const CLIENT_SESSIONS_FILE = process.env.CLIENT_SESSIONS_FILE || "client_sessions.json"; -const CENTER_SESSIONS_FILE = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; -const MESSAGE_SEND_UPDATE_DELAY = process.env.MESSAGE_SEND_UPDATE_DELAY || 500; - -// TODO: Add support for encodings -// TODO: Implement some sort of metrics on frontend by counting the pdus - -[ - 'debug', - 'log', - 'warn', - 'error' -].forEach((methodName) => { - const originalLoggingMethod = console[methodName]; - console[methodName] = (firstArgument, ...otherArguments) => { - const originalPrepareStackTrace = Error.prepareStackTrace; - Error.prepareStackTrace = (_, stack) => stack; - const callee = new Error().stack[2]; - Error.prepareStackTrace = originalPrepareStackTrace; - const relativeFileName = path.relative(process.cwd(), callee.getFileName()); - const prefix = `${relativeFileName}:${callee.getLineNumber()}:`; - if (typeof firstArgument === 'string') { - originalLoggingMethod(prefix + ' ' + firstArgument, ...otherArguments); - } else { - originalLoggingMethod(prefix, firstArgument, ...otherArguments); - } - }; -}); - -class Logger { - constructor(clazz) { - this.clazz = clazz; - this.logLevel = typeof LOG_LEVEL !== "undefined" ? LOG_LEVEL : 6; - this.logFile = typeof LOG_FILE !== "undefined" ? LOG_FILE : null; - - this.logFileWriteStream = null; - if (this.logFile != null) { - this.logFileWriteStream = fs.createWriteStream(this.logFile, {flags: 'a'}); - } - } - - leftPad(str, len, char) { - str = String(str); - let i = -1; - len = len - str.length; - if (char === undefined) { - char = " "; - } - while (++i < len) { - str = char + str; - } - return str; - } - - log(...args) { - let logLevel = args[0]; - let data = args[1]; - if (typeof data === "object") { - data = JSON.stringify(data); - } - let date = new Date(); - - let year = this.leftPad(date.getFullYear(), 4); - let month = this.leftPad(date.getMonth() + 1, 2, 0); - let day = this.leftPad(date.getDate(), 2, 0); - - let hours = this.leftPad(date.getHours(), 2, 0); - let minutes = this.leftPad(date.getMinutes(), 2, 0); - let seconds = this.leftPad(date.getSeconds(), 2, 0); - let milliseconds = this.leftPad(date.getMilliseconds(), 3, 0); - - let datePrefix = `[${day}/${month}/${year}-${hours}:${minutes}:${seconds}:${milliseconds}]` - - // let out = `${datePrefix} [${this.clazz}] (${logLevel}) ${data}`; - let out = datePrefix.padEnd(30, ' ') + `[${this.clazz}]`.padEnd(28, ' ') + `(${logLevel})`.padEnd(8, ' ') + data; - if (args[0] <= this.logLevel || 6) { - console.log(out); - } - if (this.logFileWriteStream != null) { - this.logFileWriteStream.write(out + "\n"); - } - } - - log1 = this.log.bind(this, 1); - log2 = this.log.bind(this, 2); - log3 = this.log.bind(this, 3); - log4 = this.log.bind(this, 4); - log5 = this.log.bind(this, 5); - log6 = this.log.bind(this, 6); -} - -let logger = new Logger("main"); - -class ClientSessionStatus { - static CONNECTING = "CONNECTING"; - static CONNECTED = "CONNECTED"; - static BUSY = "BUSY"; - static BINDING = "BINDING"; - static BOUND = "BOUND"; - static NOT_CONNECTED = "NOT CONNECTED"; -} - -class ClientSession { - // TODO: Enable requesting DRs - auto_enquire_link_period = 500; - eventEmitter = new EventEmitter(); - configuredMessageJob = { - source: "", - destination: "", - message: "", - }; - configuredMultiMessageJob = { - source: "", - destination: "", - message: "", - interval: 1000, - count: 1, - }; - - connectingPromise = { - promise: null, - resolve: null, - reject: null - } - disconnectingPromise = { - promise: null, - resolve: null, - reject: null - } - bindingPromise = { - promise: null, - resolve: null, - reject: null - } - - static STATUS_CHANGED_EVENT = "statusChanged"; - static ANY_PDU_EVENT = "*"; - static MESSAGE_SEND_COUNTER_UPDATE_EVENT = "messageSendCounterUpdate"; - - constructor(id, url, username, password) { - this.id = id; - this.logger = new Logger(`ClientSession-${this.id}`); - this.url = url; - - this.username = username; - this.password = password; - - this.logger.log1(`Client created with url ${this.url}, username ${this.username}, password ${this.password} and ID ${this.id}`); - this.status = ClientSessionStatus.NOT_CONNECTED; - } - - setUsername(username) { - this.username = username; - this.refresh(); - } - - setPassword(password) { - this.password = password; - this.refresh(); - } - - refresh() { - this.logger.log1(`Refreshing client with url ${this.url} and id ${this.id}`); - let status = this.status; - this.close().catch(err => err); - if (status === ClientSessionStatus.CONNECTED) { - this.connect().catch(err => this.logger.log1(err)); - } - if (status === ClientSessionStatus.BOUND) { - this.connect().then(() => { - this.bind().catch((err => this.logger.log1(err))); - }).catch((err => this.logger.log1(err))); - } - } - - setStatus(newStatus) { - this.status = newStatus; - this.eventEmitter.emit(ClientSession.STATUS_CHANGED_EVENT, newStatus); - } - - connect() { - this.connectingPromise.promise = new Promise((resolve, reject) => { - if (this.status !== ClientSessionStatus.NOT_CONNECTED) { - this.logger.log1("Client already connected"); - reject("Client already connected"); - return; - } - this.logger.log1(`Client connecting to ${this.url}`); - this.setStatus(ClientSessionStatus.CONNECTING); - try { - this.session = smpp.connect({ - url: this.url, - auto_enquire_link_period: this.auto_enquire_link_period, - }, this.connected.bind(this)); - this.session.on('error', this.error.bind(this)); - this.session.on('close', this.closed.bind(this)); - } catch (e) { - this.logger.log1("Client connection failed to " + this.url); - this.setStatus(ClientSessionStatus.NOT_CONNECTED); - this.session.close(); - reject("Client connection failed to " + this.url); - } - this.connectingPromise.resolve = resolve; - this.connectingPromise.reject = reject; - }); - return this.connectingPromise.promise; - } - - closed() { - this.logger.log1(`Client closed connection to ${this.url}`); - this.setStatus(ClientSessionStatus.NOT_CONNECTED); - } - - error(error) { - if (error.code === "ETIMEOUT") { - this.logger.log1("Client connection timed out to " + this.url); - } else if (error.code === "ECONNREFUSED") { - this.logger.log1("Client connection refused to " + this.url); - } else { - this.logger.log1("Client connection failed to " + this.url); - } - this.session.close(); - this.connectingPromise.reject(error); - this.setStatus(ClientSessionStatus.NOT_CONNECTED); - } - - connected() { - this.logger.log1("Client connected to " + this.url); - this.setStatus(ClientSessionStatus.CONNECTED); - this.session.on('debug', (type, msg, payload) => { - if (type.includes('pdu.')) { - this.eventEmitter.emit(msg, payload); - this.eventEmitter.emit(ClientSession.ANY_PDU_EVENT, payload); - } - }); - this.session.on('pdu', this.sessionPdu.bind(this)); - this.connectingPromise.resolve(); - } - - sessionPdu(pdu) { - if (pdu.command === 'deliver_sm') { - this.session.send(pdu.response()); - } - } - - bind() { - this.bindingPromise.promise = new Promise((resolve, reject) => { - if (this.status !== ClientSessionStatus.CONNECTED) { - this.logger.log1(`Cannot bind, client not connected to ${this.url}`); - reject(`Cannot bind, client not connected to ${this.url}`); - return; - } - - this.logger.log1("Trying to bind to " + this.url) - if (this.status !== ClientSessionStatus.CONNECTED) { - this.logger.log1(`Cannot bind, client not connected to ${this.url}`); - return; - } - if (!!!this.username || !!!this.password) { - this.logger.log1(`Cannot bind client, username or password not set`); - return; - } - this.setStatus(ClientSessionStatus.BINDING); - this.logger.log1(`Client binding to ${this.url} with username ${this.username} and password ${this.password}`); - - this.session.bind_transceiver({ - system_id: this.username, - password: this.password, - registered_delivery: 1 - }, this.bindReply.bind(this)); - this.bindingPromise.resolve = resolve; - this.bindingPromise.reject = reject; - }); - return this.bindingPromise.promise; - } - - bindReply(pdu) { - if (pdu.command_status === 0) { - this.logger.log1(`Client bound to ${this.url} with username ${this.username} and password ${this.password}`); - this.setStatus(ClientSessionStatus.BOUND); - this.bindingPromise.resolve(); - } else { - this.logger.log1(`Client bind failed to ${this.url} with username ${this.username} and password ${this.password}`); - this.setStatus(ClientSessionStatus.CONNECTED); - this.bindingPromise.reject(); - } - } - - send(source, destination, message, force=false) { - return new Promise((resolve, reject) => { - if (!force && !this.canSend()) { - this.logger.log1(`Client cannot send message, not bound to ${this.url} or busy`); - reject(`Client cannot send message, not bound to ${this.url} or busy`); - return; - } - this.logger.log1(`Client sending message from ${source} to ${destination} with message ${message}`); - this.session.submit_sm({ - source_addr: source, - destination_addr: destination, - short_message: message, - registered_delivery: 1, - message_id: 10 - }, pdu => { - resolve(pdu); - }); - }); - } - - sendDefault() { - return this.send(this.configuredMessageJob.source, this.configuredMessageJob.destination, this.configuredMessageJob.message); - } - - configureDefault(source, destination, message) { - this.configuredMessageJob = { - source: source, - destination: destination, - message: message - } - } - - sendOnInterval(source, destination, message, interval, count) { - return new Promise((resolve, reject) => { - if (!this.canSend()) { - this.logger.log1(`Client cannot send many message, not bound to ${this.url} or busy`); - reject(`Client cannot send many message, not bound to ${this.url} or busy`); - return; - } - this.setStatus(ClientSessionStatus.BUSY); - let counter = 0; - let previousUpdateCounter = 0; - - this.updateTimer = new NanoTimer(); - this.updateTimer.setInterval(() => { - if (previousUpdateCounter !== counter) { - this.eventEmitter.emit(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); - previousUpdateCounter = counter; - } - }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); - - this.timer = new NanoTimer(); - this.timer.setInterval(() => { - if (count > 0 && counter >= count) { - this.cancelSendInterval(); - } else { - this.send(source, destination, message, true) - .catch(e => this.logger.log1(`Error sending message: ${e}`)); - counter++; - } - }, '', `${interval} s`); - resolve(); - }); - } - - sendDefaultInterval() { - return this.sendOnInterval(this.configuredMultiMessageJob.source, this.configuredMultiMessageJob.destination, this.configuredMultiMessageJob.message, - this.configuredMultiMessageJob.interval, this.configuredMultiMessageJob.count); - } - - configureDefaultInterval(source, destination, message, interval, count) { - this.configuredMultiMessageJob = { - source: source, - destination: destination, - message: message, - interval: interval, - count: count - } - } - - cancelSendInterval() { - if (!!this.timer) { - this.timer.clearInterval(); - this.updateTimer.clearInterval(); - this.timer = null; - this.updateTimer = null; - } - this.setStatus(ClientSessionStatus.BOUND); - } - - close() { - this.disconnectingPromise.promise = new Promise((resolve, reject) => { - if (this.status !== ClientSessionStatus.BOUND && this.status !== ClientSessionStatus.CONNECTED) { - this.logger.log1(`Cannot close client, not bound to ${this.url}`); - reject(`Cannot close client, not bound to ${this.url}`); - return; - } - this.logger.log1(`Client closing connection to ${this.url}`); - this.session.close(); - this.setStatus(ClientSessionStatus.NOT_CONNECTED); - resolve(); - }); - return this.disconnectingPromise.promise; - } - - on(event, callback) { - this.eventEmitter.on(event, callback); - } - - serialize() { - return { - id: this.id, - url: this.url, - username: this.username, - password: this.password, - status: this.status, - configuredMessageJob: this.configuredMessageJob, - configuredMultiMessageJob: this.configuredMultiMessageJob - } - } - - canSend() { - return this.status === ClientSessionStatus.BOUND; - } -} - -class ClientSessionManager { - sessionIdCounter = 0; - logger = new Logger("ClientSessionManager"); - - constructor() { - this.sessions = {}; - } - - createSession(url, username, password) { - let urlB64 = btoa(url); - if (this.sessions[urlB64]) { - this.logger.log1(`Client to ${url} already exists`); - return this.sessions[urlB64]; - } - this.logger.log1(`Creating client to ${url} with username ${username} and password ${password}`); - let session = new ClientSession(this.sessionIdCounter++, url, username, password); - this.addSession(session); - return session; - } - - addSession(session) { - this.logger.log1(`Adding client with ID ${session.id}`); - this.sessions[btoa(session.url)] = session; - } - - deleteSession(session) { - this.logger.log1(`Deleting client with ID ${session.id}`); - if (session.status === ClientSessionStatus.BOUND || session.status === ClientSessionStatus.CONNECTED) { - session.close(); - } - delete this.sessions[btoa(session.url)]; - } - - getSession(id) { - return Object.values(this.sessions).find((session) => { - return session.id == id; - }); - } - - serialize() { - return Object.values(this.sessions).map((session) => { - return session.serialize(); - }); - } - - cleanup() { - this.logger.log1(`Saving clients to ${CLIENT_SESSIONS_FILE}...`); - fs.writeFileSync(CLIENT_SESSIONS_FILE, JSON.stringify(this.serialize(), null, 4)); - } - - startup() { - try { - let sessions = fs.readFileSync(CLIENT_SESSIONS_FILE); - sessions = JSON.parse(sessions); - this.logger.log1(`Loaded ${sessions.length} clients from ${CLIENT_SESSIONS_FILE}...`); - sessions.forEach(session => { - let sessionObj = this.createSession(session.url, session.username, session.password); - sessionObj.configuredMessageJob = session.configuredMessageJob; - sessionObj.configuredMultiMessageJob = session.configuredMultiMessageJob; - }); - } catch (e) { - this.logger.log1(`Error loading clients from ${CLIENT_SESSIONS_FILE}: ${e}`); - } - } -} - -class CenterSessionStatus { - static CONNECTED = "CONNECTED"; - static WAITING_CONNECTION = "WAITING CONNECTION"; - static CONNECTION_PENDING = "CONNECTION PENDING"; - static BUSY = "BUSY"; -} - -class CenterMode { - static DEBUG = "DEBUG"; - static ECHO = "ECHO"; - static DR = "DR"; -} - -class CenterSession { - // TODO: If the port is in use this throws an exception, catch it and log it - eventEmitter = new EventEmitter(); - sessions = []; - nextSession = 0; - mode = CenterMode.DEBUG; - configuredMessageJob = { - source: "", - destination: "", - message: "", - }; - configuredMultiMessageJob = { - source: "", - destination: "", - message: "", - interval: 1000, - count: 1, - }; - - disconnectingPromise = { - promise: null, - resolve: null, - reject: null - } - - static STATUS_CHANGED_EVENT = "statusChanged"; - static MODE_CHANGED_EVENT = "modeChanged"; - static SESSION_CHANGED_EVENT = "sessionChanged"; - static ANY_PDU_EVENT = "*"; - static MESSAGE_SEND_COUNTER_UPDATE_EVENT = "messageSendCounterUpdate"; - - constructor(id, port, username, password) { - this.id = id; - this.logger = new Logger(`CenterSession-${this.id}`); - this.port = port; - - this.username = username; - this.password = password; - - this.server = smpp.createServer({}, this.connected.bind(this)); - this.server.on('debug', (type, msg, payload) => { - if (type.includes('pdu.')) { - this.eventEmitter.emit(msg, payload); - this.eventEmitter.emit(CenterSession.ANY_PDU_EVENT, payload); - } - }); - this.server.listen(this.port); - - this.logger.log1(`Center created with port ${this.port}, username ${this.username}, password ${this.password} and ID ${this.id}`); - this.status = CenterSessionStatus.WAITING_CONNECTION; - } - - setStatus(newStatus) { - this.status = newStatus; - this.eventEmitter.emit(CenterSession.STATUS_CHANGED_EVENT, newStatus); - } - - setUsername(username) { - this.username = username; - this.refresh(); - } - - setPassword(password) { - this.password = password; - this.refresh(); - } - - setMode(mode) { - this.mode = Object.values(CenterMode)[mode]; - this.eventEmitter.emit(CenterSession.MODE_CHANGED_EVENT, mode); - } - - refresh() { - this.close().catch(err => { - }); - } - - error(error) { - if (error.code === "ETIMEOUT") { - this.logger.log1("Center connection timed out to " + this.port); - } else if (error.code === "ECONNREFUSED") { - this.logger.log1("Center connection refused to " + this.port); - } else { - this.logger.log1("Center connection failed to " + this.port); - } - this.logger.log1(`Session on center croaked. Error: ${error}`); - this.setStatus(CenterSessionStatus.CONNECTION_PENDING); - } - - connected(session) { - this.logger.log1("Center got a connection on port " + this.port); - this.setStatus(CenterSessionStatus.CONNECTION_PENDING); - - session.on('error', this.error.bind(this)); - session.on('close', this.sessionClosed.bind(this, session)); - - function bind_transciever(pdu) { - this.logger.log1(`Center got a bind_transciever on port ${this.port} with system_id ${pdu.system_id} and password ${pdu.password}`); - session.pause(); - if (pdu.system_id === this.username && pdu.password === this.password) { - this.logger.log1(`Center session connection successful`); - session.send(pdu.response()); - session.resume(); - session.on('pdu', this.sessionPdu.bind(this, session)); - this.addSession(session); - if (this.sessions.length > 0) { - this.setStatus(CenterSessionStatus.CONNECTED); - } - session.on('debug', (type, msg, payload) => { - if (type.includes('pdu.')) { - this.eventEmitter.emit(msg, payload); - this.eventEmitter.emit(CenterSession.ANY_PDU_EVENT, payload); - } - }); - } else { - this.logger.log1(`Center session connection failed, invalid credentials`); - session.send(pdu.response({ - command_status: smpp.ESME_RBINDFAIL - })); - if (this.sessions.length === 0) { - this.setStatus(CenterSessionStatus.WAITING_CONNECTION); - } - session.close(); - this.session = null; - } - } - - session.on('bind_transceiver', bind_transciever.bind(this)); - } - - sessionClosed(session) { - this.logger.log1(`Center session closed on port ${this.port}`); - delete this.sessions[this.sessions.indexOf(session)]; - this.sessions = this.sessions.filter(Boolean); - if (this.sessions.length === 0) { - this.setStatus(CenterSessionStatus.WAITING_CONNECTION); - } - } - - sessionPdu(session, pdu) { - if (pdu.command === 'submit_sm') { - session.send(pdu.response()); - if (this.mode === CenterMode.ECHO) { - this.notify(pdu.destination_addr, pdu.source_addr, pdu.short_message); - } - // TODO: Figure out how DRs work - if (this.mode === CenterMode.DR && pdu.registered_delivery === 1) { - let drPdu = new PDU('deliver_sm', { - receipted_message_id: pdu.message_id, - esm_class: 4, - message_state: 2, - length: 0, - }); - this.send(drPdu).catch(err => console.log(err)); - } - } - if (pdu.command === 'enquire_link') { - session.send(pdu.response()); - } - } - - configureDefault(source, destination, message) { - this.configuredMessageJob = { - source: source, - destination: destination, - message: message - } - } - - notifyDefault() { - return this.notify(this.configuredMessageJob.source, this.configuredMessageJob.destination, this.configuredMessageJob.message); - } - - notify(source, destination, message, force=false) { - return new Promise((resolve, reject) => { - if (!force && !this.canSend()) { - this.logger.log1(`Center cannot send message, no sessions active on ${this.port} or busy`); - reject(`Center cannot send message, no sessions active on ${this.port} or busy`); - return; - } - this.logger.log1(`Sending notify message from ${source} to ${destination} with message ${message}`); - this.getNextSession().deliver_sm({ - source_addr: source, - destination_addr: destination, - short_message: message, - }, pdu => { - resolve(pdu); - }); - }); - } - - send(pdu) { - return new Promise((resolve, reject) => { - if (!this.canSend()) { - this.logger.log1(`Center cannot send message, no sessions active on ${this.port} or busy`); - reject(`Center cannot send message, no sessions active on ${this.port} or busy`); - return; - } - this.getNextSession().send(pdu, pdu => { - resolve(pdu); - }); - }); - } - - configureDefaultInterval(source, destination, message, interval, count) { - this.configuredMultiMessageJob = { - source: source, - destination: destination, - message: message, - interval: interval, - count: count - } - } - - notifyDefaultInterval() { - return this.notifyOnInterval(this.configuredMultiMessageJob.source, this.configuredMultiMessageJob.destination, this.configuredMultiMessageJob.message, - this.configuredMultiMessageJob.interval, this.configuredMultiMessageJob.count); - } - - notifyOnInterval(source, destination, message, interval, count) { - return new Promise((resolve, reject) => { - if (!this.canSend()) { - this.logger.log1(`Center cannot send many message, no sessions active to ${this.port} or busy`); - reject(`Center cannot send many message, no sessions active to ${this.port} or busy`); - return; - } - this.setStatus(CenterSessionStatus.BUSY); - this.timer = new NanoTimer(); - let counter = 0; - let previousUpdateCounter = 0; - - this.updateTimer = new NanoTimer(); - this.updateTimer.setInterval(() => { - if (previousUpdateCounter !== counter) { - this.eventEmitter.emit(CenterSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); - previousUpdateCounter = counter; - } - }, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`); - - this.timer.setInterval(() => { - if (count > 0 && counter >= count) { - this.cancelNotifyInterval(); - } else { - this.notify(source, destination, message, true) - .catch(e => this.logger.log1(`Error sending message: ${e}`)); - counter++; - } - }, '', `${interval} s`); - resolve(); - }); - } - - cancelNotifyInterval() { - if (!!this.timer) { - this.timer.clearInterval(); - this.updateTimer.clearInterval(); - this.timer = null; - this.updateTimer = null; - } - this.setStatus(CenterSessionStatus.CONNECTED); - } - - getNextSession() { - if (this.sessions.length === 0) { - return null; - } - let session = this.sessions[this.nextSession]; - this.nextSession = (this.nextSession + 1) % this.sessions.length; - return session; - } - - getSessions() { - return this.sessions.map(session => { - return this.mapSession(session); - }) - } - - mapSession(session) { - return { - closed: session.closed, - paused: session.paused, - remoteAddress: session.remoteAddress, - remotePort: session.remotePort, - _id: session._id, - deleted: session.deleted || false - } - } - - closeSession(sessionId) { - this.logger.log1(`Closing center session ${sessionId}`); - let session = this.sessions.find(session => session._id == sessionId); - if (!!session) { - session.close(); - this.eventEmitter.emit(CenterSession.SESSION_CHANGED_EVENT, this.mapSession(session)); - } - } - - deleteSession(sessionId) { - this.logger.log1(`Deleting center session ${sessionId}`); - let session = this.sessions.find(session => session._id == sessionId); - if (!!session) { - session.close(); - session.destroy(); - session.deleted = true; - this.eventEmitter.emit(CenterSession.SESSION_CHANGED_EVENT, this.mapSession(session)); - delete this.sessions[this.sessions.indexOf(session)]; - this.sessions = this.sessions.filter(Boolean); - } - } - - addSession(session) { - this.logger.log1(`Adding center session ${session._id}`); - let sessionInfo = this.mapSession(session); - this.eventEmitter.emit(CenterSession.SESSION_CHANGED_EVENT, sessionInfo); - this.sessions.push(session); - } - - close() { - this.disconnectingPromise.promise = new Promise((resolve, reject) => { - if (this.status !== CenterSessionStatus.CONNECTED) { - this.logger.log1(`Cannot close center, no sessions active ${this.port}`); - reject(`Cannot close center, no sessions active ${this.port}`); - return; - } - this.sessions.forEach(session => { - session.close(); - }); - this.sessions = []; - this.setStatus(CenterSessionStatus.WAITING_CONNECTION); - resolve(); - }); - return this.disconnectingPromise.promise; - } - - on(event, callback) { - this.eventEmitter.on(event, callback); - } - - serialize() { - return { - id: this.id, - port: this.port, - username: this.username, - password: this.password, - status: this.status, - activeSessions: this.sessions.length, - mode: this.mode, - configuredMessageJob: this.configuredMessageJob, - configuredMultiMessageJob: this.configuredMultiMessageJob, - } - } - - canSend() { - return this.status === CenterSessionStatus.CONNECTED; - } -} - -class CenterSessionManager { - sessionIdCounter = 0; - logger = new Logger("CenterSessionManager"); - - constructor() { - this.servers = {}; - } - - createSession(port, username, password) { - if (this.servers[port]) { - this.logger.log1(`Center listening on ${port} already exists`); - return this.servers[port]; - } - this.logger.log1(`Creating center listening on ${port} with username ${username} and password ${password}`); - let session = new CenterSession(this.sessionIdCounter++, port, username, password); - this.addSession(session); - return session; - } - - addSession(server) { - this.logger.log1(`Adding center with ID ${server.id}`); - this.servers[server.port] = server; - } - - deleteSession(server) { - this.logger.log1(`Deleting center with ID ${server.id}`); - if (server.status === CenterSessionStatus.CONNECTED) { - server.close(); - } - delete this.servers[server.port]; - } - - getSession(id) { - return Object.values(this.servers).find((session) => { - return session.id == id; - }); - } - - serialize() { - return Object.values(this.servers).map((servers) => { - return servers.serialize(); - }); - } - - cleanup() { - this.logger.log1(`Saving centers to ${CENTER_SESSIONS_FILE}...`); - fs.writeFileSync(CENTER_SESSIONS_FILE, JSON.stringify(this.serialize(), null, 4)); - } - - startup() { - try { - let servers = fs.readFileSync(CENTER_SESSIONS_FILE); - servers = JSON.parse(servers); - this.logger.log1(`Loaded ${servers.length} centers from ${CENTER_SESSIONS_FILE}...`); - servers.forEach(server => { - let createdServer = this.createSession(server.port, server.username, server.password); - if (!!server.mode) { - createdServer.mode = server.mode; - } - createdServer.configuredMessageJob = server.configuredMessageJob; - createdServer.configuredMultiMessageJob = server.configuredMultiMessageJob; - }); - } catch (e) { - this.logger.log1(`Error loading centers from ${CLIENT_SESSIONS_FILE}: ${e}`); - } - } - - getAvailableCenterModes() { - let modes = Object.values(CenterMode); - return modes.reduce((acc, curr, idx) => { - acc[idx] = curr; - return acc; - }, {}); - } -} - - -class HTTPServer { - logger = new Logger("HTTPServer"); - - constructor() { - app.use(bodyParser.json()); - - app.get('/api/client', this.getClientSessions.bind(this)); - app.post('/api/client', this.createClientSession.bind(this)); - app.get('/api/client/:id', this.getClientSessionById.bind(this)); - app.patch('/api/client/:id', this.patchClientSession.bind(this)); - app.put('/api/client/:id/send', this.configSend.bind(this)); - app.post('/api/client/:id/send/default', this.sendConfig.bind(this)); - app.post('/api/client/:id/send', this.send.bind(this)); - app.put('/api/client/:id/sendMany', this.configSendMany.bind(this)); - app.post('/api/client/:id/sendMany/default', this.sendManyConfig.bind(this)); - app.post('/api/client/:id/sendMany', this.sendMany.bind(this)); - app.delete('/api/client/:id/sendMany', this.cancelSendMany.bind(this)); - app.post('/api/client/:id/bind', this.bindClientSession.bind(this)); - app.post('/api/client/:id/connect', this.connectClientSession.bind(this)); - app.delete('/api/client/:id/connect', this.disconnectClientSession.bind(this)); - app.delete('/api/client/:id', this.deleteClientSession.bind(this)); - - app.get('/api/center', this.getCenterSessions.bind(this)); - app.post('/api/center', this.createCenterSession.bind(this)); - app.get('/api/center/modes', this.getAvailableModes.bind(this)); - app.get('/api/center/:id', this.getCenterServerById.bind(this)); - app.get('/api/center/:id/session', this.getCenterServerSessionsById.bind(this)); - app.delete('/api/center/:id/session/:sessionId', this.closeCenterServerSessionById.bind(this)); - app.delete('/api/center/:id/session/:sessionId/destroy', this.deleteCenterServerSessionById.bind(this)); - app.patch('/api/center/:id', this.patchCenterServer.bind(this)); - app.put('/api/center/:id/send', this.configNotify.bind(this)); - app.post('/api/center/:id/send/default', this.notifyConfig.bind(this)); - app.post('/api/center/:id/send', this.notify.bind(this)); - app.put('/api/center/:id/sendMany', this.configNotifyMany.bind(this)); - app.post('/api/center/:id/sendMany/default', this.notifyManyConfig.bind(this)); - app.post('/api/center/:id/sendMany', this.notifyMany.bind(this)); - app.delete('/api/center/:id/sendMany', this.cancelNotifyMany.bind(this)); - app.delete('/api/center/:id/connect', this.disconnectCenterSession.bind(this)); - app.delete('/api/center/:id', this.deleteCenterServer.bind(this)); - - this.server = app.listen(SERVER_PORT, function() { - this.logger.log1(`HTTPServer listening at http://localhost:${SERVER_PORT}`) - }.bind(this)); - } - - // TODO: These requests deserve error handling - - getClientSessions(req, res) { - this.logger.log1("Getting client sessions"); - res.send(clientSessionManager.serialize()); - } - - createClientSession(req, res) { - this.logger.log1("Creating client session"); - let session = clientSessionManager.createSession(req.body.url, req.body.username, req.body.password); - res.send(session.serialize()); - } - - getClientSessionById(req, res) { - let session = clientSessionManager.getSession(req.params.id); - this.logger.log1(`Getting client session by ID ${req.params.id}`); - if (session) { - this.logger.log1(`Client session found with ID ${req.params.id}`) - res.send(session.serialize()); - } else { - this.logger.log1(`No client session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - patchClientSession(req, res) { - let session = clientSessionManager.getSession(req.params.id); - if (session) { - this.logger.log1(`Client session found with ID ${req.params.id}`) - if (!!req.body.username && req.body.username !== session.username) { - session.setUsername(req.body.username); - } - if (!!req.body.password && req.body.password !== session.password) { - session.setPassword(req.body.password); - } - res.send(session.serialize()); - } else { - this.logger.log1(`No client session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - configSend(req, res) { - let session = clientSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - this.logger.log1(`Setting default message from ${source} to ${destination} with message ${message} on session with ID ${req.params.id}`) - if (session) { - session.configureDefault(source, destination, message); - res.send(session.serialize()); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - sendConfig(req, res) { - let session = clientSessionManager.getSession(req.params.id); - this.logger.log1(`Sending pre-configured message on session with ID ${req.params.id}`) - if (session) { - session.sendDefault() - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(JSON.stringify(err))); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - send(req, res) { - let session = clientSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - this.logger.log1(`Sending message from ${source} to ${destination} with message ${message} on session with ID ${req.params.id}`) - if (session) { - session.send(source, destination, message) - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(JSON.stringify(err))); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - configSendMany(req, res) { - let session = clientSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - let interval = req.body.interval / 1000; - let count = req.body.count; - if (!!req.body.perSecond) { - interval = 1 / req.body.perSecond; - } - let perSecond = 1 / interval; - this.logger.log1( - `Setting default ${count} messages from ${source} to ${destination} with message ${message} on session with ID ${req.params.id} at a rate of ${perSecond} per second.`); - if (session) { - session.configureDefaultInterval(source, destination, message, interval, count); - res.send(session.serialize()); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - sendManyConfig(req, res) { - let session = clientSessionManager.getSession(req.params.id); - this.logger.log1(`Sending pre-configured messages on session with ID ${req.params.id}`) - if (session) { - session.sendDefaultInterval() - .then(() => res.send({})) - .catch(err => res.status(400).send(JSON.stringify(err))); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - sendMany(req, res) { - let session = clientSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - let interval = req.body.interval / 1000; - let count = req.body.count; - if (!!req.body.perSecond) { - interval = 1 / req.body.perSecond; - } - let perSecond = 1 / interval; - this.logger.log1( - `Sending ${count} messages from ${source} to ${destination} with message ${message} on session with ID ${req.params.id} at a rate of ${perSecond} per second.`); - if (session) { - session.sendOnInterval(source, destination, message, interval, count) - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send((err))); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - cancelSendMany(req, res) { - let session = clientSessionManager.getSession(req.params.id); - if (session.status !== ClientSessionStatus.BUSY) { - res.status(400).send({ - err: true, - msg: `Session with ID ${req.params.id} is not sending messages` - }); - return; - } - this.logger.log1(`Cancelling send timer for session with ID ${req.params.id}`); - if (session) { - session.cancelSendInterval(); - res.send({}); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - bindClientSession(req, res) { - this.logger.log1(`Binding client session with ID ${req.params.id}`) - // Maybe make this async? - let session = clientSessionManager.getSession(req.params.id); - if (session) { - session.bind() - .then(() => res.send(session.serialize())) - .catch(err => res.status(400).send({ - err: true, - msg: err - })); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - connectClientSession(req, res) { - this.logger.log1(`Connecting client session with ID ${req.params.id}`) - let session = clientSessionManager.getSession(req.params.id); - if (session) { - session.connect() - .then(() => res.send(session.serialize())) - .catch(err => res.status(400).send({ - err: true, - msg: err - })); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - disconnectClientSession(req, res) { - this.logger.log1(`Disconnecting client session with ID ${req.params.id}`) - let session = clientSessionManager.getSession(req.params.id); - if (session) { - session.close() - .then(() => res.send(session.serialize())) - .catch(err => res.status(400).send({ - err: true, - msg: err - })); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - deleteClientSession(req, res) { - this.logger.log1(`Deleting client session with ID ${req.params.id}`); - let session = clientSessionManager.getSession(req.params.id); - if (session) { - clientSessionManager.deleteSession(session); - res.send({}); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - getCenterSessions(req, res) { - this.logger.log1("Getting center sessions"); - res.send(centerSessionManager.serialize()); - } - - createCenterSession(req, res) { - this.logger.log1("Creating center session"); - let session = centerSessionManager.createSession(req.body.port, req.body.username, req.body.password); - res.send(session.serialize()); - } - - getCenterServerById(req, res) { - let session = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Getting center session by ID ${req.params.id}`); - if (session) { - this.logger.log1(`Center session found with ID ${req.params.id}`) - res.send(session.serialize()); - } else { - this.logger.log1(`No center session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - getCenterServerSessionsById(req, res) { - let server = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Getting center session by ID ${req.params.id}`); - if (server) { - this.logger.log1(`Center session found with ID ${req.params.id}`) - res.send(server.getSessions()); - } else { - this.logger.log1(`No center session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - closeCenterServerSessionById(req, res) { - let server = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Getting center session by ID ${req.params.id}`); - if (server) { - this.logger.log1(`Center session found with ID ${req.params.id}`) - server.closeSession(req.params.sessionId) - res.send({}); - } else { - this.logger.log1(`No center session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - deleteCenterServerSessionById(req, res) { - let server = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Getting center session by ID ${req.params.id}`); - if (server) { - this.logger.log1(`Center session found with ID ${req.params.id}`) - server.deleteSession(req.params.sessionId) - res.send({}); - } else { - this.logger.log1(`No center session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - patchCenterServer(req, res) { - let server = centerSessionManager.getSession(req.params.id); - if (server) { - this.logger.log1(`Center server found with ID ${req.params.id}`) - if (!!req.body.username && req.body.username !== server.username) { - server.setUsername(req.body.username); - } - if (!!req.body.password && req.body.password !== server.password) { - server.setPassword(req.body.password); - } - if (!!req.body.mode) { - server.setMode(req.body.mode); - } - res.send(server.serialize()); - } else { - this.logger.log1(`No center server found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - getAvailableModes(req, res) { - this.logger.log1("Getting available modes"); - res.send(centerSessionManager.getAvailableCenterModes()); - } - - configNotify(req, res) { - let server = centerSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - this.logger.log1(`Setting default message from ${source} to ${destination} with message ${message} on server with ID ${req.params.id}`) - if (server) { - server.configureDefault(source, destination, message); - res.send(server.serialize()); - } else { - this.logger.log1(`No server found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - notifyConfig(req, res) { - let server = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Sending pre-configured message on server with ID ${req.params.id}`) - if (server) { - server.notifyDefault() - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(JSON.stringify(err))); - } else { - this.logger.log1(`No server found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - notify(req, res) { - let server = centerSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - this.logger.log1(`Sending notify message from ${source} to ${destination} with message ${message} on server with ID ${req.params.id}`) - if (server) { - server.notify(source, destination, message) - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(err)); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - configNotifyMany(req, res) { - let server = centerSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - let interval = req.body.interval / 1000; - let count = req.body.count; - if (!!req.body.perSecond) { - interval = 1 / req.body.perSecond; - } - let perSecond = 1 / interval; - this.logger.log1( - `Setting default ${count} messages from ${source} to ${destination} with message ${message} on server with ID ${req.params.id} at a rate of ${perSecond} per second.`); - if (server) { - server.configureDefaultInterval(source, destination, message, interval, count); - res.send(server.serialize()); - } else { - this.logger.log1(`No server found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - notifyManyConfig(req, res) { - let server = centerSessionManager.getSession(req.params.id); - this.logger.log1(`Sending pre-configured messages on server with ID ${req.params.id}`) - if (server) { - server.notifyDefaultInterval() - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(JSON.stringify(err))); - } else { - this.logger.log1(`No server found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - notifyMany(req, res) { - let server = centerSessionManager.getSession(req.params.id); - let source = req.body.source; - let destination = req.body.destination; - let message = req.body.message; - let interval = req.body.interval / 1000; - let count = req.body.count; - if (!!req.body.perSecond) { - interval = 1 / req.body.perSecond; - } - let perSecond = 1 / interval; - this.logger.log1( - `Sending ${count} notify messages from ${source} to ${destination} with message ${message} on session with ID ${req.params.id} at a rate of ${perSecond} per second.`); - if (server) { - server.notifyOnInterval(source, destination, message, interval, count) - .then(pdu => res.send(pdu)) - .catch(err => res.status(400).send(err)); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - cancelNotifyMany(req, res) { - let server = centerSessionManager.getSession(req.params.id); - if (server.status !== ClientSessionStatus.BUSY) { - res.status(400).send({ - err: true, - msg: `Session with ID ${req.params.id} is not sending messages` - }); - return; - } - this.logger.log1(`Cancelling send timer for server with ID ${req.params.id}`); - if (server) { - server.cancelNotifyInterval(); - res.send({}); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - disconnectCenterSession(req, res) { - this.logger.log1(`Disconnecting center session with ID ${req.params.id}`) - let server = centerSessionManager.getSession(req.params.id); - if (server) { - server.close() - .then(() => res.send(server.serialize())) - .catch(err => res.status(400).send({ - err: true, - msg: err - })); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } - - deleteCenterServer(req, res) { - this.logger.log1(`Deleting center session with ID ${req.params.id}`); - let server = centerSessionManager.getSession(req.params.id); - if (server) { - centerSessionManager.deleteSession(server); - res.send({}); - } else { - this.logger.log1(`No session found with ID ${req.params.id}`); - res.status(404).send(); - } - } -} - -class WSServer { - clients = {}; - unknownClients = []; - listenersAlreadySetup = []; - - constructor() { - this.server = new WebSocket.Server({port: WS_SERVER_PORT}); - this.logger = new Logger("WSServer"); - this.server.on('connection', this.onConnection.bind(this)); - this.logger.log1(`WSServer listening at ws://localhost:${WS_SERVER_PORT}`); - } - - onConnection(ws) { - this.logger.log1("New connection"); - this.unknownClients.push(ws); - ws.on('message', this.onMessage.bind(this, ws)); - ws.on('close', this.onClose.bind(this, ws)); - } - - addClient(ws, type, sessionId) { - if (!this.clients[type]) { - this.clients[type] = {}; - } - if (!this.clients[type][sessionId]) { - this.clients[type][sessionId] = []; - } - this.logger.log1(`Adding client ${ws.id} to ${type} session ${sessionId}`); - - if (type === "client") { - if (this.listenersAlreadySetup.indexOf(`client-${sessionId}`) === -1) { - let session = clientSessionManager.getSession(sessionId); - if (!!session) { - this.logger.log1(`Setting up listeners for client session ${sessionId}`); - session.on(ClientSession.STATUS_CHANGED_EVENT, this.onClientSessionStatusChange.bind(this, sessionId)); - session.on(ClientSession.ANY_PDU_EVENT, this.onClientSessionPdu.bind(this, sessionId)); - session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onClientMessageCounterUpdate.bind(this, sessionId)); - } - this.listenersAlreadySetup.push(`client-${sessionId}`); - } else { - this.logger.log1(`Listeners for client session ${sessionId} already set up`); - } - } else if (type === "center") { - if (this.listenersAlreadySetup.indexOf(`center-${sessionId}`) === -1) { - let session = centerSessionManager.getSession(sessionId); - if (!!session) { - this.logger.log1(`Setting up listeners for center session ${sessionId}`); - session.on(CenterSession.STATUS_CHANGED_EVENT, this.onCenterStatusChange.bind(this, sessionId)); - session.on(CenterSession.ANY_PDU_EVENT, this.onCenterServerPdu.bind(this, sessionId)); - session.on(CenterSession.MODE_CHANGED_EVENT, this.onCenterModeChanged.bind(this, sessionId)); - session.on(CenterSession.SESSION_CHANGED_EVENT, this.onCenterSessionsChanged.bind(this, sessionId)); - session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onCenterMessageCounterUpdate.bind(this, sessionId)); - } - this.listenersAlreadySetup.push(`center-${sessionId}`); - } else { - this.logger.log1(`Listeners for center session ${sessionId} already set up`); - } - } - - this.clients[type][sessionId].push(ws); - this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); - } - - onMessage(ws, message) { - this.logger.log1("New message"); - message = String(message); - let data = message.split(":"); - let type = data[0]; - let sessionId = data[1]; - - this.logger.log1(`Moving client to session ID: ${sessionId} of type ${type}`); - delete this.unknownClients[ws]; - this.unknownClients = this.unknownClients.filter(Boolean); - - this.addClient(ws, type, sessionId); - this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); - } - - onClose(ws) { - this.removeClient(ws); - // this.logger.log6(this.clients); - this.logger.log1("Connection closed"); - } - - removeClient(ws) { - this.clients.client = this.removeFromArray(this.clients.client, ws); - this.clients.center = this.removeFromArray(this.clients.center, ws); - } - - removeFromArray(array, element) { - for (let sessionId in array) { - let index = array[sessionId].indexOf(element); - if (index > -1) { - delete array[sessionId][index]; - } - array[sessionId] = array[sessionId].filter(Boolean); - if (array[sessionId].length === 0) { - delete array[sessionId]; - } - } - return array; - } - - onClientSessionStatusChange(sessionId, newStatus) { - this.logger.log1(`Session with ID ${sessionId} changed`); - let payload = { - objectType: "client", - type: 'status', - sessionId: sessionId, - value: newStatus - } - let clients = this.clients["client"][sessionId]; - if (!!clients) { - this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onClientSessionPdu(sessionId, pdu) { - // TODO: Maybe move this to an "ignored" array against who the pdu.command is compared - if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { - return; - } - let clients = this.clients["client"][sessionId]; - if (!!clients) { - this.logger.log2(`Session with ID ${sessionId} fired PDU`); - let payload = { - objectType: "client", - type: 'pdu', - sessionId: sessionId, - value: pdu - } - this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onClientMessageCounterUpdate(sessionId, counter) { - this.logger.log2(`Session with ID ${sessionId} updating message send counter`); - let payload = { - objectType: "client", - type: 'counterUpdate', - sessionId: sessionId, - value: counter - } - let clients = this.clients["client"][sessionId]; - if (!!clients) { - this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onCenterStatusChange(sessionId, newStatus) { - this.logger.log1(`Session with ID ${sessionId} changed`); - let payload = { - objectType: "center", - type: 'status', - sessionId: sessionId, - value: newStatus - } - let clients = this.clients["center"][sessionId]; - if (!!clients) { - this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onCenterServerPdu(sessionId, pdu) { - if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { - return; - } - let clients = this.clients["center"][sessionId]; - if (!!clients) { - this.logger.log2(`Session with ID ${sessionId} fired PDU`); - let payload = { - objectType: "center", - type: 'pdu', - sessionId: sessionId, - value: pdu - } - this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onCenterModeChanged(sessionId, newMode) { - this.logger.log1(`Session with ID ${sessionId} changed`); - let payload = { - objectType: "center", - type: 'mode', - sessionId: sessionId, - value: newMode, - text: CenterMode[newMode] - } - let clients = this.clients["center"][sessionId]; - if (!!clients) { - this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onCenterSessionsChanged(sessionId, newSession) { - this.logger.log1(`Session with ID ${sessionId} changed`); - let payload = { - objectType: "center", - type: 'sessions', - sessionId: sessionId, - value: newSession - } - let clients = this.clients["center"][sessionId]; - if (!!clients) { - this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } - - onCenterMessageCounterUpdate(sessionId, counter) { - this.logger.log2(`Session with ID ${sessionId} updating message send counter`); - let payload = { - objectType: "center", - type: 'counterUpdate', - sessionId: sessionId, - value: counter - } - let clients = this.clients["center"][sessionId]; - if (!!clients) { - this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); - clients.forEach(client => { - client.send(JSON.stringify(payload)); - }); - } - } -} - -let clientSessionManager = new ClientSessionManager(); -let centerSessionManager = new CenterSessionManager(); -clientSessionManager.startup(); -centerSessionManager.startup(); - -// let session = clientSessionManager.createSession('smpp://localhost:7001', 'test', 'test'); -// let server = centerSessionManager.createSession(7001, 'test', 'test'); - -let session = clientSessionManager.getSession(0); -let server = centerSessionManager.getSession(1); - -session.connect() - .then(() => { - session.bind().then(() => { - // setTimeout(() => session.close(), 1000); - }).catch(err => console.log(err)); - }).catch(err => console.log(err)); - -// setTimeout(() => session.setUsername("test123"), 2000); -// setTimeout(() => session.setPassword("test123"), 4000); - -// session.on(CenterSession.ANY_PDU_EVENT, (pdu) => { -// console.log(pdu); -// }); - -// session.on(ClientSession.ANY_PDU_EVENT, (pdu) => { -// if (pdu.command.includes('enquire')) { -// return; -// } -// console.log(pdu); -// }); - -new WSServer(); -new HTTPServer(); - -function cleanup() { - clientSessionManager.cleanup(); - centerSessionManager.cleanup(); - process.exit(0); -} - -process.on('exit', cleanup); -process.on('SIGINT', cleanup); -process.on('SIGUSR1', cleanup); -process.on('SIGUSR2', cleanup); \ No newline at end of file diff --git a/src/Center/Center.ts b/src/Center/Center.ts new file mode 100644 index 0000000..bcb30e1 --- /dev/null +++ b/src/Center/Center.ts @@ -0,0 +1,192 @@ +import {Job} from "../Job/Job"; +import Logger from "../Logger"; +import {DebugPduProcessor} from "../PDUProcessor/DebugPduProcessor"; +import {PduProcessor} from "../PDUProcessor/PduProcessor"; +import {SmppSession} from "../SmppSession"; + +const NanoTimer = require('nanotimer'); +const smpp = require("smpp"); + +export class Center extends SmppSession { + readonly STATUS: string[] = [ + "WAITING CONNECTION", + "CONNECTING", + "CONNECTED", + ]; + + id: number; + username: string; + password: string; + status: string = this.STATUS[0]; + port: number; + + pduProcessors: PduProcessor[] = []; + defaultSingleJob!: Job; + defaultMultipleJob!: Job; + readonly logger: Logger; + private pendingSessions: any[] = []; + private sessions: any[] = []; + private nextSession: number = 0; + private server: any; + + constructor(id: number, port: number, username: string, password: string) { + super(); + this.id = id; + this.username = username; + this.password = password; + this.port = port; + + this.setDefaultSingleJob(Job.createEmptySingle()); + this.setDefaultMultipleJob(Job.createEmptyMultiple()); + + this.logger = new Logger(`Center-${id}`); + + this.initialize(); + } + + sendMultiple(job: Job): Promise { + return new Promise((resolve, reject) => { + this.validateSessions(reject); + if (!job.count || !job.perSecond) { + reject(`Center-${this.getId()} sendMultiple failed: invalid job, missing fields`); + } + this.logger.log1(`Center-${this.getId()} sending multiple messages: ${JSON.stringify(job)}`); + + let counter = 0; + let previousUpdateCounter = 0; + + this.counterUpdateTimer.setInterval(() => { + if (previousUpdateCounter !== counter) { + this.eventEmitter.emit(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + previousUpdateCounter = counter; + } + }, '', `${this.MESSAGE_SEND_UPDATE_DELAY / 1000} s`); + + let count = job.count || 1; + let interval = 1 / (job.perSecond || 1); + this.sendTimer.setInterval(() => { + if (count > 0 && counter >= count) { + this.cancelSendInterval(); + } else { + this.sendPdu(job.pdu, true); + counter++; + } + }, '', `${interval} s`); + resolve(); + }); + } + + sendPdu(pdu: object, force?: boolean): Promise { + return new Promise((resolve, reject) => { + if (!force) { + this.validateSessions(reject); + } + this.logger.log5(`Center-${this.getId()} sending PDU: ${JSON.stringify(pdu)}`); + this.getNextSession().send(pdu, (replyPdu: any) => { + resolve(replyPdu); + }); + }); + } + + initialize(): void { + this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); + this.server.listen(this.port); + PduProcessor.attachProcessor(this, PduProcessor.getProcessor(DebugPduProcessor.name)); + this.setStatus(0); + } + + close(): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Center-${this.getId()} closing active connections`); + this.server.close(); + this.setStatus(0); + resolve(); + }); + } + + serialize(): object { + return { + id: this.id, + port: this.port, + username: this.username, + password: this.password, + status: this.status, + defaultSingleJob: this.defaultSingleJob, + defaultMultipleJob: this.defaultMultipleJob, + processors: this.pduProcessors.map(p => p.serialize()), + }; + } + + getPort(): number { + return this.port; + } + + private validateSessions(reject: (reason?: any) => void) { + if (this.sessions.length === 0) { + reject(`No clients connected`); + } + } + + private getNextSession(): any { + if (this.sessions.length === 0) { + return null; + } + let session = this.sessions[this.nextSession]; + this.nextSession = (this.nextSession + 1) % this.sessions.length; + return session; + } + + private eventBindTransceiver(session: any, pdu: any) { + this.logger.log1(`Center-${this.getId()} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`); + session.pause(); + if (pdu.system_id === this.username && pdu.password === this.password) { + this.logger.log1(`Center-${this.getId()} client connection successful`); + session.send(pdu.response()); + session.resume(); + this.pendingSessions = this.pendingSessions.filter((s) => s !== session); + this.sessions.push(session); + this.updateStatus(); + } else { + this.logger.log1(`Center-${this.getId()} client connection failed, invalid credentials (expected: ${this.username}, ${this.password})`); + session.send(pdu.response({ + command_status: smpp.ESME_RBINDFAIL + })); + this.pendingSessions = this.pendingSessions.filter((s) => s !== session); + this.updateStatus(); + session.close(); + } + } + + private eventSessionConnected(session: any): void { + this.logger.log1(`A client connected to center-${this.getId()}`); + this.pendingSessions.push(session); + session.on('close', this.eventSessionClose.bind(this, session)); + session.on('error', this.eventSessionError.bind(this, session)); + session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session)); + session.on('pdu', this.eventAnyPdu.bind(this, session)); + this.updateStatus(); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + private eventSessionError(session: any): void { + this.logger.log1(`A client encountered an error on center-${this.getId()}`); + } + + private eventSessionClose(session: any): void { + this.logger.log1(`A client disconnected from center-${this.getId()}`); + this.sessions = this.sessions.filter((s: any) => s !== session); + this.nextSession = 0; + this.pendingSessions = this.pendingSessions.filter((s: any) => s !== session); + this.updateStatus(); + } + + private updateStatus(): void { + if (this.sessions.length > 0) { + this.setStatus(2); + } else if (this.pendingSessions.length > 0) { + this.setStatus(1); + } else { + this.setStatus(0); + } + } +} \ No newline at end of file diff --git a/src/Center/CenterSessionManager.ts b/src/Center/CenterSessionManager.ts new file mode 100644 index 0000000..6142fd8 --- /dev/null +++ b/src/Center/CenterSessionManager.ts @@ -0,0 +1,23 @@ +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; +import {Center} from "./Center"; + +const CENTER_SESSIONS_FILE: string = process.env.CENTER_SESSIONS_FILE || "center_sessions.json"; + +export class CenterSessionManager extends SessionManager { + StorageFile: string = CENTER_SESSIONS_FILE + ManagedSessionClass: any = Center; + sessionId: number = 0; + sessions: Center[] = []; + identifier: string = "center"; + readonly logger: Logger = new Logger("CenterSessionManager"); + + constructor() { + super(); + this.setup(); + // super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session)); + } + + comparatorFn: (arg: any, session: SmppSession) => boolean = (arg: any, session: SmppSession) => (session as Center).getPort() === arg; +} \ No newline at end of file diff --git a/src/Client/Client.ts b/src/Client/Client.ts new file mode 100644 index 0000000..13d14a4 --- /dev/null +++ b/src/Client/Client.ts @@ -0,0 +1,256 @@ +import {Job} from "../Job/Job"; +import Logger from "../Logger"; +import {PduProcessor} from "../PDUProcessor/PduProcessor"; +import PersistentPromise from "../PersistentPromise"; +import {SmppSession} from "../SmppSession"; + +const NanoTimer = require('nanotimer'); +const smpp = require("smpp"); + +const AUTO_ENQUIRE_LINK_PERIOD: number = Number(process.env.AUTO_ENQUIRE_LINK_PERIOD) || 30000; + +export class Client extends SmppSession { + readonly STATUS: string[] = [ + "NOT CONNECTED", + "CONNECTING", + "CONNECTED", + "BINDING", + "BOUND", + "BUSY", + ] + + id: number; + username: string; + password: string; + status: string = this.STATUS[0]; + url: string; + + pduProcessors: PduProcessor[] = []; + defaultSingleJob!: Job; + defaultMultipleJob!: Job; + readonly logger: Logger; + private session?: any; + private connectPromise: PersistentPromise | null = null; + private bindPromise: PersistentPromise | null = null; + private closePromise: PersistentPromise | null = null; + // TODO: Implement close promise + // Apparently the sessions are not closed on a dime but instead a .close() call causes eventSessionClose + + constructor(id: number, url: string, username: string, password: string) { + super(); + this.id = id; + this.username = username; + this.password = password; + this.url = url; + + this.setDefaultSingleJob(Job.createEmptySingle()); + this.setDefaultMultipleJob(Job.createEmptyMultiple()); + + this.logger = new Logger(`Client-${id}`); + } + + doConnect(): PersistentPromise { + this.connectPromise = new PersistentPromise((resolve, reject) => { + if (this.status !== this.STATUS[0]) { + let errorString = `Client-${this.getId()} already connected`; + this.logger.log1(errorString); + reject(errorString); + return; + } + + this.logger.log1(`Client-${this.getId()} connecting to ${this.url}`); + this.setStatus(1); + this.connectSession().then(resolve, ((err: any) => { + this.logger.log1(`Client-${this.getId()} connection failed: ${err}`); + this.setStatus(0); + this.session.close(); + reject(err); + })); + }); + return this.connectPromise; + } + + doBind(): PersistentPromise { + this.bindPromise = new PersistentPromise((resolve, reject) => { + this.validateFields(reject); + + this.session.bind_transceiver({ + system_id: this.username, + password: this.password, + }, this.eventBindReply.bind(this)); + this.setStatus(3); + }); + return this.bindPromise; + } + + connectAndBind(): Promise { + return this.doConnect().then(this.doBind.bind(this), (error) => { + this.logger.log1(`Client-${this.getId()} connectAndBind failed: ${error}`); + }); + } + + serialize(): object { + return { + id: this.getId(), + url: this.url, + username: this.username, + password: this.password, + status: this.status, + defaultSingleJob: this.defaultSingleJob, + defaultMultipleJob: this.defaultMultipleJob, + processors: this.pduProcessors.map(p => p.serialize()), + }; + } + + close(): Promise { + this.logger.log1(`Client-${this.getId()} closing connection`); + return Promise.resolve(this.session.close()); + } + + sendPdu(pdu: object, force?: boolean): Promise { + return new Promise((resolve, reject) => { + if (!force) { + this.validateSession(reject); + this.validateBound(reject); + } + this.logger.log5(`Client-${this.getId()} sending PDU: ${JSON.stringify(pdu)}`); + this.session.send(pdu, (replyPdu: object) => resolve(replyPdu)); + }); + } + + sendMultiple(job: Job): Promise { + return new Promise((resolve, reject) => { + this.validateSession(reject); + this.validateBound(reject); + if (!job.count || !job.perSecond) { + reject(`Client-${this.getId()} sendMultiple failed: invalid job, missing fields`); + } + this.logger.log1(`Client-${this.getId()} sending multiple messages: ${JSON.stringify(job)}`); + + this.setStatus(4); + + let counter = 0; + let previousUpdateCounter = 0; + + this.counterUpdateTimer.setInterval(() => { + if (previousUpdateCounter !== counter) { + this.eventEmitter.emit(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter); + previousUpdateCounter = counter; + } + }, '', `${this.MESSAGE_SEND_UPDATE_DELAY / 1000} s`); + + let count = job.count || 1; + let interval = 1 / (job.perSecond || 1); + this.sendTimer.setInterval(() => { + if (count > 0 && counter >= count) { + this.cancelSendInterval(); + } else { + this.sendPdu(job.pdu, true) + .catch(e => this.logger.log1(`Error sending message: ${e}`)); + counter++; + } + }, '', `${interval} s`); + resolve(); + }); + } + + getUrl(): string { + return this.url; + } + + private connectSession(): Promise { + return new Promise((resolve, reject) => { + this.validateFields(reject); + this.logger.log1(`Client-${this.getId()} connecting to ${this.url}`); + + this.session = smpp.connect({ + url: this.url, auto_enquire_link_period: AUTO_ENQUIRE_LINK_PERIOD, + }, this.eventSessionConnected.bind(this)); + this.session.on('error', this.eventSessionError.bind(this)); + this.session.on('close', this.eventSessionClose.bind(this)); + this.session.on('pdu', this.eventAnyPdu.bind(this)); + }); + } + + private eventSessionConnected(): void { + this.logger.log1(`Client-${this.getId()} connected to ${this.url}`); + this.setStatus(2); + if (this.connectPromise) { + this.connectPromise.resolve(); + } + } + + private eventSessionError(pdu: any): void { + this.logger.log1(`Client-${this.getId()} error on ${this.url}`); + this.setStatus(0); + this.rejectPromises(); + } + + private eventSessionClose(): void { + this.logger.log1(`Client-${this.getId()} closed on ${this.url}`); + this.setStatus(0); + this.rejectPromises(); + } + + private eventBindReply(pdu: any): void { + if (pdu.command_status === 0) { + this.logger.log1(`Client-${this.getId()} bound to ${this.url}`); + this.setStatus(4); + if (this.bindPromise) { + this.bindPromise.resolve(); + } + } else { + this.logger.log1(`Client-${this.getId()} bind failed to ${this.url}`); + this.setStatus(2); + if (this.bindPromise) { + this.bindPromise.reject(pdu); + } + } + } + + private rejectPromises(err?: any): void { + if (this.connectPromise) { + this.connectPromise.reject(err); + } + if (this.bindPromise) { + this.bindPromise.reject(err); + } + if (this.closePromise) { + this.closePromise.resolve(); + } + } + + private validateFields(reject: (reason?: any) => void) { + if (!this.url) { + let error = `Client-${this.getId()} has no url set`; + this.logger.log1(error); + reject(error); + } + if (!this.username) { + let error = `Client-${this.getId()} has no username set`; + this.logger.log1(error); + reject(error); + } + if (!this.password) { + let error = `Client-${this.getId()} has no password set`; + this.logger.log1(error); + reject(error); + } + } + + private validateSession(reject: (reason?: any) => void) { + if (!this.session) { + let errorMessage = `Client-${this.getId()} session is not defined`; + this.logger.log1(errorMessage); + reject(errorMessage); + } + } + + private validateBound(reject: (reason?: any) => void) { + if (this.status !== this.STATUS[4]) { + let errorMessage = `Client-${this.getId()} is not bound`; + this.logger.log1(errorMessage); + reject(errorMessage); + } + } +} \ No newline at end of file diff --git a/src/Client/ClientSessionManager.ts b/src/Client/ClientSessionManager.ts new file mode 100644 index 0000000..652608b --- /dev/null +++ b/src/Client/ClientSessionManager.ts @@ -0,0 +1,24 @@ +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; +import {Client} from "./Client"; + +const CLIENT_SESSIONS_FILE: string = process.env.CLIENT_SESSIONS_FILE || "client_sessions.json"; + +export default class ClientSessionManager extends SessionManager { + StorageFile: string = CLIENT_SESSIONS_FILE; + ManagedSessionClass: any = Client; + sessionId: number = 0; + sessions: Client[] = []; + // Identifier is used in websockets to identify the type of session this manager manages + identifier: string = "client"; + readonly logger: Logger = new Logger("ClientSessionManager"); + + constructor() { + super(); + this.setup(); + // super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session)); + } + + comparatorFn: (arg: any, session: SmppSession) => boolean = (arg: any, session: SmppSession) => (session as Client).getUrl() === arg; +} \ No newline at end of file diff --git a/src/HttpServer/CenterRequestHandler.ts b/src/HttpServer/CenterRequestHandler.ts new file mode 100644 index 0000000..2f0b8ac --- /dev/null +++ b/src/HttpServer/CenterRequestHandler.ts @@ -0,0 +1,64 @@ +import {Center} from "../Center/Center"; +import {CenterSessionManager} from "../Center/CenterSessionManager"; +import Logger from "../Logger"; +import {PduProcessor} from "../PDUProcessor/PduProcessor"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; +import {RequestHandler} from "./RequestHandler"; + +export class CenterRequestHandler extends RequestHandler { + sessionManager: CenterSessionManager; + logger: Logger = new Logger(this.constructor.name); + + constructor(sessionManager: SessionManager) { + super(); + this.sessionManager = sessionManager as CenterSessionManager; + } + + doGetAvailableProcessors(req: any, res: any): void { + this.logger.log1("Getting available processors"); + let processors: PduProcessor[] = PduProcessor.getProcessorsForType(Center.name); + res.send(processors.map((processor: any) => processor.serialize())); + } + + doGetAppliedProcessors(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + let processors: PduProcessor[] = session.pduProcessors; + res.send(processors.map((processor: any) => processor.serialize())); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doAddProcessor(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + let processor = PduProcessor.getProcessor(req.body.name); + PduProcessor.attachProcessor(session, processor); + res.send(session.serialize()); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doRemoveProcessor(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + let processor = PduProcessor.getProcessor(req.body.name); + PduProcessor.detachProcessor(session, processor); + res.send(session.serialize()); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doPost(req: any, res: any): void { + this.logger.log1("Creating client session"); + this.sessionManager.createSession(req.body.port, req.body.username, req.body.password).then((session: SmppSession) => { + res.send(session.serialize()); + }, (err: any) => { + this.logger.log1(`Failed to create client session: ${err}`); + res.status(500).send(); + }); + } + + doConnect(req: any, res: any): void { + throw new Error("Method not implemented."); + } + + doBind(req: any, res: any): void { + throw new Error("Method not implemented."); + } +} \ No newline at end of file diff --git a/src/HttpServer/ClientRequestHandler.ts b/src/HttpServer/ClientRequestHandler.ts new file mode 100644 index 0000000..8311529 --- /dev/null +++ b/src/HttpServer/ClientRequestHandler.ts @@ -0,0 +1,68 @@ +import {Client} from "../Client/Client"; +import ClientSessionManager from "../Client/ClientSessionManager"; +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; +import {RequestHandler} from "./RequestHandler"; + +export default class ClientRequestHandler extends RequestHandler { + sessionManager: ClientSessionManager; + logger: Logger = new Logger(this.constructor.name); + + constructor(sessionManager: SessionManager) { + super(); + this.sessionManager = sessionManager as ClientSessionManager; + } + + doGetAvailableProcessors(req: any, res: any): void { + throw new Error("Method not implemented."); + } + + doGetAppliedProcessors(req: any, res: any): void { + throw new Error("Method not implemented."); + } + + doAddProcessor(req: any, res: any): void { + throw new Error("Method not implemented."); + } + + doRemoveProcessor(req: any, res: any): void { + throw new Error("Method not implemented."); + } + + doPost(req: any, res: any): void { + this.logger.log1("Creating client session"); + this.sessionManager.createSession(req.body.url, req.body.username, req.body.password).then((session: SmppSession) => { + res.send(session.serialize()); + }, (err: any) => { + this.logger.log1(`Failed to create client session: ${err}`); + res.status(500).send(); + }); + } + + doBind(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Binding client session with ID ${req.params.id}`) + let client = session as Client; + client.doBind() + .then(() => res.send(session.serialize())) + .catch(err => res.status(400).send({ + err: true, + msg: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doConnect(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Connecting client session with ID ${req.params.id}`) + let client = session as Client; + client.doConnect() + .then(() => res.send(session.serialize())) + .catch(err => res.status(400).send({ + err: true, + msg: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } +} \ No newline at end of file diff --git a/src/HttpServer/HttpServer.ts b/src/HttpServer/HttpServer.ts new file mode 100644 index 0000000..c77a1b0 --- /dev/null +++ b/src/HttpServer/HttpServer.ts @@ -0,0 +1,70 @@ +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {CenterRequestHandler} from "./CenterRequestHandler"; +import ClientRequestHandler from "./ClientRequestHandler"; +import {RequestHandler} from "./RequestHandler"; + +const express = require("express"); +const bodyParser = require("body-parser"); + +const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190; + +export class HttpServer { + private clientRequestHandler: RequestHandler; + private centerRequestHandler: RequestHandler; + + private app: any; + private server: any; + private readonly logger: Logger = new Logger(this.constructor.name); + + constructor(clientManager: SessionManager, centerManager: SessionManager) { + this.clientRequestHandler = new ClientRequestHandler(clientManager); + this.centerRequestHandler = new CenterRequestHandler(centerManager); + + this.app = express(); + this.app.use(bodyParser.json()); + + this.app.get('/api/client', this.clientRequestHandler.doGet.bind(this.clientRequestHandler)); + this.app.post('/api/client', this.clientRequestHandler.doPost.bind(this.clientRequestHandler)); + this.app.get('/api/client/:id', this.clientRequestHandler.doGetById.bind(this.clientRequestHandler)); + this.app.patch('/api/client/:id', this.clientRequestHandler.doPatch.bind(this.clientRequestHandler)); + this.app.put('/api/client/:id/send', this.clientRequestHandler.doConfigureSingleJob.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/send/default', this.clientRequestHandler.doSendSingleJob.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/send', this.clientRequestHandler.doSend.bind(this.clientRequestHandler)); + this.app.put('/api/client/:id/sendMany', this.clientRequestHandler.doConfigureManyJob.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/sendMany/default', this.clientRequestHandler.doSendManyJob.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/sendMany', this.clientRequestHandler.doSendMany.bind(this.clientRequestHandler)); + this.app.delete('/api/client/:id/sendMany', this.clientRequestHandler.doCancelSendMany.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/bind', this.clientRequestHandler.doBind.bind(this.clientRequestHandler)); + this.app.post('/api/client/:id/connect', this.clientRequestHandler.doConnect.bind(this.clientRequestHandler)); + this.app.delete('/api/client/:id/connect', this.clientRequestHandler.doDisconnect.bind(this.clientRequestHandler)); + this.app.delete('/api/client/:id', this.clientRequestHandler.doDelete.bind(this.clientRequestHandler)); + + this.app.get('/api/center', this.centerRequestHandler.doGet.bind(this.centerRequestHandler)); + this.app.post('/api/center', this.centerRequestHandler.doPost.bind(this.centerRequestHandler)); + this.app.get('/api/center/:id', this.centerRequestHandler.doGetById.bind(this.centerRequestHandler)); + this.app.patch('/api/center/:id', this.centerRequestHandler.doPatch.bind(this.centerRequestHandler)); + this.app.put('/api/center/:id/send', this.centerRequestHandler.doConfigureSingleJob.bind(this.centerRequestHandler)); + this.app.post('/api/center/:id/send/default', this.centerRequestHandler.doSendSingleJob.bind(this.centerRequestHandler)); + this.app.post('/api/center/:id/send', this.centerRequestHandler.doSend.bind(this.centerRequestHandler)); + this.app.put('/api/center/:id/sendMany', this.centerRequestHandler.doConfigureManyJob.bind(this.centerRequestHandler)); + this.app.post('/api/center/:id/sendMany/default', this.centerRequestHandler.doSendManyJob.bind(this.centerRequestHandler)); + this.app.post('/api/center/:id/sendMany', this.centerRequestHandler.doSendMany.bind(this.centerRequestHandler)); + this.app.delete('/api/center/:id/sendMany', this.centerRequestHandler.doCancelSendMany.bind(this.centerRequestHandler)); + this.app.delete('/api/center/:id/connect', this.centerRequestHandler.doDisconnect.bind(this.centerRequestHandler)); + this.app.delete('/api/center/:id', this.centerRequestHandler.doDelete.bind(this.centerRequestHandler)); + this.app.get('/api/center/processors', this.centerRequestHandler.doGetAppliedProcessors.bind(this.centerRequestHandler)); + this.app.get('/api/center/processors/all', this.centerRequestHandler.doGetAvailableProcessors.bind(this.centerRequestHandler)); + this.app.post('/api/center/processors', this.centerRequestHandler.doAddProcessor.bind(this.centerRequestHandler)); + this.app.delete('/api/center/processors', this.centerRequestHandler.doRemoveProcessor.bind(this.centerRequestHandler)); + + this.app.get('/api/ping', function (req: any, res: any) { + res.send('pong'); + }); + + this.server = this.app.listen(SERVER_PORT, function () { + // @ts-ignore + this.logger.log1(`HTTPServer listening at http://localhost:${SERVER_PORT}`) + }.bind(this)); + } +} \ No newline at end of file diff --git a/src/HttpServer/RequestHandler.ts b/src/HttpServer/RequestHandler.ts new file mode 100644 index 0000000..26ee126 --- /dev/null +++ b/src/HttpServer/RequestHandler.ts @@ -0,0 +1,177 @@ +import {Job} from "../Job/Job"; +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; + +export abstract class RequestHandler { + abstract sessionManager: SessionManager; + logger: Logger = new Logger(this.constructor.name); + + doGet(req: any, res: any): void { + this.logger.log1("Getting client sessions"); + res.send(this.sessionManager.serialize()); + } + + doGetById(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Client session found with ID ${req.params.id}`) + res.send(session.serialize()); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doPatch(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Session found with ID ${req.params.id}`) + if (!!req.body.username && req.body.username !== session.username) { + session.setUsername(req.body.username); + } + if (!!req.body.password && req.body.password !== session.password) { + session.setPassword(req.body.password); + } + res.send(session.serialize()); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doConfigureSingleJob(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + let job: Job = session.getDefaultSingleJob(); + if (job.pdu.source_addr && job.pdu.source_addr !== req.body.source) { + job.pdu.source_addr = req.body.source; + } + if (job.pdu.destination_addr && job.pdu.destination_addr !== req.body.destination) { + job.pdu.destination_addr = req.body.destination; + } + if (job.pdu.short_message && job.pdu.short_message !== req.body.message) { + job.pdu.short_message = req.body.message; + } + this.logger.log1(`Updating default job on session with ID ${req.params.id}`) + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doSendSingleJob(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Sending pre-configured message on session with ID ${req.params.id}`) + session.sendSingleDefault() + .then(pdu => res.send(pdu), + err => res.status(400).send({ + err: true, + message: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doSend(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Sending message on session with ID ${req.params.id}`) + let tempJob: Job = JSON.parse(JSON.stringify(session.defaultSingleJob)); + tempJob.pdu.source_addr = req.body.source; + tempJob.pdu.destination_addr = req.body.destination; + tempJob.pdu.short_message = req.body.message; + session.sendSingle(tempJob) + .then(pdu => res.send(pdu), + err => res.status(400).send({ + err: true, + message: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doConfigureManyJob(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + let perSecond: number = 1 / (req.body.interval / 1000) + let job: Job = session.getDefaultMultipleJob(); + if (!job.pdu.source_addr && job.pdu.source_addr !== req.body.source) { + job.pdu.source_addr = req.body.source; + } + if (!job.pdu.destination_addr && job.pdu.destination_addr !== req.body.destination) { + job.pdu.destination_addr = req.body.destination; + } + if (!job.pdu.short_message && job.pdu.short_message !== req.body.message) { + job.pdu.short_message = req.body.message; + } + if (!job.perSecond && job.perSecond !== perSecond) { + job.perSecond = perSecond; + } + if (!job.count && job.count !== req.body.count) { + job.count = req.body.count; + } + this.logger.log1(`Updating default job on session with ID ${req.params.id}`) + res.send(session.serialize()); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doSendManyJob(req: any, res: any): void { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Sending pre-configured messages on session with ID ${req.params.id}`) + session.sendMultipleDefault() + .then(() => res.send({}), + err => res.status(400).send({ + err: true, + message: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doSendMany(req: any, res: any) { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Sending message on session with ID ${req.params.id}`) + let tempJob: Job = JSON.parse(JSON.stringify(session.defaultMultipleJob)); + tempJob.pdu.source_addr = req.body.source; + tempJob.pdu.destination_addr = req.body.destination; + tempJob.pdu.short_message = req.body.message; + tempJob.perSecond = 1 / (req.body.interval / 1000); + tempJob.count = req.body.count; + session.sendMultiple(tempJob) + .then(pdu => res.send(pdu), + err => res.status(400).send({ + err: true, + message: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doCancelSendMany(req: any, res: any) { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Cancelling send timer for session with ID ${req.params.id}`); + session.cancelSendInterval(); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doDisconnect(req: any, res: any) { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.logger.log1(`Disconnecting session with ID ${req.params.id}`) + session.close().then(() => res.send(session.serialize()), err => res.status(400).send({ + err: true, + message: err + })); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + doDelete(req: any, res: any) { + this.sessionManager.getSession(req.params.id).then((session: SmppSession) => { + this.sessionManager.removeSession(session); + }, this.handleSessionNotFound.bind(this, req, res)); + } + + abstract doPost(req: any, res: any): void; + + abstract doConnect(req: any, res: any): void; + + abstract doBind(req: any, res: any): void; + + abstract doGetAvailableProcessors(req: any, res: any): void; + + abstract doGetAppliedProcessors(req: any, res: any): void; + + abstract doAddProcessor(req: any, res: any): void; + + abstract doRemoveProcessor(req: any, res: any): void; + handleSessionNotFound(req: any, res: any): void { + let error = `No session found with ID ${req.params.id}`; + this.logger.log1(error); + res.status(404).send({ + err: true, + message: error + }); + } +} \ No newline at end of file diff --git a/src/Job/Job.ts b/src/Job/Job.ts new file mode 100644 index 0000000..6e384c9 --- /dev/null +++ b/src/Job/Job.ts @@ -0,0 +1,71 @@ +import EventEmitter from "events"; + +const smpp = require("smpp"); + +export class Job { + static readonly STATE_CHANGED: string = "STATE_CHANGED"; + private eventEmitter: EventEmitter = new EventEmitter(); + + constructor(pdu: any, perSecond?: number, count?: number) { + this._pdu = pdu; + this._perSecond = perSecond; + this._count = count; + } + + private _pdu: any; + + get pdu(): any { + return this._pdu; + } + + set pdu(value: any) { + this._pdu = value; + this.eventEmitter.emit(Job.STATE_CHANGED, {}); + } + + private _perSecond?: number; + + get perSecond(): number { + return this._perSecond; + } + + set perSecond(value: number) { + this._perSecond = value; + this.eventEmitter.emit(Job.STATE_CHANGED, {}); + } + + private _count?: number; + + get count(): number { + return this._count; + } + + set count(value: number) { + this._count = value; + this.eventEmitter.emit(Job.STATE_CHANGED, {}); + } + + static deserialize(serialized: any): Job { + if (!serialized._pdu) { + return Job.createEmptyMultiple(); + } + let pdu: any = new smpp.PDU(serialized._pdu.command, serialized._pdu); + return new Job(pdu, serialized._perSecond, serialized._count); + } + + static createEmptySingle(): Job { + return new Job({}); + } + + static createEmptyMultiple(): Job { + return new Job({}, 1, 1); + } + + serialize(): string { + return JSON.stringify(this); + } + + on(event: string, callback: (...args: any[]) => void): void { + this.eventEmitter.on(event, callback); + } +} \ No newline at end of file diff --git a/src/Logger.ts b/src/Logger.ts new file mode 100644 index 0000000..52fc290 --- /dev/null +++ b/src/Logger.ts @@ -0,0 +1,71 @@ +// @ts-ignore +import {WriteStream} from "fs"; + +const fs = require('fs'); + +// @ts-ignore +const LOG_LEVEL: number = process.env.LOG_LEVEL || 6; +// @ts-ignore +const LOG_FILE: string = process.env.LOG_FILE || ""; + +export default class Logger { + log1 = this.log.bind(this, 1); + log2 = this.log.bind(this, 2); + log3 = this.log.bind(this, 3); + log4 = this.log.bind(this, 4); + log5 = this.log.bind(this, 5); + log6 = this.log.bind(this, 6); + private clazz: string; + private readonly logLevel: number; + private readonly logFile: string; + private readonly logFileWriteStream: WriteStream | null = null; + + constructor(clazz: string) { + this.clazz = clazz; + this.logLevel = LOG_LEVEL; + this.logFile = LOG_FILE; + + if (!!this.logFile) { + this.logFileWriteStream = fs.createWriteStream(this.logFile, {flags: 'a'}); + } + } + + leftPad(str: string, len: number, char: string): string { + str = String(str); + let i: number = -1; + len = len - str.length; + if (char === undefined) { + char = " "; + } + while (++i < len) { + str = char + str; + } + return str; + } + + log(logLevel: number, data: any): void { + if (typeof data === "object") { + data = JSON.stringify(data); + } + let date = new Date(); + + let year: string = this.leftPad(String(date.getFullYear()), 4, '0'); + let month: string = this.leftPad(String(date.getMonth() + 1), 2, '0'); + let day: string = this.leftPad(String(date.getDate()), 2, '0'); + + let hours: string = this.leftPad(String(date.getHours()), 2, '0'); + let minutes: string = this.leftPad(String(date.getMinutes()), 2, '0'); + let seconds: string = this.leftPad(String(date.getSeconds()), 2, '0'); + let milliseconds: string = this.leftPad(String(date.getMilliseconds()), 3, '0'); + + let datePrefix: string = `[${day}/${month}/${year}-${hours}:${minutes}:${seconds}:${milliseconds}]` + + let out: string = datePrefix.padEnd(30, ' ') + `[${this.clazz}]`.padEnd(28, ' ') + `(${logLevel})`.padEnd(8, ' ') + data; + if (logLevel <= this.logLevel) { + console.log(out); + } + if (!!this.logFileWriteStream) { + this.logFileWriteStream.write(out + "\n"); + } + } +} \ No newline at end of file diff --git a/src/PDUProcessor/DebugPduProcessor.ts b/src/PDUProcessor/DebugPduProcessor.ts new file mode 100644 index 0000000..09c9432 --- /dev/null +++ b/src/PDUProcessor/DebugPduProcessor.ts @@ -0,0 +1,14 @@ +import {Center} from "../Center/Center"; +import {PduProcessor} from "./PduProcessor"; + +export class DebugPduProcessor extends PduProcessor { + servesSessionType: string = Center.name; + + processPdu(session: any, pdu: any, ...args: any[]): Promise { + return new Promise((resolve, reject) => { + session.send(pdu.response(), (replyPdu: any) => { + resolve(replyPdu); + }); + }) + } +} \ No newline at end of file diff --git a/src/PDUProcessor/EchoPduProcessor.ts b/src/PDUProcessor/EchoPduProcessor.ts new file mode 100644 index 0000000..6b6b244 --- /dev/null +++ b/src/PDUProcessor/EchoPduProcessor.ts @@ -0,0 +1,25 @@ +import {Center} from "../Center/Center"; +import {PduProcessor} from "./PduProcessor"; +const smpp = require("smpp"); + +export class EchoPduProcessor extends PduProcessor { + servesSessionType: string = Center.name; + processPdu(session: any, pdu: any, ...args: any[]): Promise { + return new Promise((resolve, reject) => { + let promises = []; + let replyPromise = session.send(pdu.response()); + let sendPromise = session.send(new smpp.PDU('deliver_sm', { + source_addr: pdu.destination_addr, + destination_addr: pdu.source_addr, + short_message: pdu.short_message + })); + promises.push(replyPromise); + promises.push(sendPromise); + Promise.all(promises).then((replyPdus: any) => { + resolve(replyPdus); + }).catch((error: any) => { + reject(error); + }); + }); + } +} \ No newline at end of file diff --git a/src/PDUProcessor/PduProcessor.ts b/src/PDUProcessor/PduProcessor.ts new file mode 100644 index 0000000..365e42e --- /dev/null +++ b/src/PDUProcessor/PduProcessor.ts @@ -0,0 +1,57 @@ +import Logger from "../Logger"; +import {SmppSession} from "../SmppSession"; +import {DebugPduProcessor} from "./DebugPduProcessor"; + +export abstract class PduProcessor { + static processors: PduProcessor[] = []; + abstract readonly servesSessionType: string; + readonly name: string = this.constructor.name; + readonly logger: Logger = new Logger(`PduProcessor: ${this.name}`); + private static logger: Logger = new Logger("PduProcessor"); + + static getProcessor(name: string): PduProcessor { + this.logger.log1(`Looking for processor with name ${name}...`); + let pduProcessor = this.processors.find((processor: any) => processor.name === name); + if (pduProcessor) { + this.logger.log1(`Found processor with name ${name}`); + return pduProcessor; + } else { + this.logger.log1(`Processor with name ${name} not found`); + return this.processors[0]; + } + } + + static attachProcessor(session: SmppSession, processor: PduProcessor): void { + this.logger.log1(`Trying to attach processor ${processor.name} to session ${session.constructor.name}-${session.getId()}`); + if (PduProcessor.areCompatible(session, processor)) { + session.addPduProcessor(processor); + } + } + + static detachProcessor(session: SmppSession, processor: PduProcessor): void { + this.logger.log1(`Trying to detach processor ${processor.name} from session ${session.constructor.name}-${session.getId()}`); + session.removePduProcessor(processor); + } + + static areCompatible(session: SmppSession, processor: PduProcessor): boolean { + this.logger.log1(`Checking compatibility between session ${session.constructor.name}-${session.getId()} and processor ${processor.name}`); + return session.constructor.name === processor.servesSessionType; + } + + static addProcessor(processor: any): void { + PduProcessor.processors.push(new processor()); + } + + static getProcessorsForType(type: string): PduProcessor[] { + return this.processors.filter((processor: any) => processor.servesSessionType === type); + } + + abstract processPdu(session: any, pdu: any, ...args: any[]): Promise; + + serialize(): object { + return { + servesSessionType: this.servesSessionType, + name: this.name + }; + } +} \ No newline at end of file diff --git a/src/PersistentPromise.ts b/src/PersistentPromise.ts new file mode 100644 index 0000000..dc7d8c9 --- /dev/null +++ b/src/PersistentPromise.ts @@ -0,0 +1,33 @@ +export default class PersistentPromise { + private readonly promise: Promise; + private promiseResolve: ((value?: any) => void) | undefined; + private promiseReject: ((reason?: any) => void) | undefined; + + constructor(callback: (resolve: (value?: any) => void, reject: (reason?: any) => void) => void) { + this.promise = new Promise((resolve, reject) => { + callback(resolve, reject); + this.promiseResolve = resolve; + this.promiseReject = reject; + }); + } + + getPromise(): Promise { + return this.promise; + } + + resolve(value?: any): void { + if (this.promiseResolve) { + this.promiseResolve(value); + } + } + + reject(reason?: any): void { + if (this.promiseReject) { + this.promiseReject(reason); + } + } + + then(onfulfilled?: ((value: any) => any) | undefined | null, onrejected?: ((reason: any) => any) | undefined | null): Promise { + return this.promise.then(onfulfilled, onrejected); + } +} \ No newline at end of file diff --git a/src/SessionManager.ts b/src/SessionManager.ts new file mode 100644 index 0000000..629dca6 --- /dev/null +++ b/src/SessionManager.ts @@ -0,0 +1,129 @@ +import EventEmitter from "events"; +import fs from "fs"; +import {Job} from "./Job/Job"; +import Logger from "./Logger"; +import {SmppSession} from "./SmppSession"; + +export abstract class SessionManager { + // I could've done this by passing these abstract properties to the constructor, but I wanted to have the possibility + // of implementing additional methods + abstract sessions: SmppSession[]; + abstract sessionId: number; + abstract comparatorFn: (arg: any, session: SmppSession) => boolean; + readonly abstract identifier: string; + readonly abstract ManagedSessionClass: any; + readonly abstract StorageFile: string; + + readonly SESSION_ADDED_EVENT: string = "SESSION ADDED"; + readonly logger: Logger = new Logger("SessionManager"); + readonly eventEmitter: EventEmitter = new EventEmitter(); + + addSession(session: SmppSession): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Adding session with id ${session.getId()}`); + this.sessions.push(session); + this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session.getId()); + resolve(); + }); + } + + on(event: string, listener: (...args: any[]) => void): void { + this.eventEmitter.on(event, listener); + } + + getSessions(): Promise { + return Promise.resolve(this.sessions); + } + + removeSession(session: SmppSession): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Removing session with id ${session.getId()}`); + this.sessions = this.sessions.filter(s => s.getId() !== session.getId()); + resolve(); + }); + } + + getSession(id: number): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Looking for session with id ${id}...`); + let session: SmppSession | undefined = this.sessions.find(s => s.getId() === id); + if (session) { + this.logger.log1(`Found session with id ${id}`); + resolve(session); + } else { + this.logger.log1(`Session with id ${id} not found`); + reject(`Session with id ${id} not found`); + } + }); + } + + serialize(): object { + this.logger.log1(`Serializing ${this.sessions.length} clients`); + return this.sessions.map((session: SmppSession) => { + return session.serialize(); + }); + } + + createSession(arg: any, username: string, password: string): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Creating session of type ${this.ManagedSessionClass.name} with arg ${arg}`); + this.getExisting(arg).then((s: SmppSession) => { + resolve(s); + }, err => { + }); + this.verifyField(arg, reject); + this.verifyField(username, reject); + this.verifyField(password, reject); + + let session = new this.ManagedSessionClass(this.sessionId++, arg, username, password); + this.addSession(session).then(() => { + resolve(session); + }); + }); + } + + verifyField(field: string, reject: (reason?: any) => void) { + if (!field) { + let error = `Request to make a new client failed because of missing ${field}.`; + this.logger.log1(error); + reject(error); + } + } + + setup(): void { + try { + this.logger.log1(`Loading ${this.ManagedSessionClass.name} from ${this.StorageFile}`) + let sessions: Buffer = fs.readFileSync(this.StorageFile); + let loadedSessions: any[] = JSON.parse(String(sessions)); + this.logger.log1(`Loaded ${loadedSessions.length} clients from ${this.StorageFile}`); + loadedSessions.forEach(session => { + this.createSession(session.url, session.username, session.password).then((sessionObj: SmppSession) => { + sessionObj.setDefaultSingleJob(Job.deserialize(session.defaultSingleJob)); + sessionObj.setDefaultMultipleJob(Job.deserialize(session.defaultMultipleJob)); + }); + }); + } catch (e) { + this.logger.log1(`Error loading centers from ${this.StorageFile}: ${e}`); + return; + } + } + + cleanup(): void { + this.logger.log1(`Saving centers to ${this.StorageFile}...`); + fs.writeFileSync(this.StorageFile, JSON.stringify(this.serialize(), null, 4)); + } + + getExisting(arg: any): Promise { + return new Promise((resolve, reject) => { + this.logger.log1(`Looking for session with arg ${arg}...`); + let session: SmppSession | undefined = this.sessions.find(this.comparatorFn.bind(this, arg)); + if (session) { + this.logger.log1(`Found session with arg ${arg}`); + resolve(session); + } else { + this.logger.log1(`Session with arg ${arg} not found`); + reject(`Session with arg ${arg} not found`); + } + }); + } +} \ No newline at end of file diff --git a/src/SmppSession.ts b/src/SmppSession.ts new file mode 100644 index 0000000..8a4ef50 --- /dev/null +++ b/src/SmppSession.ts @@ -0,0 +1,180 @@ +import EventEmitter from "events"; +import {Job} from "./Job/Job"; +import Logger from "./Logger"; +import {PduProcessor} from "./PDUProcessor/PduProcessor"; + +const NanoTimer = require("nanotimer"); +const smpp = require("smpp"); + +export abstract class SmppSession { + readonly EVENT: any = { + STATUS_CHANGED: "STATUS_CHANGED", + STATE_CHANGED: "STATE_CHANGED", + ANY_PDU: "ANY_PDU", + MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", + }; + abstract STATUS: string[]; + + abstract id: number; + abstract username: string; + abstract password: string; + abstract status: string; + abstract pduProcessors: PduProcessor[]; + + abstract defaultSingleJob: Job; + abstract defaultMultipleJob: Job; + + readonly UPDATE_WS: string = "UPDATE_WS"; + readonly eventEmitter: EventEmitter = new EventEmitter(); + readonly logger: Logger = new Logger(`SmppSession`); + readonly sendTimer: any = new NanoTimer(); + readonly counterUpdateTimer: any = new NanoTimer(); + readonly MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; + + constructor() { + this.eventEmitter.on(this.EVENT.STATE_CHANGED, () => this.updateWs(this.EVENT.STATE_CHANGED)); + this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED)); + this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu])); + this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); + } + + abstract sendPdu(pdu: object, force?: boolean): Promise; + + sendSingle(job: Job): Promise { + return this.sendPdu(job.pdu); + } + + sendSingleDefault(): Promise { + return this.sendSingle(this.defaultSingleJob); + } + + abstract sendMultiple(job: Job): Promise; + + sendMultipleDefault(): Promise { + return this.sendMultiple(this.defaultMultipleJob); + } + + cancelSendInterval(): void { + this.sendTimer.clearInterval(); + this.counterUpdateTimer.clearInterval(); + } + + abstract close(): Promise; + + abstract serialize(): object; + + on(event: string, callback: (...args: any[]) => void): void { + this.eventEmitter.on(event, callback); + } + + updateWs(event: string, args?: any[]): void { + this.logger.log1(`Update WS: ${event}`); + let message: { + type: string, + data?: string + } = { + type: event, + }; + switch (event) { + case this.EVENT.STATE_CHANGED: + message.data = JSON.stringify(this.serialize()); + break; + case this.EVENT.STATUS_CHANGED: + message.data = JSON.stringify(this.status); + break; + case this.EVENT.ANY_PDU: + message.data = JSON.stringify(args![0]); + break; + case this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT: + message.data = JSON.stringify(args![0]); + break; + } + this.eventEmitter.emit(this.UPDATE_WS, message); + } + + getDefaultSingleJob(): Job { + return this.defaultSingleJob; + } + + setDefaultSingleJob(job: Job): void { + this.defaultSingleJob = job; + job.on(Job.STATE_CHANGED, this.eventJobUpdated); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getDefaultMultipleJob(): Job { + return this.defaultMultipleJob; + } + + setDefaultMultipleJob(job: Job): void { + this.defaultMultipleJob = job; + job.on(Job.STATE_CHANGED, this.eventJobUpdated); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getId(): number { + return this.id; + } + + setStatus(statusIndex: number): void { + this.status = this.STATUS[statusIndex]; + this.eventEmitter.emit(this.EVENT.STATUS_CHANGED, this.status); + } + + setUsername(username: string): void { + this.username = username; + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + setPassword(password: string): void { + this.password = password; + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + eventJobUpdated(): void { + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + addPduProcessor(pduProcessor: PduProcessor): void { + if (this.pduProcessors.indexOf(pduProcessor) === -1) { + this.pduProcessors.push(pduProcessor); + this.logger.log1(`Adding PDU processor: ${pduProcessor.constructor.name}-${this.getId()}, now active: ${this.pduProcessors.length} processors`); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } else { + this.logger.log1(`PDU processor: ${pduProcessor.constructor.name}-${this.getId()} already attached to session`); + } + } + + removePduProcessor(pduProcessor: PduProcessor): void { + this.pduProcessors = this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1); + this.logger.log1(`Removing PDU processor: ${pduProcessor.constructor.name}-${this.getId()}, now active: ${this.pduProcessors.length} processors`); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + getPduProcessors(): PduProcessor[] { + return this.pduProcessors; + } + + serializePduProcessors(): object { + this.logger.log1(`Serializing ${this.pduProcessors.length} clients`) + return this.pduProcessors.map((processor: PduProcessor) => { + return processor.serialize(); + }); + } + + eventAnyPdu(session: any, pdu: any): Promise { + this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu); + let successful: number = 0; + this.pduProcessors.forEach((pduProcessor: PduProcessor) => { + pduProcessor.processPdu(session, pdu).then((result: any) => { + successful++; + }, (error: any) => { + }); + }); + if (successful === 0) { + return Promise.resolve("No PDU processor was able to process the PDU"); + } else { + return Promise.resolve(); + } + } +} \ No newline at end of file diff --git a/src/WS/ClientSet.ts b/src/WS/ClientSet.ts new file mode 100644 index 0000000..34cf925 --- /dev/null +++ b/src/WS/ClientSet.ts @@ -0,0 +1,66 @@ +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {SmppSession} from "../SmppSession"; + +export class ClientSet { + identifier: string; + private clients: any[]; + private readonly type: string; + private readonly sessionId: number; + private readonly logger: Logger; + private readonly relevantSessionManager: SessionManager | undefined; + + constructor(identifier: string, sessionManagers: SessionManager[]) { + this.clients = []; + this.identifier = identifier; + + let data: string[] = identifier.split(':'); + this.type = data[0]; + this.sessionId = parseInt(data[1]); + + this.logger = new Logger(`ClientSet-${this.type}-${this.sessionId}`); + + this.relevantSessionManager = sessionManagers.find(sm => sm.identifier === this.type); + if (!this.relevantSessionManager) { + this.logger.log1(`No session manager found for type ${this.type}`); + return; + } + if (this.relevantSessionManager) { + this.relevantSessionManager.getSessions().then((sessions) => { + sessions.forEach((session) => { + this.attachListener(session); + }); + }); + } + this.relevantSessionManager.on(this.relevantSessionManager.SESSION_ADDED_EVENT, this.eventOnSessionAdded.bind(this)); + } + + eventOnSessionAdded(sessionId: number): void { + this.logger.log2(`Session added: ${sessionId}`); + this.relevantSessionManager?.getSession(sessionId).then((session) => { + this.attachListener(session); + }) + } + + add(ws: any): void { + this.logger.log2(`Added client`); + this.clients.push(ws); + ws.on('close', this.eventOnClose.bind(this)); + } + + eventOnClose(ws: any): void { + this.logger.log2(`Removed client`); + this.clients.splice(this.clients.indexOf(ws), 1); + } + + notifyClients(message: string) { + this.logger.log2(`Notifying clients: ${message}`); + this.clients.forEach((ws) => { + ws.send(message); + }); + } + + private attachListener(session: SmppSession) { + session.on(session.UPDATE_WS, (message: object) => this.notifyClients(JSON.stringify(message))); + } +} \ No newline at end of file diff --git a/src/WS/WSServer.ts b/src/WS/WSServer.ts new file mode 100644 index 0000000..82b220e --- /dev/null +++ b/src/WS/WSServer.ts @@ -0,0 +1,292 @@ +import Logger from "../Logger"; +import {SessionManager} from "../SessionManager"; +import {ClientSet} from "./ClientSet"; + +const WebSocket = require("ws"); + +const WS_SERVER_PORT: number = Number(process.env.WS_SERVER_PORT) || 8191; + +export class WSServer { + private readonly clients: ClientSet[]; + private readonly unknownClients: any[]; + private readonly server: any; + private readonly logger: Logger; + private readonly sessionManagers: SessionManager[]; + + constructor(sessionManagers: SessionManager[]) { + this.clients = []; + this.unknownClients = []; + this.server = new WebSocket.Server({port: WS_SERVER_PORT}); + this.sessionManagers = sessionManagers; + this.logger = new Logger("WSServer"); + this.server.on('connection', this.eventOnConnection.bind(this)); + this.logger.log1(`WSServer listening atws://localhost:${WS_SERVER_PORT}`); + } + + private eventOnConnection(ws: WebSocket): void { + this.logger.log1("New connection"); + this.unknownClients.push(ws); + // @ts-ignore + ws.on('message', this.eventOnMessage.bind(this, ws)); + // @ts-ignore + ws.on('close', this.eventOnClose.bind(this, ws)); + } + + private eventOnMessage(ws: any, message: string): void { + this.logger.log1("New message"); + message = String(message); + this.unknownClients.splice(this.unknownClients.indexOf(ws), 1); + let clientSet: ClientSet | undefined = this.clients.find((clientSet: ClientSet) => clientSet.identifier === message); + if (!clientSet) { + clientSet = new ClientSet(message, this.sessionManagers); + } + clientSet.add(ws); + } + + private eventOnClose(ws: any): void { + this.logger.log1("Connection closed"); + this.unknownClients.splice(this.unknownClients.indexOf(ws), 1); + } + + // constructor() { + // // @ts-ignore + // this.server = new WebSocket.Server({port: WS_SERVER_PORT}); + // this.logger = new Logger("WSServer"); + // this.server.on('connection', this.onConnection.bind(this)); + // this.logger.log1(`WSServer listening at ws://localhost:${WS_SERVER_PORT}`); + // } + + // onConnection(ws: WebSocket) { + // this.logger.log1("New connection"); + // this.unknownClients.push(ws); + // ws.on('message', this.onMessage.bind(this, ws)); + // ws.on('close', this.onClose.bind(this, ws)); + // } + // + // addClient(ws, type, sessionId) { + // if (!this.clients[type]) { + // this.clients[type] = {}; + // } + // if (!this.clients[type][sessionId]) { + // this.clients[type][sessionId] = []; + // } + // this.logger.log1(`Adding client ${ws.id} to ${type} session ${sessionId}`); + // + // if (type === "client") { + // if (this.listenersAlreadySetup.indexOf(`client-${sessionId}`) === -1) { + // let session = clientSessionManager.getSession(sessionId); + // if (!!session) { + // this.logger.log1(`Setting up listeners for client session ${sessionId}`); + // session.on(ClientSession.STATUS_CHANGED_EVENT, this.onClientSessionStatusChange.bind(this, sessionId)); + // session.on(ClientSession.ANY_PDU_EVENT, this.onClientSessionPdu.bind(this, sessionId)); + // session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onClientMessageCounterUpdate.bind(this, sessionId)); + // } + // this.listenersAlreadySetup.push(`client-${sessionId}`); + // } else { + // this.logger.log1(`Listeners for client session ${sessionId} already set up`); + // } + // } else if (type === "center") { + // if (this.listenersAlreadySetup.indexOf(`center-${sessionId}`) === -1) { + // let session = centerSessionManager.getSession(sessionId); + // if (!!session) { + // this.logger.log1(`Setting up listeners for center session ${sessionId}`); + // session.on(CenterSession.STATUS_CHANGED_EVENT, this.onCenterStatusChange.bind(this, sessionId)); + // session.on(CenterSession.ANY_PDU_EVENT, this.onCenterServerPdu.bind(this, sessionId)); + // session.on(CenterSession.MODE_CHANGED_EVENT, this.onCenterModeChanged.bind(this, sessionId)); + // session.on(CenterSession.SESSION_CHANGED_EVENT, this.onCenterSessionsChanged.bind(this, sessionId)); + // session.on(ClientSession.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onCenterMessageCounterUpdate.bind(this, sessionId)); + // } + // this.listenersAlreadySetup.push(`center-${sessionId}`); + // } else { + // this.logger.log1(`Listeners for center session ${sessionId} already set up`); + // } + // } + // + // this.clients[type][sessionId].push(ws); + // this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); + // } + // + // onMessage(ws, message) { + // this.logger.log1("New message"); + // message = String(message); + // let data = message.split(":"); + // let type = data[0]; + // let sessionId = data[1]; + // + // this.logger.log1(`Moving client to session ID: ${sessionId} of type ${type}`); + // delete this.unknownClients[ws]; + // this.unknownClients = this.unknownClients.filter(Boolean); + // + // this.addClient(ws, type, sessionId); + // this.logger.log1(`Now active ${this.clients[type][sessionId].length} clients in session ID: ${sessionId} of type ${type}`); + // } + // + // onClose(ws) { + // this.removeClient(ws); + // // this.logger.log6(this.clients); + // this.logger.log1("Connection closed"); + // } + // + // removeClient(ws) { + // this.clients.client = this.removeFromArray(this.clients.client, ws); + // this.clients.center = this.removeFromArray(this.clients.center, ws); + // } + // + // removeFromArray(array, element) { + // for (let sessionId in array) { + // let index = array[sessionId].indexOf(element); + // if (index > -1) { + // delete array[sessionId][index]; + // } + // array[sessionId] = array[sessionId].filter(Boolean); + // if (array[sessionId].length === 0) { + // delete array[sessionId]; + // } + // } + // return array; + // } + // + // onClientSessionStatusChange(sessionId, newStatus) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "client", + // type: 'status', + // sessionId: sessionId, + // value: newStatus + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onClientSessionPdu(sessionId, pdu) { + // // TODO: Maybe move this to an "ignored" array against who the pdu.command is compared + // if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { + // return; + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Session with ID ${sessionId} fired PDU`); + // let payload = { + // objectType: "client", + // type: 'pdu', + // sessionId: sessionId, + // value: pdu + // } + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onClientMessageCounterUpdate(sessionId, counter) { + // this.logger.log2(`Session with ID ${sessionId} updating message send counter`); + // let payload = { + // objectType: "client", + // type: 'counterUpdate', + // sessionId: sessionId, + // value: counter + // } + // let clients = this.clients["client"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterStatusChange(sessionId, newStatus) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'status', + // sessionId: sessionId, + // value: newStatus + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterServerPdu(sessionId, pdu) { + // if (pdu.command === 'enquire_link_resp' || pdu.command === 'enquire_link') { + // return; + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Session with ID ${sessionId} fired PDU`); + // let payload = { + // objectType: "center", + // type: 'pdu', + // sessionId: sessionId, + // value: pdu + // } + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterModeChanged(sessionId, newMode) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'mode', + // sessionId: sessionId, + // value: newMode, + // text: CenterMode[newMode] + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterSessionsChanged(sessionId, newSession) { + // this.logger.log1(`Session with ID ${sessionId} changed`); + // let payload = { + // objectType: "center", + // type: 'sessions', + // sessionId: sessionId, + // value: newSession + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log1(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } + // + // onCenterMessageCounterUpdate(sessionId, counter) { + // this.logger.log2(`Session with ID ${sessionId} updating message send counter`); + // let payload = { + // objectType: "center", + // type: 'counterUpdate', + // sessionId: sessionId, + // value: counter + // } + // let clients = this.clients["center"][sessionId]; + // if (!!clients) { + // this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`); + // clients.forEach(client => { + // client.send(JSON.stringify(payload)); + // }); + // } + // } +} \ No newline at end of file diff --git a/src/main.ts b/src/main.ts new file mode 100644 index 0000000..d641e55 --- /dev/null +++ b/src/main.ts @@ -0,0 +1,37 @@ +import {Center} from "./Center/Center"; +import {CenterSessionManager} from "./Center/CenterSessionManager"; +import {Client} from "./Client/Client"; +import ClientSessionManager from "./Client/ClientSessionManager"; +import {HttpServer} from "./HttpServer/HttpServer"; +import {Job} from "./Job/Job"; +import Logger from "./Logger"; +import {DebugPduProcessor} from "./PDUProcessor/DebugPduProcessor"; +import {EchoPduProcessor} from "./PDUProcessor/EchoPduProcessor"; +import {PduProcessor} from "./PDUProcessor/PduProcessor"; +import {WSServer} from "./WS/WSServer"; + +const {PDU} = require("smpp"); +// TODO: Add support for encodings +// TODO: Implement some sort of metrics on frontend by counting the pdus + +let logger = new Logger("main"); + +PduProcessor.addProcessor(DebugPduProcessor); +PduProcessor.addProcessor(EchoPduProcessor); + +let clientManager: ClientSessionManager = new ClientSessionManager(); +let centerManager: CenterSessionManager = new CenterSessionManager(); + +let wss: WSServer = new WSServer([clientManager, centerManager]); +let httpServer: HttpServer = new HttpServer(clientManager, centerManager); + +function cleanup(): void { + logger.log1("Cleaning up..."); + clientManager.cleanup(); + process.exit(0); +} + +process.on('exit', cleanup); +process.on('SIGINT', cleanup); +process.on('SIGUSR1', cleanup); +process.on('SIGUSR2', cleanup); \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..a93e486 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "es6", + "module": "commonjs", + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "moduleResolution": "node", + "esModuleInterop": true + }, + "exclude": [ + "./node_modules" + ] +}