From 96b0257c1d62114924e1b390e7c0b214ffdac368 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Mon, 27 Mar 2023 16:39:15 +0200 Subject: [PATCH] Fix issue where the center thought it still had live connections when a connection died --- main.js | 78 ++++++++++++++++++++++++++++++------------------ websocketTest.js | 28 +++++++++-------- 2 files changed, 64 insertions(+), 42 deletions(-) diff --git a/main.js b/main.js index b005ee0..5080da9 100644 --- a/main.js +++ b/main.js @@ -8,6 +8,7 @@ const express = require("express"); const app = express(); const bodyParser = require("body-parser"); const WebSocket = require("ws"); +const {PDU} = require("smpp"); const SERVER_PORT = process.env.SERVER_PORT || 8190; const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; @@ -19,19 +20,9 @@ const MESSAGE_SEND_UPDATE_DELAY = process.env.MESSAGE_SEND_UPDATE_DELAY || 500; // TODO: Add support for encodings // TODO: Currently there is no feedback about the success of the multi send operation save for the counter // Make a simple event that fires once multi send is complete -// Have every session (client or center) have it's own websocket, this would simplify the code a lot (actually this cannot be done! instead: -// wss.on("connection", (ws, req) => { -// const url = req.url; -// if (url === "/test123") { -// console.log("Connected to test123"); -// // Handle messages from client here -// } else { -// console.log("Invalid URL:", url); -// ws.close(); -// } -// }); // TODO: Implement some sort of metrics on frontend by counting the pdus // TODO: Currently clients don't realize they've been disconnected by time out +// TODO: Currently the center does not realize a session dropped before sending ccredentials [ 'debug', @@ -124,10 +115,11 @@ class ClientSessionStatus { static CONNECTED = "CONNECTED"; static BINDING = "BINDING"; static BOUND = "BOUND"; - static NOT_CONNECTED = "NOT_CONNECTED"; + static NOT_CONNECTED = "NOT CONNECTED"; } class ClientSession { + // TODO: Enable requesting DRs auto_enquire_link_period = 500; eventEmitter = new EventEmitter(); busy = false; @@ -287,6 +279,7 @@ class ClientSession { this.session.bind_transceiver({ system_id: this.username, password: this.password, + registered_delivery: 1 }, this.bindReply.bind(this)); this.bindingPromise.resolve = resolve; this.bindingPromise.reject = reject; @@ -317,7 +310,9 @@ class ClientSession { this.session.submit_sm({ source_addr: source, destination_addr: destination, - short_message: message + short_message: message, + registered_delivery: 1, + message_id: 10 }, pdu => { resolve(pdu); }); @@ -594,6 +589,7 @@ class CenterSession { } else { this.logger.log1("Center connection failed to " + this.port); } + this.logger.log1(`Session on center croaked. Error: ${error}`); this.setStatus(CenterSessionStatus.CONNECTION_PENDING); } @@ -602,6 +598,7 @@ class CenterSession { this.setStatus(CenterSessionStatus.CONNECTION_PENDING); session.on('error', this.error.bind(this)); + session.on('close', this.sessionClosed.bind(this, session)); function bind_transciever(pdu) { this.logger.log1(`Center got a bind_transciever on port ${this.port} with system_id ${pdu.system_id} and password ${pdu.password}`); @@ -612,20 +609,23 @@ class CenterSession { session.resume(); session.on('pdu', this.sessionPdu.bind(this, session)); this.addSession(session); - this.setStatus(CenterSessionStatus.CONNECTED); + if (this.sessions.length > 0) { + this.setStatus(CenterSessionStatus.CONNECTED); + } session.on('debug', (type, msg, payload) => { if (type.includes('pdu.')) { this.eventEmitter.emit(msg, payload); this.eventEmitter.emit(CenterSession.ANY_PDU_EVENT, payload); } }); - session.on('close', this.sessionClosed.bind(this, session)); } else { this.logger.log1(`Center session connection failed, invalid credentials`); session.send(pdu.response({ command_status: smpp.ESME_RBINDFAIL })); - this.setStatus(CenterSessionStatus.WAITING_CONNECTION); + if (this.sessions.length === 0) { + this.setStatus(CenterSessionStatus.WAITING_CONNECTION); + } session.close(); this.session = null; } @@ -635,6 +635,7 @@ class CenterSession { } sessionClosed(session) { + this.logger.log1(`Center session closed on port ${this.port}`); delete this.sessions[this.sessions.indexOf(session)]; this.sessions = this.sessions.filter(Boolean); if (this.sessions.length === 0) { @@ -646,12 +647,18 @@ class CenterSession { if (pdu.command === 'submit_sm') { session.send(pdu.response()); if (this.mode === CenterMode.ECHO) { - this.notify(pdu.source_addr, pdu.destination_addr, pdu.short_message); + this.notify(pdu.destination_addr, pdu.source_addr, pdu.short_message); } // TODO: Figure out how DRs work - // if (this.mode === CenterMode.DR) { - // this.notify(pdu.source_addr, pdu.destination_addr, pdu.short_message); - // } + if (this.mode === CenterMode.DR && pdu.registered_delivery === 1) { + let drPdu = new PDU('deliver_sm', { + receipted_message_id: pdu.message_id, + esm_class: 4, + message_state: 2, + length: 0, + }); + this.send(drPdu).catch(err => console.log(err)); + } } if (pdu.command === 'enquire_link') { session.send(pdu.response()); @@ -681,13 +688,26 @@ class CenterSession { this.getNextSession().deliver_sm({ source_addr: source, destination_addr: destination, - short_message: message + short_message: message, }, pdu => { resolve(pdu); }); }); } + send(pdu) { + return new Promise((resolve, reject) => { + if (!this.canSend()) { + this.logger.log1(`Center cannot send message, no sessions active on ${this.port} or busy`); + reject(`Center cannot send message, no sessions active on ${this.port} or busy`); + return; + } + this.getNextSession().send(pdu, pdu => { + resolve(pdu); + }); + }); + } + configureDefaultInterval(source, destination, message, interval, count) { this.configuredMultiMessageJob = { source: source, @@ -1076,7 +1096,7 @@ class HTTPServer { this.logger.log1(`Sending pre-configured messages on session with ID ${req.params.id}`) if (session) { session.sendDefaultInterval() - .then(pdu => res.send(pdu)) + .then(() => res.send({})) .catch(err => res.status(400).send(JSON.stringify(err))); } else { this.logger.log1(`No session found with ID ${req.params.id}`); @@ -1119,7 +1139,7 @@ class HTTPServer { this.logger.log1(`Cancelling send timer for session with ID ${req.params.id}`); if (session) { session.cancelSendInterval(); - res.send(); + res.send({}); } else { this.logger.log1(`No session found with ID ${req.params.id}`); res.status(404).send(); @@ -1180,7 +1200,7 @@ class HTTPServer { let session = clientSessionManager.getSession(req.params.id); if (session) { clientSessionManager.deleteSession(session); - res.send(); + res.send({}); } else { this.logger.log1(`No session found with ID ${req.params.id}`); res.status(404).send(); @@ -1228,7 +1248,7 @@ class HTTPServer { if (server) { this.logger.log1(`Center session found with ID ${req.params.id}`) server.closeSession(req.params.sessionId) - res.send(); + res.send({}); } else { this.logger.log1(`No center session found with ID ${req.params.id}`); res.status(404).send(); @@ -1241,7 +1261,7 @@ class HTTPServer { if (server) { this.logger.log1(`Center session found with ID ${req.params.id}`) server.deleteSession(req.params.sessionId) - res.send(); + res.send({}); } else { this.logger.log1(`No center session found with ID ${req.params.id}`); res.status(404).send(); @@ -1387,7 +1407,7 @@ class HTTPServer { this.logger.log1(`Cancelling send timer for server with ID ${req.params.id}`); if (server) { server.cancelNotifyInterval(); - res.send(); + res.send({}); } else { this.logger.log1(`No session found with ID ${req.params.id}`); res.status(404).send(); @@ -1415,7 +1435,7 @@ class HTTPServer { let server = centerSessionManager.getSession(req.params.id); if (server) { centerSessionManager.deleteSession(server); - res.send(); + res.send({}); } else { this.logger.log1(`No session found with ID ${req.params.id}`); res.status(404).send(); @@ -1678,7 +1698,7 @@ centerSessionManager.startup(); // let session = clientSessionManager.createSession('smpp://localhost:7001', 'test', 'test'); // let server = centerSessionManager.createSession(7001, 'test', 'test'); -let session = clientSessionManager.getSession(1); +let session = clientSessionManager.getSession(0); let server = centerSessionManager.getSession(1); session.connect() diff --git a/websocketTest.js b/websocketTest.js index ceadb41..d6d3a39 100644 --- a/websocketTest.js +++ b/websocketTest.js @@ -3,7 +3,10 @@ const WebSocket = require('ws'); const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; class Metrics { - static interestingMetrics = ['submit_sm', 'deliver_sm']; + static interestingMetrics = [ + 'submit_sm', + 'deliver_sm' + ]; metrics = {}; constructor() { @@ -31,13 +34,11 @@ let centerMetrics = new Metrics(); const ws = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`); ws.on('open', () => { console.log('WebSocket connection established'); - ws.send("client:0"); + ws.send("client:1"); }); ws.on('message', (data) => { data = JSON.parse(data); - if (data.type === 'pdu') { - clientMetrics.processPdu(data.value); - } + console.log(data); }); const ws2 = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`); @@ -47,13 +48,14 @@ ws2.on('open', () => { }); ws2.on('message', (data) => { data = JSON.parse(data); - if (data.type === 'pdu') { - centerMetrics.processPdu(data.value); - } + console.log(data); + // if (data.type === 'pdu') { + // centerMetrics.processPdu(data.value); + // } }); -setInterval(() => { - console.log(clientMetrics.metrics); - console.log(centerMetrics.metrics); - console.log(""); -}, 500); \ No newline at end of file +// setInterval(() => { +// console.log(clientMetrics.metrics); +// // console.log(centerMetrics.metrics); +// console.log(""); +// }, 500); \ No newline at end of file