Implement pduProcessors

This commit is contained in:
David Majdandžić
2023-03-29 19:56:02 +02:00
parent 7f00cffb40
commit f54ecf0476
10 changed files with 107 additions and 42 deletions

View File

@@ -1,5 +1,6 @@
import {Job} from "../Job/Job"; import {Job} from "../Job/Job";
import Logger from "../Logger"; import Logger from "../Logger";
import {DebugPduProcessor} from "../PDUProcessor/DebugPduProcessor";
import {PduProcessor} from "../PDUProcessor/PduProcessor"; import {PduProcessor} from "../PDUProcessor/PduProcessor";
import {SmppSession} from "../SmppSession"; import {SmppSession} from "../SmppSession";
@@ -90,6 +91,7 @@ export class Center extends SmppSession {
initialize(): void { initialize(): void {
this.server = smpp.createServer({}, this.eventSessionConnected.bind(this)); this.server = smpp.createServer({}, this.eventSessionConnected.bind(this));
this.server.listen(this.port); this.server.listen(this.port);
PduProcessor.attachProcessor(this, PduProcessor.getProcessor(DebugPduProcessor.name));
this.setStatus(0); this.setStatus(0);
} }
@@ -111,6 +113,7 @@ export class Center extends SmppSession {
status: this.status, status: this.status,
defaultSingleJob: this.defaultSingleJob, defaultSingleJob: this.defaultSingleJob,
defaultMultipleJob: this.defaultMultipleJob, defaultMultipleJob: this.defaultMultipleJob,
processors: this.pduProcessors.map(p => p.serialize()),
}; };
} }

View File

@@ -15,6 +15,7 @@ export class CenterSessionManager extends SessionManager {
constructor() { constructor() {
super(); super();
this.setup();
// super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session)); // super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session));
} }

View File

@@ -1,16 +0,0 @@
import Logger from "../../Logger";
import {CenterPDUProcessor} from "./CenterPDUProcessor";
export class DebugProcessor implements CenterPDUProcessor {
private logger: Logger;
constructor() {
this.logger = new Logger('DebugProcessor');
}
processPdu(session: any, pdu: any): Promise<any> {
return new Promise<any>((resolve, reject) => {
session.send(pdu.response()).then((replyPdu: any) => resolve(replyPdu), (error: any) => reject(error));
});
}
}

View File

@@ -98,6 +98,7 @@ export class Client extends SmppSession {
status: this.status, status: this.status,
defaultSingleJob: this.defaultSingleJob, defaultSingleJob: this.defaultSingleJob,
defaultMultipleJob: this.defaultMultipleJob, defaultMultipleJob: this.defaultMultipleJob,
processors: this.pduProcessors.map(p => p.serialize()),
}; };
} }

View File

@@ -16,6 +16,7 @@ export default class ClientSessionManager extends SessionManager {
constructor() { constructor() {
super(); super();
this.setup();
// super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session)); // super.eventEmitter.on(super.SESSION_ADDED_EVENT, (session: SmppSession) => this.eventEmitter.emit(this.SESSION_ADDED_EVENT, session));
} }

View File

@@ -1,6 +1,9 @@
import {Center} from "../Center/Center";
import {PduProcessor} from "./PduProcessor"; import {PduProcessor} from "./PduProcessor";
export class PduDebugProcessor implements PduProcessor { export class DebugPduProcessor extends PduProcessor {
servesSessionType: string = Center.name;
processPdu(session: any, pdu: any, ...args: any[]): Promise<any> { processPdu(session: any, pdu: any, ...args: any[]): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
session.send(pdu.response(), (replyPdu: any) => { session.send(pdu.response(), (replyPdu: any) => {
@@ -8,8 +11,4 @@ export class PduDebugProcessor implements PduProcessor {
}); });
}) })
} }
serialize(): object {
return {};
}
} }

View File

@@ -1,20 +1,14 @@
import Logger from "../../Logger"; import {Center} from "../Center/Center";
import {CenterPDUProcessor} from "./CenterPDUProcessor"; import {PduProcessor} from "./PduProcessor";
const smpp = require("smpp"); const smpp = require("smpp");
export class DebugProcessor implements CenterPDUProcessor { export class EchoPduProcessor extends PduProcessor {
private logger: Logger; servesSessionType: string = Center.name;
processPdu(session: any, pdu: any, ...args: any[]): Promise<any> {
constructor() {
this.logger = new Logger('DebugProcessor');
}
processPdu(session: any, pdu: any): Promise<any> {
return new Promise<any>((resolve, reject) => { return new Promise<any>((resolve, reject) => {
let promises = []; let promises = [];
let replyPromise = session.send(pdu.response()); let replyPromise = session.send(pdu.response());
let sendPromise = session.send(new smpp.PDU('enquire_link', { let sendPromise = session.send(new smpp.PDU('deliver_sm', {
source_addr: pdu.destination_addr, source_addr: pdu.destination_addr,
destination_addr: pdu.source_addr, destination_addr: pdu.source_addr,
short_message: pdu.short_message short_message: pdu.short_message

View File

@@ -1,5 +1,57 @@
export interface PduProcessor { import Logger from "../Logger";
processPdu(session: any, pdu: any, ...args: any[]): Promise<any>; import {SmppSession} from "../SmppSession";
import {DebugPduProcessor} from "./DebugPduProcessor";
serialize(): object; export abstract class PduProcessor {
static processors: PduProcessor[] = [];
abstract readonly servesSessionType: string;
readonly name: string = this.constructor.name;
readonly logger: Logger = new Logger(`PduProcessor: ${this.name}`);
private static logger: Logger = new Logger("PduProcessor");
static getProcessor(name: string): PduProcessor {
this.logger.log1(`Looking for processor with name ${name}...`);
let pduProcessor = this.processors.find((processor: any) => processor.name === name);
if (pduProcessor) {
this.logger.log1(`Found processor with name ${name}`);
return pduProcessor;
} else {
this.logger.log1(`Processor with name ${name} not found`);
return this.processors[0];
}
}
static attachProcessor(session: SmppSession, processor: PduProcessor): void {
this.logger.log1(`Trying to attach processor ${processor.name} to session ${session.constructor.name}-${session.getId()}`);
if (PduProcessor.areCompatible(session, processor)) {
session.addPduProcessor(processor);
}
}
static detachProcessor(session: SmppSession, processor: PduProcessor): void {
this.logger.log1(`Trying to detach processor ${processor.name} from session ${session.constructor.name}-${session.getId()}`);
session.removePduProcessor(processor);
}
static areCompatible(session: SmppSession, processor: PduProcessor): boolean {
this.logger.log1(`Checking compatibility between session ${session.constructor.name}-${session.getId()} and processor ${processor.name}`);
return session.constructor.name === processor.servesSessionType;
}
static addProcessor(processor: any): void {
PduProcessor.processors.push(new processor());
}
static getProcessorsForType(type: string): any[] {
return this.processors.filter((processor: any) => processor.servesSessionType === type);
}
abstract processPdu(session: any, pdu: any, ...args: any[]): Promise<any>;
serialize(): object {
return {
servesSessionType: this.servesSessionType,
name: this.name
};
}
} }

View File

@@ -136,12 +136,18 @@ export abstract class SmppSession {
} }
addPduProcessor(pduProcessor: PduProcessor): void { addPduProcessor(pduProcessor: PduProcessor): void {
this.pduProcessors.push(pduProcessor); if (this.pduProcessors.indexOf(pduProcessor) === -1) {
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); this.pduProcessors.push(pduProcessor);
this.logger.log1(`Adding PDU processor: ${pduProcessor.constructor.name}-${this.getId()}, now active: ${this.pduProcessors.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} else {
this.logger.log1(`PDU processor: ${pduProcessor.constructor.name}-${this.getId()} already attached to session`);
}
} }
removePduProcessor(pduProcessor: PduProcessor): void { removePduProcessor(pduProcessor: PduProcessor): void {
this.pduProcessors = this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1); this.pduProcessors = this.pduProcessors.splice(this.pduProcessors.indexOf(pduProcessor), 1);
this.logger.log1(`Removing PDU processor: ${pduProcessor.constructor.name}-${this.getId()}, now active: ${this.pduProcessors.length} processors`);
this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize()); this.eventEmitter.emit(this.EVENT.STATE_CHANGED, this.serialize());
} }

View File

@@ -1,8 +1,12 @@
import {Center} from "./Center/Center"; import {Center} from "./Center/Center";
import {CenterSessionManager} from "./Center/CenterSessionManager";
import {Client} from "./Client/Client"; import {Client} from "./Client/Client";
import ClientSessionManager from "./Client/ClientSessionManager"; import ClientSessionManager from "./Client/ClientSessionManager";
import {Job} from "./Job/Job";
import Logger from "./Logger"; import Logger from "./Logger";
import {PduDebugProcessor} from "./PDUProcessor/PduDebugProcessor"; import {DebugPduProcessor} from "./PDUProcessor/DebugPduProcessor";
import {EchoPduProcessor} from "./PDUProcessor/EchoPduProcessor";
import {PduProcessor} from "./PDUProcessor/PduProcessor";
const smpp = require("smpp"); const smpp = require("smpp");
const fs = require("fs"); const fs = require("fs");
@@ -24,14 +28,31 @@ const SERVER_PORT: number = Number(process.env.SERVER_PORT) || 8190;
let logger = new Logger("main"); let logger = new Logger("main");
PduProcessor.addProcessor(DebugPduProcessor);
PduProcessor.addProcessor(EchoPduProcessor);
let clientManager: ClientSessionManager = new ClientSessionManager(); let clientManager: ClientSessionManager = new ClientSessionManager();
clientManager.setup(); let centerManager: CenterSessionManager = new CenterSessionManager();
// let wss: WSServer = new WSServer([clientManager]); // let wss: WSServer = new WSServer([clientManager]);
async function main() { async function main() {
let client: Client = await clientManager.createSession("smpp://localhost:7000", "test", "test") as Client; let client: Client = await clientManager.createSession("smpp://localhost:7000", "test", "test") as Client;
// let client: Client = await clientManager.getSession(0) as Client; let center: Center = await centerManager.createSession(7000, "test", "test") as Center;
let debugJobSingle = new Job(new PDU("submit_sm", {
source_addr: "1234567890",
destination_addr: "1234567890",
short_message: "Hello World"
}));
let debugJobMulti = new Job(new PDU("submit_sm", {
source_addr: "1234567890",
destination_addr: "1234567890",
short_message: "Hello World"
}), 10, 50);
client.setDefaultSingleJob(debugJobSingle);
client.setDefaultMultipleJob(debugJobMulti);
// // client.sendMultipleDefault(); // // client.sendMultipleDefault();
// //
@@ -50,7 +71,6 @@ async function main() {
// }, err => console.log(err)); // }, err => console.log(err));
// }); // });
let center: Center = new Center(0, 7000, "test", "test");
// setInterval(() => { // setInterval(() => {
// client.connectAndBind().then(() => { // client.connectAndBind().then(() => {
// console.log("POGGIES"); // console.log("POGGIES");
@@ -65,6 +85,10 @@ async function main() {
// }), 10, 100)); // }), 10, 100));
// // center.close(); // // center.close();
// }, 5000); // }, 5000);
// client.connectAndBind().then(() => {
// client.sendMultipleDefault();
// });
} }
main(); main();