From 6dbb108c2bebecf6af3de61928f11c6d7e27396c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Majdand=C5=BEi=C4=87?= Date: Wed, 5 Apr 2023 17:52:44 +0200 Subject: [PATCH] Implement processor attaching and detaching --- src/PDUProcessor/PduProcessor.ts | 4 ++- src/PDUProcessor/ProcessorManager.ts | 29 ++++++++++++++-- src/SmppSession.ts | 52 +++++++++++++++++++--------- src/main.ts | 33 ++++++++++-------- 4 files changed, 82 insertions(+), 36 deletions(-) diff --git a/src/PDUProcessor/PduProcessor.ts b/src/PDUProcessor/PduProcessor.ts index 1ff5d93..2782865 100644 --- a/src/PDUProcessor/PduProcessor.ts +++ b/src/PDUProcessor/PduProcessor.ts @@ -2,6 +2,7 @@ import {PDU} from "../CommonObjects"; import Logger from "../Logger"; export default abstract class PduProcessor { + readonly abstract type: string readonly sessionType: string; readonly name: string = this.constructor.name; readonly logger: Logger = new Logger(`PduProcessor: ${this.name}`); @@ -15,7 +16,8 @@ export default abstract class PduProcessor { serialize(): object { return { servesSessionType: this.sessionType, - name: this.name + name: this.name, + type: this.type }; } } \ No newline at end of file diff --git a/src/PDUProcessor/ProcessorManager.ts b/src/PDUProcessor/ProcessorManager.ts index 4a38f59..34b92d7 100644 --- a/src/PDUProcessor/ProcessorManager.ts +++ b/src/PDUProcessor/ProcessorManager.ts @@ -6,8 +6,10 @@ import PduProcessor from "./PduProcessor"; import DebugPduProcessor from "./Postprocessor/Center/DebugPduProcessor"; import EchoPduProcessor from "./Postprocessor/Center/EchoPduProcessor"; import DeliverSmReplyProcessor from "./Postprocessor/Client/DeliverSmReplyProcessor"; +import Postprocessor from "./Postprocessor/Postprocessor"; import DestinationEnumeratorProcessor from "./Preprocessor/Client/DestinationEnumeratorProcessor"; import SourceEnumeratorProcessor from "./Preprocessor/Client/SourceEnumeratorProcessor"; +import Preprocessor from "./Preprocessor/Preprocessor"; export default class ProcessorManager { static preprocessors: PduProcessor[]; @@ -45,15 +47,36 @@ export default class ProcessorManager { } static attachProcessor(session: SmppSession, processor: PduProcessor): void { - this.logger.log1(`Trying to attach processor ${processor.name} to session ${session.constructor.name}-${session.id}`); + this.logger.log1(`Trying to attach preprocessor ${processor.name} to session ${session.constructor.name}-${session.id}`); if (this.areCompatible(session, processor)) { - session.addPduProcessor(processor); + // This could be done a little better but this is OK for now + switch (processor.type) { + case Preprocessor.name: + session.attachPreprocessor(processor); + break; + case Postprocessor.name: + session.attachPostprocessor(processor); + break; + default: + this.logger.log1(`Processor ${processor.name} is not a preprocessor or a postprocessor`); + break; + } } } static detachProcessor(session: SmppSession, processor: PduProcessor): void { this.logger.log1(`Trying to detach processor ${processor.name} from session ${session.constructor.name}-${session.id}`); - session.removePduProcessor(processor); + switch (processor.type) { + case Preprocessor.name: + session.detachPreprocessor(processor); + break; + case Postprocessor.name: + session.detachPostprocessor(processor); + break; + default: + this.logger.log1(`Processor ${processor.name} is not a preprocessor or a postprocessor`); + break; + } } static areCompatible(session: SmppSession, processor: PduProcessor): boolean { diff --git a/src/SmppSession.ts b/src/SmppSession.ts index 2f09e24..d389a88 100644 --- a/src/SmppSession.ts +++ b/src/SmppSession.ts @@ -2,6 +2,8 @@ import EventEmitter from "events"; import Job from "./Job/Job"; import Logger from "./Logger"; import PduProcessor from "./PDUProcessor/PduProcessor"; +import Postprocessor from "./PDUProcessor/Postprocessor/Postprocessor"; +import Preprocessor from "./PDUProcessor/Preprocessor/Preprocessor"; const NanoTimer = require("nanotimer"); const smpp = require("smpp"); @@ -14,7 +16,8 @@ export default abstract class SmppSession { MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", }; abstract STATUSES: string[]; - abstract pduProcessors: PduProcessor[]; + + processors: { [key: string]: PduProcessor[] } = {}; readonly UPDATE_WS: string = "UPDATE_WS"; readonly eventEmitter: EventEmitter = new EventEmitter(); readonly logger: Logger = new Logger(`SmppSession`); @@ -27,6 +30,9 @@ export default abstract class SmppSession { this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED)); this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu])); this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); + + this.processors[Preprocessor.name] = []; + this.processors[Postprocessor.name] = []; } abstract _username: string; @@ -156,32 +162,44 @@ export default abstract class SmppSession { this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); } - addPduProcessor(pduProcessor: PduProcessor): void { - if (this.pduProcessors.indexOf(pduProcessor) === -1) { - this.pduProcessors.push(pduProcessor); - this.logger.log1(`Adding PDU processor: ${pduProcessor.constructor.name}-${this.id}, now active: ${this.pduProcessors.length} processors`); - this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); - } else { - this.logger.log1(`PDU processor: ${pduProcessor.constructor.name}-${this.id} already attached to session`); - } + attachPreprocessor(processor: PduProcessor): void { + this.attachProcessor(processor, this.processors.Preprocessor); } - removePduProcessor(pduProcessor: PduProcessor): void { - this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1); - this.logger.log1(`Removing PDU processor: ${pduProcessor.constructor.name}-${this.id}, now active: ${this.pduProcessors.length} processors`); - this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + attachPostprocessor(processor: PduProcessor): void { + this.attachProcessor(processor, this.processors.Postprocessor); } - getPduProcessors(): PduProcessor[] { - return this.pduProcessors; + detachPreprocessor(processor: PduProcessor): void { + this.detachProcessor(processor, this.processors.Preprocessor); + } + + detachPostprocessor(processor: PduProcessor): void { + this.detachProcessor(processor, this.processors.Postprocessor); } serializePduProcessors(): object { - this.logger.log1(`Serializing ${this.pduProcessors.length} clients`) - return this.pduProcessors.map((processor: PduProcessor) => { + let processors: PduProcessor[] = this.processors.Preprocessor.concat(this.processors.Postprocessor); + return processors.map((processor: PduProcessor) => { return processor.serialize(); }); } abstract eventAnyPdu(session: any, pdu: any): Promise; + + private detachProcessor(processor: PduProcessor, array: PduProcessor[]): void { + array.splice(array.indexOf(processor), 1); + this.logger.log1(`Detaching PDU processor: ${processor.constructor.name}-${this.id}, now active: ${array.length} processors`); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } + + private attachProcessor(processor: PduProcessor, array: PduProcessor[]): void { + if (array.indexOf(processor) === -1) { + array.push(processor); + this.logger.log1(`Attaching PDU processor: ${processor.constructor.name}-${this.id}, now active: ${array.length} processors`); + this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); + } else { + this.logger.log1(`PDU processor: ${processor.constructor.name}-${this.id} already attached to session`); + } + } } \ No newline at end of file diff --git a/src/main.ts b/src/main.ts index 23a8354..9dee230 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,6 +3,7 @@ import CenterSessionManager from "./Center/CenterSessionManager"; import Client from "./Client/Client"; import ClientSessionManager from "./Client/ClientSessionManager"; import Logger from "./Logger"; +import SourceEnumeratorProcessor from "./PDUProcessor/Preprocessor/Client/SourceEnumeratorProcessor"; import ProcessorManager from "./PDUProcessor/ProcessorManager"; import WSServer from "./WS/WSServer"; @@ -17,6 +18,7 @@ let centerManager: CenterSessionManager = new CenterSessionManager(); // TODO: Fix reading and writing processors // TODO: Try creating multiple entries with the same arg let wss: WSServer = new WSServer([clientManager, centerManager]); + // let httpServer: HttpServer = new HttpServer(clientManager, centerManager); function cleanup(): void { @@ -28,23 +30,24 @@ function cleanup(): void { async function main() { let client: Client = await clientManager.getSession(0) as Client - let center: Center = await centerManager.getSession(0) as Center; - setInterval(async () => { - await client.doConnect(); - setTimeout(async () => { - await client.doBind(); - setTimeout(async () => { - await center.close(); - }, 1000); - }, 1000); - }, 3000); + // let center: Center = await centerManager.getSession(0) as Center; + // setInterval(async () => { + // await client.doConnect(); + // setTimeout(async () => { + // await client.doBind(); + // setTimeout(async () => { + // await center.close(); + // }, 1000); + // }, 1000); + // }, 3000); + + + console.log(ProcessorManager.getProcessorsForType(Client.name)); + ProcessorManager.attachProcessor(client, ProcessorManager.getProcessor(SourceEnumeratorProcessor.name)); + console.log("OK"); } -// main(); -console.log(ProcessorManager); -// console.log(ProcessorManager.getProcessorsForType(Center.name)); -// console.log(ProcessorManager.processors); -console.log("OK"); +main(); // process.on('exit', cleanup); // process.on('SIGINT', cleanup);