Implement sending messages on interval
This commit is contained in:
76
main.js
76
main.js
@@ -3,6 +3,7 @@ const keyboard = require("keyboardjs");
|
|||||||
const fs = require("fs");
|
const fs = require("fs");
|
||||||
const path = require("path");
|
const path = require("path");
|
||||||
const EventEmitter = require("events");
|
const EventEmitter = require("events");
|
||||||
|
const NanoTimer = require('nanotimer');
|
||||||
|
|
||||||
const express = require("express");
|
const express = require("express");
|
||||||
const app = express();
|
const app = express();
|
||||||
@@ -115,6 +116,7 @@ class SessionStatus {
|
|||||||
class Session {
|
class Session {
|
||||||
auto_enquire_link_period = 500;
|
auto_enquire_link_period = 500;
|
||||||
eventEmitter = new EventEmitter();
|
eventEmitter = new EventEmitter();
|
||||||
|
busy = false;
|
||||||
|
|
||||||
connectingPromise = {
|
connectingPromise = {
|
||||||
promise: null,
|
promise: null,
|
||||||
@@ -244,9 +246,9 @@ class Session {
|
|||||||
|
|
||||||
send(source, destination, message) {
|
send(source, destination, message) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (this.status !== SessionStatus.BOUND) {
|
if (!this.canSend()) {
|
||||||
this.logger.log1(`Cannot send message, not bound to ${this.url}`);
|
this.logger.log1(`Cannot send message, not bound to ${this.url} or busy`);
|
||||||
reject(`Cannot send message, not bound to ${this.url}`);
|
reject(`Cannot send message, not bound to ${this.url} or busy`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.logger.log1(`Sending message from ${source} to ${destination} with message ${message}`);
|
this.logger.log1(`Sending message from ${source} to ${destination} with message ${message}`);
|
||||||
@@ -260,6 +262,33 @@ class Session {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendOnInterval(source, destination, message, interval, count) {
|
||||||
|
// TODO: Create method for stopping sending messages
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
if (!this.canSend() || this.busy) {
|
||||||
|
this.logger.log1(`Cannot send many message, not bound to ${this.url} or busy`);
|
||||||
|
reject(`Cannot send many message, not bound to ${this.url} or busy`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// TODO: Remember to update busy when cancelling the timer
|
||||||
|
// TODO: Create event for counter update
|
||||||
|
this.busy = true;
|
||||||
|
this.timer = new NanoTimer();
|
||||||
|
let counter = 0;
|
||||||
|
this.timer.setInterval(() => {
|
||||||
|
if (count > 0 && counter >= count) {
|
||||||
|
this.timer.clearInterval();
|
||||||
|
this.busy = false;
|
||||||
|
} else {
|
||||||
|
this.send(source, destination, message)
|
||||||
|
.catch(e => this.logger.log1(`Error sending message: ${e}`));
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
}, '', `${interval} s`);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
this.disconnectingPromise.promise = new Promise((resolve, reject) => {
|
this.disconnectingPromise.promise = new Promise((resolve, reject) => {
|
||||||
if (this.status !== SessionStatus.BOUND && this.status !== SessionStatus.CONNECTED) {
|
if (this.status !== SessionStatus.BOUND && this.status !== SessionStatus.CONNECTED) {
|
||||||
@@ -287,6 +316,10 @@ class Session {
|
|||||||
status: this.status
|
status: this.status
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
canSend() {
|
||||||
|
return this.status === SessionStatus.BOUND;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class SessionManager {
|
class SessionManager {
|
||||||
@@ -295,11 +328,11 @@ class SessionManager {
|
|||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.sessions = {};
|
this.sessions = {};
|
||||||
process.on('exit', this.cleanup.bind(this));
|
// process.on('exit', this.cleanup.bind(this));
|
||||||
process.on('SIGINT', this.cleanup.bind(this));
|
// process.on('SIGINT', this.cleanup.bind(this));
|
||||||
process.on('SIGUSR1', this.cleanup.bind(this));
|
// process.on('SIGUSR1', this.cleanup.bind(this));
|
||||||
process.on('SIGUSR2', this.cleanup.bind(this));
|
// process.on('SIGUSR2', this.cleanup.bind(this));
|
||||||
process.on('uncaughtException', this.cleanup.bind(this));
|
// process.on('uncaughtException', this.cleanup.bind(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
createSession(url, username, password) {
|
createSession(url, username, password) {
|
||||||
@@ -365,6 +398,7 @@ class HTTPServer {
|
|||||||
app.post('/api/sessions', this.createSession.bind(this));
|
app.post('/api/sessions', this.createSession.bind(this));
|
||||||
app.get('/api/sessions/:id', this.getById.bind(this));
|
app.get('/api/sessions/:id', this.getById.bind(this));
|
||||||
app.post('/api/sessions/:id/send', this.send.bind(this));
|
app.post('/api/sessions/:id/send', this.send.bind(this));
|
||||||
|
app.post('/api/sessions/:id/sendMany', this.sendMany.bind(this));
|
||||||
app.post('/api/sessions/:id/bind', this.bind.bind(this));
|
app.post('/api/sessions/:id/bind', this.bind.bind(this));
|
||||||
app.post('/api/sessions/:id/connect', this.connect.bind(this));
|
app.post('/api/sessions/:id/connect', this.connect.bind(this));
|
||||||
app.delete('/api/sessions/:id/connect', this.disconnect.bind(this));
|
app.delete('/api/sessions/:id/connect', this.disconnect.bind(this));
|
||||||
@@ -414,6 +448,30 @@ class HTTPServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendMany(req, res) {
|
||||||
|
// TODO: These requests deserve error handling
|
||||||
|
let session = sessionManager.getSession(req.params.id);
|
||||||
|
let source = req.body.source;
|
||||||
|
let destination = req.body.destination;
|
||||||
|
let message = req.body.message;
|
||||||
|
let interval = req.body.interval / 1000;
|
||||||
|
let count = req.body.count;
|
||||||
|
if (!!req.body.perSecond) {
|
||||||
|
interval = 1 / req.body.perSecond;
|
||||||
|
}
|
||||||
|
let perSecond = 1 / interval;
|
||||||
|
this.logger.log1(
|
||||||
|
`Sending ${count} messages from ${source} to ${destination} with message ${message} on session with ID ${req.params.id} at a rate of ${perSecond} per second.`);
|
||||||
|
if (session) {
|
||||||
|
session.sendOnInterval(source, destination, message, interval, count)
|
||||||
|
.then(pdu => res.send(JSON.stringify(pdu)))
|
||||||
|
.catch(err => res.status(400).send(JSON.stringify(err)));
|
||||||
|
} else {
|
||||||
|
this.logger.log1(`No session found with ID ${req.params.id}`);
|
||||||
|
res.status(404).send();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bind(req, res) {
|
bind(req, res) {
|
||||||
this.logger.log1(`Binding session with ID ${req.params.id}`)
|
this.logger.log1(`Binding session with ID ${req.params.id}`)
|
||||||
// Maybe make this async?
|
// Maybe make this async?
|
||||||
@@ -558,5 +616,7 @@ class WSServer {
|
|||||||
|
|
||||||
let sessionManager = new SessionManager();
|
let sessionManager = new SessionManager();
|
||||||
sessionManager.startup();
|
sessionManager.startup();
|
||||||
|
let session = sessionManager.getSession(0);
|
||||||
|
session.connect().then(() => session.bind());
|
||||||
new WSServer();
|
new WSServer();
|
||||||
new HTTPServer();
|
new HTTPServer();
|
||||||
@@ -13,6 +13,7 @@
|
|||||||
"body-parser": "^1.20.2",
|
"body-parser": "^1.20.2",
|
||||||
"express": "^4.18.2",
|
"express": "^4.18.2",
|
||||||
"keyboardjs": "^2.7.0",
|
"keyboardjs": "^2.7.0",
|
||||||
|
"nanotimer": "^0.3.15",
|
||||||
"smpp": "^0.6.0-rc.4",
|
"smpp": "^0.6.0-rc.4",
|
||||||
"ws": "^8.13.0"
|
"ws": "^8.13.0"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user