Implement processor attaching and detaching

This commit is contained in:
David Majdandžić
2023-04-05 17:52:44 +02:00
parent 2628445422
commit 6dbb108c2b
4 changed files with 82 additions and 36 deletions

View File

@@ -2,6 +2,7 @@ import {PDU} from "../CommonObjects";
import Logger from "../Logger"; import Logger from "../Logger";
export default abstract class PduProcessor { export default abstract class PduProcessor {
readonly abstract type: string
readonly sessionType: string; readonly sessionType: string;
readonly name: string = this.constructor.name; readonly name: string = this.constructor.name;
readonly logger: Logger = new Logger(`PduProcessor: ${this.name}`); readonly logger: Logger = new Logger(`PduProcessor: ${this.name}`);
@@ -15,7 +16,8 @@ export default abstract class PduProcessor {
serialize(): object { serialize(): object {
return { return {
servesSessionType: this.sessionType, servesSessionType: this.sessionType,
name: this.name name: this.name,
type: this.type
}; };
} }
} }

View File

@@ -6,8 +6,10 @@ import PduProcessor from "./PduProcessor";
import DebugPduProcessor from "./Postprocessor/Center/DebugPduProcessor"; import DebugPduProcessor from "./Postprocessor/Center/DebugPduProcessor";
import EchoPduProcessor from "./Postprocessor/Center/EchoPduProcessor"; import EchoPduProcessor from "./Postprocessor/Center/EchoPduProcessor";
import DeliverSmReplyProcessor from "./Postprocessor/Client/DeliverSmReplyProcessor"; import DeliverSmReplyProcessor from "./Postprocessor/Client/DeliverSmReplyProcessor";
import Postprocessor from "./Postprocessor/Postprocessor";
import DestinationEnumeratorProcessor from "./Preprocessor/Client/DestinationEnumeratorProcessor"; import DestinationEnumeratorProcessor from "./Preprocessor/Client/DestinationEnumeratorProcessor";
import SourceEnumeratorProcessor from "./Preprocessor/Client/SourceEnumeratorProcessor"; import SourceEnumeratorProcessor from "./Preprocessor/Client/SourceEnumeratorProcessor";
import Preprocessor from "./Preprocessor/Preprocessor";
export default class ProcessorManager { export default class ProcessorManager {
static preprocessors: PduProcessor[]; static preprocessors: PduProcessor[];
@@ -45,15 +47,36 @@ export default class ProcessorManager {
} }
static attachProcessor(session: SmppSession, processor: PduProcessor): void { static attachProcessor(session: SmppSession, processor: PduProcessor): void {
this.logger.log1(`Trying to attach processor ${processor.name} to session ${session.constructor.name}-${session.id}`); this.logger.log1(`Trying to attach preprocessor ${processor.name} to session ${session.constructor.name}-${session.id}`);
if (this.areCompatible(session, processor)) { if (this.areCompatible(session, processor)) {
session.addPduProcessor(processor); // This could be done a little better but this is OK for now
switch (processor.type) {
case Preprocessor.name:
session.attachPreprocessor(processor);
break;
case Postprocessor.name:
session.attachPostprocessor(processor);
break;
default:
this.logger.log1(`Processor ${processor.name} is not a preprocessor or a postprocessor`);
break;
}
} }
} }
static detachProcessor(session: SmppSession, processor: PduProcessor): void { static detachProcessor(session: SmppSession, processor: PduProcessor): void {
this.logger.log1(`Trying to detach processor ${processor.name} from session ${session.constructor.name}-${session.id}`); this.logger.log1(`Trying to detach processor ${processor.name} from session ${session.constructor.name}-${session.id}`);
session.removePduProcessor(processor); switch (processor.type) {
case Preprocessor.name:
session.detachPreprocessor(processor);
break;
case Postprocessor.name:
session.detachPostprocessor(processor);
break;
default:
this.logger.log1(`Processor ${processor.name} is not a preprocessor or a postprocessor`);
break;
}
} }
static areCompatible(session: SmppSession, processor: PduProcessor): boolean { static areCompatible(session: SmppSession, processor: PduProcessor): boolean {

View File

@@ -2,6 +2,8 @@ import EventEmitter from "events";
import Job from "./Job/Job"; import Job from "./Job/Job";
import Logger from "./Logger"; import Logger from "./Logger";
import PduProcessor from "./PDUProcessor/PduProcessor"; import PduProcessor from "./PDUProcessor/PduProcessor";
import Postprocessor from "./PDUProcessor/Postprocessor/Postprocessor";
import Preprocessor from "./PDUProcessor/Preprocessor/Preprocessor";
const NanoTimer = require("nanotimer"); const NanoTimer = require("nanotimer");
const smpp = require("smpp"); const smpp = require("smpp");
@@ -14,7 +16,8 @@ export default abstract class SmppSession {
MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT",
}; };
abstract STATUSES: string[]; abstract STATUSES: string[];
abstract pduProcessors: PduProcessor[];
processors: { [key: string]: PduProcessor[] } = {};
readonly UPDATE_WS: string = "UPDATE_WS"; readonly UPDATE_WS: string = "UPDATE_WS";
readonly eventEmitter: EventEmitter = new EventEmitter(); readonly eventEmitter: EventEmitter = new EventEmitter();
readonly logger: Logger = new Logger(`SmppSession`); readonly logger: Logger = new Logger(`SmppSession`);
@@ -27,6 +30,9 @@ export default abstract class SmppSession {
this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED)); this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED));
this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu])); this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu]));
this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count]));
this.processors[Preprocessor.name] = [];
this.processors[Postprocessor.name] = [];
} }
abstract _username: string; abstract _username: string;
@@ -156,32 +162,44 @@ export default abstract class SmppSession {
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} }
addPduProcessor(pduProcessor: PduProcessor): void { attachPreprocessor(processor: PduProcessor): void {
if (this.pduProcessors.indexOf(pduProcessor) === -1) { this.attachProcessor(processor, this.processors.Preprocessor);
this.pduProcessors.push(pduProcessor);
this.logger.log1(`Adding PDU processor: ${pduProcessor.constructor.name}-${this.id}, now active: ${this.pduProcessors.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} else {
this.logger.log1(`PDU processor: ${pduProcessor.constructor.name}-${this.id} already attached to session`);
}
} }
removePduProcessor(pduProcessor: PduProcessor): void { attachPostprocessor(processor: PduProcessor): void {
this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1); this.attachProcessor(processor, this.processors.Postprocessor);
this.logger.log1(`Removing PDU processor: ${pduProcessor.constructor.name}-${this.id}, now active: ${this.pduProcessors.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} }
getPduProcessors(): PduProcessor[] { detachPreprocessor(processor: PduProcessor): void {
return this.pduProcessors; this.detachProcessor(processor, this.processors.Preprocessor);
}
detachPostprocessor(processor: PduProcessor): void {
this.detachProcessor(processor, this.processors.Postprocessor);
} }
serializePduProcessors(): object { serializePduProcessors(): object {
this.logger.log1(`Serializing ${this.pduProcessors.length} clients`) let processors: PduProcessor[] = this.processors.Preprocessor.concat(this.processors.Postprocessor);
return this.pduProcessors.map((processor: PduProcessor) => { return processors.map((processor: PduProcessor) => {
return processor.serialize(); return processor.serialize();
}); });
} }
abstract eventAnyPdu(session: any, pdu: any): Promise<any>; abstract eventAnyPdu(session: any, pdu: any): Promise<any>;
private detachProcessor(processor: PduProcessor, array: PduProcessor[]): void {
array.splice(array.indexOf(processor), 1);
this.logger.log1(`Detaching PDU processor: ${processor.constructor.name}-${this.id}, now active: ${array.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
}
private attachProcessor(processor: PduProcessor, array: PduProcessor[]): void {
if (array.indexOf(processor) === -1) {
array.push(processor);
this.logger.log1(`Attaching PDU processor: ${processor.constructor.name}-${this.id}, now active: ${array.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} else {
this.logger.log1(`PDU processor: ${processor.constructor.name}-${this.id} already attached to session`);
}
}
} }

View File

@@ -3,6 +3,7 @@ import CenterSessionManager from "./Center/CenterSessionManager";
import Client from "./Client/Client"; import Client from "./Client/Client";
import ClientSessionManager from "./Client/ClientSessionManager"; import ClientSessionManager from "./Client/ClientSessionManager";
import Logger from "./Logger"; import Logger from "./Logger";
import SourceEnumeratorProcessor from "./PDUProcessor/Preprocessor/Client/SourceEnumeratorProcessor";
import ProcessorManager from "./PDUProcessor/ProcessorManager"; import ProcessorManager from "./PDUProcessor/ProcessorManager";
import WSServer from "./WS/WSServer"; import WSServer from "./WS/WSServer";
@@ -17,6 +18,7 @@ let centerManager: CenterSessionManager = new CenterSessionManager();
// TODO: Fix reading and writing processors // TODO: Fix reading and writing processors
// TODO: Try creating multiple entries with the same arg // TODO: Try creating multiple entries with the same arg
let wss: WSServer = new WSServer([clientManager, centerManager]); let wss: WSServer = new WSServer([clientManager, centerManager]);
// let httpServer: HttpServer = new HttpServer(clientManager, centerManager); // let httpServer: HttpServer = new HttpServer(clientManager, centerManager);
function cleanup(): void { function cleanup(): void {
@@ -28,23 +30,24 @@ function cleanup(): void {
async function main() { async function main() {
let client: Client = await clientManager.getSession(0) as Client let client: Client = await clientManager.getSession(0) as Client
let center: Center = await centerManager.getSession(0) as Center; // let center: Center = await centerManager.getSession(0) as Center;
setInterval(async () => { // setInterval(async () => {
await client.doConnect(); // await client.doConnect();
setTimeout(async () => { // setTimeout(async () => {
await client.doBind(); // await client.doBind();
setTimeout(async () => { // setTimeout(async () => {
await center.close(); // await center.close();
}, 1000); // }, 1000);
}, 1000); // }, 1000);
}, 3000); // }, 3000);
console.log(ProcessorManager.getProcessorsForType(Client.name));
ProcessorManager.attachProcessor(client, ProcessorManager.getProcessor(SourceEnumeratorProcessor.name));
console.log("OK");
} }
// main(); main();
console.log(ProcessorManager);
// console.log(ProcessorManager.getProcessorsForType(Center.name));
// console.log(ProcessorManager.processors);
console.log("OK");
// process.on('exit', cleanup); // process.on('exit', cleanup);
// process.on('SIGINT', cleanup); // process.on('SIGINT', cleanup);