Create event for counter update and propagate to websockets

This commit is contained in:
David Majdandžić
2023-03-24 22:13:29 +01:00
parent 6cf5b99705
commit b7eab4f126

42
main.js
View File

@@ -12,7 +12,7 @@ const WebSocket = require("ws");
const SERVER_PORT = process.env.SERVER_PORT || 8190; const SERVER_PORT = process.env.SERVER_PORT || 8190;
const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191; const WS_SERVER_PORT = process.env.WS_SERVER_PORT || 8191;
const SESSIONS_FILE = process.env.SESSIONS_FILE || "sessions.json"; const SESSIONS_FILE = process.env.SESSIONS_FILE || "sessions.json";
const MIN_MESSAGE_SEND_UPDATE_DELAY = process.env.MIN_MESSAGE_SEND_UPDATE_DELAY || 500; const MESSAGE_SEND_UPDATE_DELAY = process.env.MESSAGE_SEND_UPDATE_DELAY || 500;
[ [
@@ -136,6 +136,7 @@ class Session {
static STATUS_CHANGED_EVENT = "statusChanged"; static STATUS_CHANGED_EVENT = "statusChanged";
static ANY_PDU_EVENT = "*"; static ANY_PDU_EVENT = "*";
static MESSAGE_SEND_COUNTER_UPDATE_EVENT = "messageSendCounterUpdate";
constructor(id, url, username, password) { constructor(id, url, username, password) {
this.id = id; this.id = id;
@@ -269,14 +270,22 @@ class Session {
reject(`Cannot send many message, not bound to ${this.url} or busy`); reject(`Cannot send many message, not bound to ${this.url} or busy`);
return; return;
} }
// TODO: Create event for counter update
this.busy = true; this.busy = true;
this.timer = new NanoTimer(); this.timer = new NanoTimer();
let counter = 0; let counter = 0;
let previousUpdateCounter = 0;
this.updateTimer = new NanoTimer();
this.updateTimer.setInterval(() => {
if (previousUpdateCounter !== counter) {
this.eventEmitter.emit(Session.MESSAGE_SEND_COUNTER_UPDATE_EVENT, counter);
previousUpdateCounter = counter;
}
}, '', `${MESSAGE_SEND_UPDATE_DELAY / 1000} s`);
this.timer.setInterval(() => { this.timer.setInterval(() => {
if (count > 0 && counter >= count) { if (count > 0 && counter >= count) {
this.timer.clearInterval(); this.cancelSendInterval();
this.busy = false;
} else { } else {
this.send(source, destination, message) this.send(source, destination, message)
.catch(e => this.logger.log1(`Error sending message: ${e}`)); .catch(e => this.logger.log1(`Error sending message: ${e}`));
@@ -289,8 +298,10 @@ class Session {
cancelSendInterval() { cancelSendInterval() {
if (!!this.timer) { if (!!this.timer) {
this.timer.clearInterval() this.timer.clearInterval();
this.updateTimer.clearInterval();
this.timer = null; this.timer = null;
this.updateTimer = null;
} }
this.busy = false; this.busy = false;
} }
@@ -580,8 +591,9 @@ class WSServer {
this.clients[sessionId] = []; this.clients[sessionId] = [];
let session = sessionManager.getSession(sessionId); let session = sessionManager.getSession(sessionId);
if (session) { if (session) {
session.on(Session.STATUS_CHANGED_EVENT, this.onSessionChange.bind(this, sessionId)); // session.on(Session.STATUS_CHANGED_EVENT, this.onSessionChange.bind(this, sessionId));
session.on(Session.ANY_PDU_EVENT, this.pduEvent.bind(this, sessionId)); // session.on(Session.ANY_PDU_EVENT, this.pduEvent.bind(this, sessionId));
session.on(Session.MESSAGE_SEND_COUNTER_UPDATE_EVENT, this.onMessageSendCounterUpdate.bind(this, sessionId));
} }
} }
this.logger.log1(`Added client to session ID: ${sessionId}`); this.logger.log1(`Added client to session ID: ${sessionId}`);
@@ -645,6 +657,22 @@ class WSServer {
}); });
} }
} }
onMessageSendCounterUpdate(sessionId, counter) {
this.logger.log2(`Session with ID ${sessionId} updating message send counter`);
let payload = {
type: 'counterUpdate',
sessionId: sessionId,
value: counter
}
let clients = this.clients[sessionId];
if (!!clients) {
this.logger.log2(`Broadcasting session with ID ${sessionId} to ${clients.length} clients`);
clients.forEach(client => {
client.send(JSON.stringify(payload));
});
}
}
} }
let sessionManager = new SessionManager(); let sessionManager = new SessionManager();