Implement response processors

This commit is contained in:
David Majdandžić
2023-04-05 19:27:05 +02:00
parent 9181033fda
commit 694124a6b7
14 changed files with 118 additions and 87 deletions

View File

@@ -2,6 +2,8 @@ import {PDU} from "../CommonObjects";
import Job from "../Job/Job"; import Job from "../Job/Job";
import Logger from "../Logger"; import Logger from "../Logger";
import PduProcessor from "../PDUProcessor/PduProcessor"; import PduProcessor from "../PDUProcessor/PduProcessor";
import BindTranscieverReplyProcessor from "../PDUProcessor/Postprocessor/Center/BindTranscieverReplyProcessor";
import SubmitSmReplyProcessor from "../PDUProcessor/Postprocessor/Center/SubmitSmReplyProcessor";
import ProcessorManager from "../PDUProcessor/ProcessorManager"; import ProcessorManager from "../PDUProcessor/ProcessorManager";
import SmppSession from "../SmppSession"; import SmppSession from "../SmppSession";
@@ -21,10 +23,9 @@ export default class Center extends SmppSession {
_status: string = this.STATUSES[0]; _status: string = this.STATUSES[0];
port: number; port: number;
pduProcessors: PduProcessor[] = [];
readonly logger: Logger; readonly logger: Logger;
private pendingSessions: any[] = []; pendingSessions: any[] = [];
private sessions: any[] = []; sessions: any[] = [];
private nextSession: number = 0; private nextSession: number = 0;
private server: any; private server: any;
@@ -38,6 +39,9 @@ export default class Center extends SmppSession {
this._defaultSingleJob = Job.createEmptySingle('deliver_sm'); this._defaultSingleJob = Job.createEmptySingle('deliver_sm');
this._defaultMultipleJob = Job.createEmptyMultiple('deliver_sm'); this._defaultMultipleJob = Job.createEmptyMultiple('deliver_sm');
ProcessorManager.attachProcessor(this, ProcessorManager.getProcessor(SubmitSmReplyProcessor.name));
ProcessorManager.attachProcessor(this, ProcessorManager.getProcessor(BindTranscieverReplyProcessor.name));
this.logger = new Logger(`Center-${id}`); this.logger = new Logger(`Center-${id}`);
this.initialize(); this.initialize();
@@ -150,6 +154,16 @@ export default class Center extends SmppSession {
}; };
} }
updateStatus(): void {
if (this.sessions.length > 0) {
this.setStatus(2);
} else if (this.pendingSessions.length > 0) {
this.setStatus(1);
} else {
this.setStatus(0);
}
}
private validateSessions(reject: (reason?: any) => void) { private validateSessions(reject: (reason?: any) => void) {
if (this.sessions.length === 0) { if (this.sessions.length === 0) {
reject(`No clients connected`); reject(`No clients connected`);
@@ -165,38 +179,11 @@ export default class Center extends SmppSession {
return session; return session;
} }
// TODO: Move this to smppSession and call postProcessors
private eventBindTransceiver(session: any, pdu: PDU) {
this.logger.log1(`Center-${this.id} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`);
session.pause();
if (pdu.system_id === this.username && pdu.password === this.password) {
this.logger.log1(`Center-${this.id} client connection successful`);
if (pdu.response) {
session.send(pdu.response());
}
session.resume();
this.pendingSessions = this.pendingSessions.filter((s) => s !== session);
this.sessions.push(session);
this.updateStatus();
} else {
this.logger.log1(`Center-${this.id} client connection failed, invalid credentials (expected: ${this.username}, ${this.password})`);
if (pdu.response) {
session.send(pdu.response({
command_status: smpp.ESME_RBINDFAIL
}));
}
this.pendingSessions = this.pendingSessions.filter((s) => s !== session);
this.updateStatus();
session.close();
}
}
private eventSessionConnected(session: any): void { private eventSessionConnected(session: any): void {
this.logger.log1(`A client connected to center-${this.id}`); this.logger.log1(`A client connected to center-${this.id}`);
this.pendingSessions.push(session); this.pendingSessions.push(session);
session.on('close', this.eventSessionClose.bind(this, session)); session.on('close', this.eventSessionClose.bind(this, session));
session.on('error', this.eventSessionError.bind(this, session)); session.on('error', this.eventSessionError.bind(this, session));
session.on('bind_transceiver', this.eventBindTransceiver.bind(this, session));
session.on('pdu', this.eventAnyPdu.bind(this, session)); session.on('pdu', this.eventAnyPdu.bind(this, session));
this.updateStatus(); this.updateStatus();
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
@@ -213,31 +200,4 @@ export default class Center extends SmppSession {
this.pendingSessions = this.pendingSessions.filter((s: any) => s !== session); this.pendingSessions = this.pendingSessions.filter((s: any) => s !== session);
this.updateStatus(); this.updateStatus();
} }
private updateStatus(): void {
if (this.sessions.length > 0) {
this.setStatus(2);
} else if (this.pendingSessions.length > 0) {
this.setStatus(1);
} else {
this.setStatus(0);
}
}
// TODO: Move this to smppSession and call postProcessors
eventAnyPdu(session: any, pdu: any): Promise<any> {
this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu);
let successful: number = 0;
this.pduProcessors.forEach((pduProcessor: PduProcessor) => {
pduProcessor.processPdu(session, pdu).then((result: any) => {
successful++;
}, (error: any) => {
});
});
if (successful === 0) {
return Promise.resolve("No PDU processor was able to process the PDU");
} else {
return Promise.resolve();
}
}
} }

View File

@@ -2,6 +2,7 @@ import {PDU} from "../CommonObjects";
import Job from "../Job/Job"; import Job from "../Job/Job";
import Logger from "../Logger"; import Logger from "../Logger";
import PduProcessor from "../PDUProcessor/PduProcessor"; import PduProcessor from "../PDUProcessor/PduProcessor";
import DeliverSmReplyProcessor from "../PDUProcessor/Postprocessor/Client/DeliverSmReplyProcessor";
import ProcessorManager from "../PDUProcessor/ProcessorManager"; import ProcessorManager from "../PDUProcessor/ProcessorManager";
import PersistentPromise from "../PersistentPromise"; import PersistentPromise from "../PersistentPromise";
import SmppSession from "../SmppSession"; import SmppSession from "../SmppSession";
@@ -42,6 +43,8 @@ export default class Client extends SmppSession {
this._defaultSingleJob = Job.createEmptySingle('submit_sm'); this._defaultSingleJob = Job.createEmptySingle('submit_sm');
this._defaultMultipleJob = Job.createEmptyMultiple('submit_sm'); this._defaultMultipleJob = Job.createEmptyMultiple('submit_sm');
ProcessorManager.attachProcessor(this, ProcessorManager.getProcessor(DeliverSmReplyProcessor.name));
this.logger = new Logger(`Client-${id}`); this.logger = new Logger(`Client-${id}`);
} }
@@ -184,12 +187,6 @@ export default class Client extends SmppSession {
}); });
} }
// TODO: Move this to smppSession and call postProcessors
eventAnyPdu(session: any, pdu: any): Promise<any> {
this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu);
return Promise.resolve();
}
private connectSession(): Promise<void> { private connectSession(): Promise<void> {
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this.validateFields(reject); this.validateFields(reject);

View File

@@ -2,8 +2,7 @@ import EventEmitter from "events";
import {PDU, SerializedJob} from "../CommonObjects"; import {PDU, SerializedJob} from "../CommonObjects";
const smpp = require("smpp"); const smpp = require("smpp");
// TODO: Use pdu.data_coding for data coding
// See available schemes here https://messaggio.com/industry-specifications-and-standards/smpp-data-coding-scheme/
export default class Job { export default class Job {
static readonly STATE_CHANGED: string = "STATE_CHANGED"; static readonly STATE_CHANGED: string = "STATE_CHANGED";
private eventEmitter: EventEmitter = new EventEmitter(); private eventEmitter: EventEmitter = new EventEmitter();
@@ -94,7 +93,6 @@ export default class Job {
serialize(): SerializedJob { serialize(): SerializedJob {
return { return {
// todo fix issue where pdu hass short message of type buffer
pdu: this.pdu, pdu: this.pdu,
perSecond: this.perSecond, perSecond: this.perSecond,
count: this.count count: this.count

View File

@@ -1,5 +1,5 @@
import {PDU} from "../CommonObjects";
import Logger from "../Logger"; import Logger from "../Logger";
import SmppSession from "../SmppSession";
export default abstract class PduProcessor { export default abstract class PduProcessor {
readonly abstract type: string readonly abstract type: string
@@ -11,7 +11,7 @@ export default abstract class PduProcessor {
this.sessionType = type; this.sessionType = type;
} }
abstract processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any>; abstract processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any>;
serialize(): object { serialize(): object {
return { return {

View File

@@ -0,0 +1,46 @@
import Center from "../../../Center/Center";
import SmppSession from "../../../SmppSession";
import Postprocessor from "../Postprocessor";
const smpp = require("smpp");
export default class BindTranscieverReplyProcessor extends Postprocessor {
constructor(type: string) {
super(type);
}
processPdu(session: any, pdu: any, entity?: Center | undefined): Promise<any> {
return new Promise((resolve, reject) => {
if (!!pdu.command && pdu.command === 'bind_transceiver') {
if (!entity) {
reject();
}
this.logger.log1(`Center-${entity?.id} got a bind_transciever with system_id ${pdu.system_id} and password ${pdu.password}`);
session.pause();
if (pdu.system_id === entity?.username && pdu.password === entity?.password) {
this.logger.log1(`Center-${entity?.id} client connection successful`);
if (pdu.response) {
session.send(pdu.response());
}
session.resume();
// @ts-ignore
entity?.pendingSessions = entity?.pendingSessions.filter((s) => s !== session);
entity?.sessions.push(session);
entity?.updateStatus();
} else {
this.logger.log1(`Center-${entity?.id} client connection failed, invalid credentials (expected: ${entity?.username}, ${entity?.password})`);
if (pdu.response) {
session.send(pdu.response({
command_status: smpp.ESME_RBINDFAIL
}));
}
// @ts-ignore
entity?.pendingSessions = entity?.pendingSessions.filter((s) => s !== session);
entity?.updateStatus();
session.close();
}
}
});
}
}

View File

@@ -1,4 +1,4 @@
import {PDU} from "../../../CommonObjects"; import SmppSession from "../../../SmppSession";
import Postprocessor from "../Postprocessor"; import Postprocessor from "../Postprocessor";
export default class DebugPduProcessor extends Postprocessor { export default class DebugPduProcessor extends Postprocessor {
@@ -6,7 +6,7 @@ export default class DebugPduProcessor extends Postprocessor {
super(type); super(type);
} }
processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
if (pdu.response) { if (pdu.response) {
session.send(pdu.response(), (replyPdu: any) => { session.send(pdu.response(), (replyPdu: any) => {

View File

@@ -1,4 +1,4 @@
import {PDU} from "../../../CommonObjects"; import SmppSession from "../../../SmppSession";
import Postprocessor from "../Postprocessor"; import Postprocessor from "../Postprocessor";
const smpp = require("smpp"); const smpp = require("smpp");
@@ -8,7 +8,7 @@ export default class EchoPduProcessor extends Postprocessor {
super(type); super(type);
} }
processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
let promises = []; let promises = [];
if (pdu.response) { if (pdu.response) {

View File

@@ -0,0 +1,17 @@
import SmppSession from "../../../SmppSession";
import Postprocessor from "../Postprocessor";
export default class SubmitSmReplyProcessor extends Postprocessor {
constructor(type: string) {
super(type);
}
processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise((resolve, reject) => {
if (!!pdu.command && pdu.command === 'submit_sm') {
session.send(pdu.response());
resolve(pdu);
}
});
}
}

View File

@@ -1,4 +1,4 @@
import {PDU} from "../../../CommonObjects"; import SmppSession from "../../../SmppSession";
import Postprocessor from "../Postprocessor"; import Postprocessor from "../Postprocessor";
export default class DeliverSmReplyProcessor extends Postprocessor { export default class DeliverSmReplyProcessor extends Postprocessor {
@@ -6,11 +6,11 @@ export default class DeliverSmReplyProcessor extends Postprocessor {
super(type); super(type);
} }
processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise((resolve, reject) => {
if (!!pdu.command && pdu.command === 'deliver_sm') { if (!!pdu.command && pdu.command === 'deliver_sm') {
// @ts-ignore
session.send(pdu.response()); session.send(pdu.response());
resolve(pdu);
} }
}); });
} }

View File

@@ -1,4 +1,4 @@
import {PDU} from "../../../CommonObjects"; import SmppSession from "../../../SmppSession";
import Preprocessor from "../Preprocessor"; import Preprocessor from "../Preprocessor";
export default class DestinationEnumeratorProcessor extends Preprocessor { export default class DestinationEnumeratorProcessor extends Preprocessor {
@@ -7,7 +7,7 @@ export default class DestinationEnumeratorProcessor extends Preprocessor {
super(type); super(type);
} }
processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
if (!!pdu.destination_addr) { if (!!pdu.destination_addr) {
pdu.destination_addr = pdu.destination_addr + this.padLeft(String(this.iterator++), '0', 5); pdu.destination_addr = pdu.destination_addr + this.padLeft(String(this.iterator++), '0', 5);

View File

@@ -1,4 +1,4 @@
import {PDU} from "../../../CommonObjects"; import SmppSession from "../../../SmppSession";
import Preprocessor from "../Preprocessor"; import Preprocessor from "../Preprocessor";
export default class SourceEnumeratorProcessor extends Preprocessor { export default class SourceEnumeratorProcessor extends Preprocessor {
@@ -7,7 +7,7 @@ export default class SourceEnumeratorProcessor extends Preprocessor {
super(type); super(type);
} }
processPdu(session: any, pdu: PDU, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
if (!!pdu.source_addr) { if (!!pdu.source_addr) {
pdu.source_addr = pdu.source_addr + this.padLeft(String(this.iterator++), '0', 5); pdu.source_addr = pdu.source_addr + this.padLeft(String(this.iterator++), '0', 5);

View File

@@ -3,8 +3,10 @@ import Client from "../Client/Client";
import Logger from "../Logger"; import Logger from "../Logger";
import SmppSession from "../SmppSession"; import SmppSession from "../SmppSession";
import PduProcessor from "./PduProcessor"; import PduProcessor from "./PduProcessor";
import BindTranscieverReplyProcessor from "./Postprocessor/Center/BindTranscieverReplyProcessor";
import DebugPduProcessor from "./Postprocessor/Center/DebugPduProcessor"; import DebugPduProcessor from "./Postprocessor/Center/DebugPduProcessor";
import EchoPduProcessor from "./Postprocessor/Center/EchoPduProcessor"; import EchoPduProcessor from "./Postprocessor/Center/EchoPduProcessor";
import SubmitSmReplyProcessor from "./Postprocessor/Center/SubmitSmReplyProcessor";
import DeliverSmReplyProcessor from "./Postprocessor/Client/DeliverSmReplyProcessor"; import DeliverSmReplyProcessor from "./Postprocessor/Client/DeliverSmReplyProcessor";
import Postprocessor from "./Postprocessor/Postprocessor"; import Postprocessor from "./Postprocessor/Postprocessor";
import DestinationEnumeratorProcessor from "./Preprocessor/Client/DestinationEnumeratorProcessor"; import DestinationEnumeratorProcessor from "./Preprocessor/Client/DestinationEnumeratorProcessor";
@@ -26,7 +28,9 @@ export default class ProcessorManager {
ProcessorManager.postprocessors = [ ProcessorManager.postprocessors = [
new DebugPduProcessor(Center.name), new DebugPduProcessor(Center.name),
new EchoPduProcessor(Center.name), new EchoPduProcessor(Center.name),
new DeliverSmReplyProcessor(Center.name), new DeliverSmReplyProcessor(Client.name),
new SubmitSmReplyProcessor(Center.name),
new BindTranscieverReplyProcessor(Center.name)
]; ];
} }

View File

@@ -1,4 +1,5 @@
import EventEmitter from "events"; import EventEmitter from "events";
import {PDU} from "./CommonObjects";
import Job from "./Job/Job"; import Job from "./Job/Job";
import Logger from "./Logger"; import Logger from "./Logger";
import PduProcessor from "./PDUProcessor/PduProcessor"; import PduProcessor from "./PDUProcessor/PduProcessor";
@@ -177,7 +178,15 @@ export default abstract class SmppSession {
detachPostprocessor(processor: PduProcessor): void { detachPostprocessor(processor: PduProcessor): void {
this.detachProcessor(processor, this.processors.Postprocessor); this.detachProcessor(processor, this.processors.Postprocessor);
} }
abstract eventAnyPdu(session: any, pdu: any): Promise<any>;
eventAnyPdu(session: any, pdu: PDU): Promise<any> {
if (!!pdu) {
this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu);
this.logger.log6(pdu);
this.processors.Postprocessor.forEach((processor: PduProcessor) => processor.processPdu(session, pdu, this));
}
return Promise.resolve();
}
private detachProcessor(processor: PduProcessor, array: PduProcessor[]): void { private detachProcessor(processor: PduProcessor, array: PduProcessor[]): void {
array.splice(array.indexOf(processor), 1); array.splice(array.indexOf(processor), 1);

View File

@@ -14,9 +14,6 @@ let logger = new Logger("main");
new ProcessorManager(); new ProcessorManager();
let clientManager: ClientSessionManager = new ClientSessionManager(); let clientManager: ClientSessionManager = new ClientSessionManager();
let centerManager: CenterSessionManager = new CenterSessionManager(); let centerManager: CenterSessionManager = new CenterSessionManager();
// TODO: Add support for encodings
// TODO: Fix reading and writing processors
// TODO: Try creating multiple entries with the same arg
let wss: WSServer = new WSServer([clientManager, centerManager]); let wss: WSServer = new WSServer([clientManager, centerManager]);
// let httpServer: HttpServer = new HttpServer(clientManager, centerManager); // let httpServer: HttpServer = new HttpServer(clientManager, centerManager);
@@ -42,8 +39,11 @@ async function main() {
// }, 3000); // }, 3000);
console.log(ProcessorManager.getProcessorsForType(Client.name)); // console.log(ProcessorManager.getProcessorsForType(Client.name));
ProcessorManager.attachProcessor(client, ProcessorManager.getProcessor(SourceEnumeratorProcessor.name)); // ProcessorManager.attachProcessor(client, ProcessorManager.getProcessor(SourceEnumeratorProcessor.name));
await client.doConnect();
await client.doBind();
client.sendMultipleDefault();
console.log("OK"); console.log("OK");
} }