Fix issue where the center thought it still had live connections when a connection died

This commit is contained in:
David Majdandžić
2023-03-27 16:39:15 +02:00
parent 21d0e515e8
commit 96b0257c1d
2 changed files with 64 additions and 42 deletions

78
main.js
View File

@@ -8,6 +8,7 @@ const express = require("express");
const app = express(); const app = express();
const bodyParser = require("body-parser"); const bodyParser = require("body-parser");
const WebSocket = require("ws"); const WebSocket = require("ws");
const {PDU} = require("smpp");
const SERVER_PORT = process.env.SERVER_PORT || 8190; const SERVER_PORT = process.env.SERVER_PORT || 8190;
const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191;
@@ -19,19 +20,9 @@ const MESSAGE_SEND_UPDATE_DELAY = process.env.MESSAGE_SEND_UPDATE_DELAY || 500;
// TODO: Add support for encodings // TODO: Add support for encodings
// TODO: Currently there is no feedback about the success of the multi send operation save for the counter // TODO: Currently there is no feedback about the success of the multi send operation save for the counter
// Make a simple event that fires once multi send is complete // Make a simple event that fires once multi send is complete
// Have every session (client or center) have it's own websocket, this would simplify the code a lot (actually this cannot be done! instead:
// wss.on("connection", (ws, req) => {
// const url = req.url;
// if (url === "/test123") {
// console.log("Connected to test123");
// // Handle messages from client here
// } else {
// console.log("Invalid URL:", url);
// ws.close();
// }
// });
// TODO: Implement some sort of metrics on frontend by counting the pdus // TODO: Implement some sort of metrics on frontend by counting the pdus
// TODO: Currently clients don't realize they've been disconnected by time out // TODO: Currently clients don't realize they've been disconnected by time out
// TODO: Currently the center does not realize a session dropped before sending ccredentials
[ [
'debug', 'debug',
@@ -124,10 +115,11 @@ class ClientSessionStatus {
static CONNECTED = "CONNECTED"; static CONNECTED = "CONNECTED";
static BINDING = "BINDING"; static BINDING = "BINDING";
static BOUND = "BOUND"; static BOUND = "BOUND";
static NOT_CONNECTED = "NOT_CONNECTED"; static NOT_CONNECTED = "NOT CONNECTED";
} }
class ClientSession { class ClientSession {
// TODO: Enable requesting DRs
auto_enquire_link_period = 500; auto_enquire_link_period = 500;
eventEmitter = new EventEmitter(); eventEmitter = new EventEmitter();
busy = false; busy = false;
@@ -287,6 +279,7 @@ class ClientSession {
this.session.bind_transceiver({ this.session.bind_transceiver({
system_id: this.username, system_id: this.username,
password: this.password, password: this.password,
registered_delivery: 1
}, this.bindReply.bind(this)); }, this.bindReply.bind(this));
this.bindingPromise.resolve = resolve; this.bindingPromise.resolve = resolve;
this.bindingPromise.reject = reject; this.bindingPromise.reject = reject;
@@ -317,7 +310,9 @@ class ClientSession {
this.session.submit_sm({ this.session.submit_sm({
source_addr: source, source_addr: source,
destination_addr: destination, destination_addr: destination,
short_message: message short_message: message,
registered_delivery: 1,
message_id: 10
}, pdu => { }, pdu => {
resolve(pdu); resolve(pdu);
}); });
@@ -594,6 +589,7 @@ class CenterSession {
} else { } else {
this.logger.log1("Center connection failed to " + this.port); this.logger.log1("Center connection failed to " + this.port);
} }
this.logger.log1(`Session on center croaked. Error: ${error}`);
this.setStatus(CenterSessionStatus.CONNECTION_PENDING); this.setStatus(CenterSessionStatus.CONNECTION_PENDING);
} }
@@ -602,6 +598,7 @@ class CenterSession {
this.setStatus(CenterSessionStatus.CONNECTION_PENDING); this.setStatus(CenterSessionStatus.CONNECTION_PENDING);
session.on('error', this.error.bind(this)); session.on('error', this.error.bind(this));
session.on('close', this.sessionClosed.bind(this, session));
function bind_transciever(pdu) { function bind_transciever(pdu) {
this.logger.log1(`Center got a bind_transciever on port ${this.port} with system_id ${pdu.system_id} and password ${pdu.password}`); this.logger.log1(`Center got a bind_transciever on port ${this.port} with system_id ${pdu.system_id} and password ${pdu.password}`);
@@ -612,20 +609,23 @@ class CenterSession {
session.resume(); session.resume();
session.on('pdu', this.sessionPdu.bind(this, session)); session.on('pdu', this.sessionPdu.bind(this, session));
this.addSession(session); this.addSession(session);
this.setStatus(CenterSessionStatus.CONNECTED); if (this.sessions.length > 0) {
this.setStatus(CenterSessionStatus.CONNECTED);
}
session.on('debug', (type, msg, payload) => { session.on('debug', (type, msg, payload) => {
if (type.includes('pdu.')) { if (type.includes('pdu.')) {
this.eventEmitter.emit(msg, payload); this.eventEmitter.emit(msg, payload);
this.eventEmitter.emit(CenterSession.ANY_PDU_EVENT, payload); this.eventEmitter.emit(CenterSession.ANY_PDU_EVENT, payload);
} }
}); });
session.on('close', this.sessionClosed.bind(this, session));
} else { } else {
this.logger.log1(`Center session connection failed, invalid credentials`); this.logger.log1(`Center session connection failed, invalid credentials`);
session.send(pdu.response({ session.send(pdu.response({
command_status: smpp.ESME_RBINDFAIL command_status: smpp.ESME_RBINDFAIL
})); }));
this.setStatus(CenterSessionStatus.WAITING_CONNECTION); if (this.sessions.length === 0) {
this.setStatus(CenterSessionStatus.WAITING_CONNECTION);
}
session.close(); session.close();
this.session = null; this.session = null;
} }
@@ -635,6 +635,7 @@ class CenterSession {
} }
sessionClosed(session) { sessionClosed(session) {
this.logger.log1(`Center session closed on port ${this.port}`);
delete this.sessions[this.sessions.indexOf(session)]; delete this.sessions[this.sessions.indexOf(session)];
this.sessions = this.sessions.filter(Boolean); this.sessions = this.sessions.filter(Boolean);
if (this.sessions.length === 0) { if (this.sessions.length === 0) {
@@ -646,12 +647,18 @@ class CenterSession {
if (pdu.command === 'submit_sm') { if (pdu.command === 'submit_sm') {
session.send(pdu.response()); session.send(pdu.response());
if (this.mode === CenterMode.ECHO) { if (this.mode === CenterMode.ECHO) {
this.notify(pdu.source_addr, pdu.destination_addr, pdu.short_message); this.notify(pdu.destination_addr, pdu.source_addr, pdu.short_message);
} }
// TODO: Figure out how DRs work // TODO: Figure out how DRs work
// if (this.mode === CenterMode.DR) { if (this.mode === CenterMode.DR && pdu.registered_delivery === 1) {
// this.notify(pdu.source_addr, pdu.destination_addr, pdu.short_message); let drPdu = new PDU('deliver_sm', {
// } receipted_message_id: pdu.message_id,
esm_class: 4,
message_state: 2,
length: 0,
});
this.send(drPdu).catch(err => console.log(err));
}
} }
if (pdu.command === 'enquire_link') { if (pdu.command === 'enquire_link') {
session.send(pdu.response()); session.send(pdu.response());
@@ -681,13 +688,26 @@ class CenterSession {
this.getNextSession().deliver_sm({ this.getNextSession().deliver_sm({
source_addr: source, source_addr: source,
destination_addr: destination, destination_addr: destination,
short_message: message short_message: message,
}, pdu => { }, pdu => {
resolve(pdu); resolve(pdu);
}); });
}); });
} }
send(pdu) {
return new Promise((resolve, reject) => {
if (!this.canSend()) {
this.logger.log1(`Center cannot send message, no sessions active on ${this.port} or busy`);
reject(`Center cannot send message, no sessions active on ${this.port} or busy`);
return;
}
this.getNextSession().send(pdu, pdu => {
resolve(pdu);
});
});
}
configureDefaultInterval(source, destination, message, interval, count) { configureDefaultInterval(source, destination, message, interval, count) {
this.configuredMultiMessageJob = { this.configuredMultiMessageJob = {
source: source, source: source,
@@ -1076,7 +1096,7 @@ class HTTPServer {
this.logger.log1(`Sending pre-configured messages on session with ID ${req.params.id}`) this.logger.log1(`Sending pre-configured messages on session with ID ${req.params.id}`)
if (session) { if (session) {
session.sendDefaultInterval() session.sendDefaultInterval()
.then(pdu => res.send(pdu)) .then(() => res.send({}))
.catch(err => res.status(400).send(JSON.stringify(err))); .catch(err => res.status(400).send(JSON.stringify(err)));
} else { } else {
this.logger.log1(`No session found with ID ${req.params.id}`); this.logger.log1(`No session found with ID ${req.params.id}`);
@@ -1119,7 +1139,7 @@ class HTTPServer {
this.logger.log1(`Cancelling send timer for session with ID ${req.params.id}`); this.logger.log1(`Cancelling send timer for session with ID ${req.params.id}`);
if (session) { if (session) {
session.cancelSendInterval(); session.cancelSendInterval();
res.send(); res.send({});
} else { } else {
this.logger.log1(`No session found with ID ${req.params.id}`); this.logger.log1(`No session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1180,7 +1200,7 @@ class HTTPServer {
let session = clientSessionManager.getSession(req.params.id); let session = clientSessionManager.getSession(req.params.id);
if (session) { if (session) {
clientSessionManager.deleteSession(session); clientSessionManager.deleteSession(session);
res.send(); res.send({});
} else { } else {
this.logger.log1(`No session found with ID ${req.params.id}`); this.logger.log1(`No session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1228,7 +1248,7 @@ class HTTPServer {
if (server) { if (server) {
this.logger.log1(`Center session found with ID ${req.params.id}`) this.logger.log1(`Center session found with ID ${req.params.id}`)
server.closeSession(req.params.sessionId) server.closeSession(req.params.sessionId)
res.send(); res.send({});
} else { } else {
this.logger.log1(`No center session found with ID ${req.params.id}`); this.logger.log1(`No center session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1241,7 +1261,7 @@ class HTTPServer {
if (server) { if (server) {
this.logger.log1(`Center session found with ID ${req.params.id}`) this.logger.log1(`Center session found with ID ${req.params.id}`)
server.deleteSession(req.params.sessionId) server.deleteSession(req.params.sessionId)
res.send(); res.send({});
} else { } else {
this.logger.log1(`No center session found with ID ${req.params.id}`); this.logger.log1(`No center session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1387,7 +1407,7 @@ class HTTPServer {
this.logger.log1(`Cancelling send timer for server with ID ${req.params.id}`); this.logger.log1(`Cancelling send timer for server with ID ${req.params.id}`);
if (server) { if (server) {
server.cancelNotifyInterval(); server.cancelNotifyInterval();
res.send(); res.send({});
} else { } else {
this.logger.log1(`No session found with ID ${req.params.id}`); this.logger.log1(`No session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1415,7 +1435,7 @@ class HTTPServer {
let server = centerSessionManager.getSession(req.params.id); let server = centerSessionManager.getSession(req.params.id);
if (server) { if (server) {
centerSessionManager.deleteSession(server); centerSessionManager.deleteSession(server);
res.send(); res.send({});
} else { } else {
this.logger.log1(`No session found with ID ${req.params.id}`); this.logger.log1(`No session found with ID ${req.params.id}`);
res.status(404).send(); res.status(404).send();
@@ -1678,7 +1698,7 @@ centerSessionManager.startup();
// let session = clientSessionManager.createSession('smpp://localhost:7001', 'test', 'test'); // let session = clientSessionManager.createSession('smpp://localhost:7001', 'test', 'test');
// let server = centerSessionManager.createSession(7001, 'test', 'test'); // let server = centerSessionManager.createSession(7001, 'test', 'test');
let session = clientSessionManager.getSession(1); let session = clientSessionManager.getSession(0);
let server = centerSessionManager.getSession(1); let server = centerSessionManager.getSession(1);
session.connect() session.connect()

View File

@@ -3,7 +3,10 @@ const WebSocket = require('ws');
const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191;
class Metrics { class Metrics {
static interestingMetrics = ['submit_sm', 'deliver_sm']; static interestingMetrics = [
'submit_sm',
'deliver_sm'
];
metrics = {}; metrics = {};
constructor() { constructor() {
@@ -31,13 +34,11 @@ let centerMetrics = new Metrics();
const ws = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`); const ws = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`);
ws.on('open', () => { ws.on('open', () => {
console.log('WebSocket connection established'); console.log('WebSocket connection established');
ws.send("client:0"); ws.send("client:1");
}); });
ws.on('message', (data) => { ws.on('message', (data) => {
data = JSON.parse(data); data = JSON.parse(data);
if (data.type === 'pdu') { console.log(data);
clientMetrics.processPdu(data.value);
}
}); });
const ws2 = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`); const ws2 = new WebSocket(`ws://localhost:${WS_SERVER_PORT}`);
@@ -47,13 +48,14 @@ ws2.on('open', () => {
}); });
ws2.on('message', (data) => { ws2.on('message', (data) => {
data = JSON.parse(data); data = JSON.parse(data);
if (data.type === 'pdu') { console.log(data);
centerMetrics.processPdu(data.value); // if (data.type === 'pdu') {
} // centerMetrics.processPdu(data.value);
// }
}); });
setInterval(() => { // setInterval(() => {
console.log(clientMetrics.metrics); // console.log(clientMetrics.metrics);
console.log(centerMetrics.metrics); // // console.log(centerMetrics.metrics);
console.log(""); // console.log("");
}, 500); // }, 500);