Split ANY PDU evnt into RX and TX for receive and transmit

This commit is contained in:
David Majdandžić
2023-04-06 18:17:43 +02:00
parent 251658f385
commit 363e3f7fc2
9 changed files with 27 additions and 15 deletions

View File

@@ -84,7 +84,7 @@ export default class Center extends SmppSession {
let pduCopy = new smpp.PDU(pdu.command, {...pdu}); let pduCopy = new smpp.PDU(pdu.command, {...pdu});
let session = this.getNextSession(); let session = this.getNextSession();
this.processors.Preprocessor.forEach((processor: PduProcessor) => processor.processPdu(session, pduCopy)); this.processors.Preprocessor.forEach((processor: PduProcessor) => processor.processPdu(session, pduCopy));
session.send(pduCopy, (replyPdu: any) => { this.doSendPdu(pduCopy, session).then((replyPdu: any) => {
resolve(replyPdu); resolve(replyPdu);
}); });
}); });

View File

@@ -145,7 +145,9 @@ export default class Client extends SmppSession {
let pduCopy = new smpp.PDU(pdu.command, {...pdu}); let pduCopy = new smpp.PDU(pdu.command, {...pdu});
this.processors.Preprocessor.forEach((processor: PduProcessor) => processor.processPdu(this.session, pduCopy)); this.processors.Preprocessor.forEach((processor: PduProcessor) => processor.processPdu(this.session, pduCopy));
this.logger.log5(`Client-${this.id} sending PDU: ${JSON.stringify(pduCopy)}`); this.logger.log5(`Client-${this.id} sending PDU: ${JSON.stringify(pduCopy)}`);
this.session.send(pduCopy, (replyPdu: object) => resolve(replyPdu)); this.doSendPdu(pduCopy, this.session).then((replyPdu: any) => {
resolve(replyPdu);
});
}); });
} }

View File

@@ -20,7 +20,7 @@ export default class BindTranscieverReplyProcessor extends Postprocessor {
if (pdu.system_id === entity?.username && pdu.password === entity?.password) { if (pdu.system_id === entity?.username && pdu.password === entity?.password) {
this.logger.log1(`Center-${entity?.id} client connection successful`); this.logger.log1(`Center-${entity?.id} client connection successful`);
if (pdu.response) { if (pdu.response) {
session.send(pdu.response()); entity?.doSendPdu(pdu.response(), session);
} }
session.resume(); session.resume();
// @ts-ignore // @ts-ignore
@@ -30,9 +30,9 @@ export default class BindTranscieverReplyProcessor extends Postprocessor {
} else { } else {
this.logger.log1(`Center-${entity?.id} client connection failed, invalid credentials (expected: ${entity?.username}, ${entity?.password})`); this.logger.log1(`Center-${entity?.id} client connection failed, invalid credentials (expected: ${entity?.username}, ${entity?.password})`);
if (pdu.response) { if (pdu.response) {
session.send(pdu.response({ entity?.doSendPdu(pdu.response({
command_status: smpp.ESME_RBINDFAIL command_status: smpp.ESME_RBINDFAIL
})); }), session);
} }
// @ts-ignore // @ts-ignore
entity?.pendingSessions = entity?.pendingSessions.filter((s) => s !== session); entity?.pendingSessions = entity?.pendingSessions.filter((s) => s !== session);

View File

@@ -32,7 +32,7 @@ export default class DeliveryReceiptProcessor extends Postprocessor {
short_message: drMessage, short_message: drMessage,
esm_class: 4, esm_class: 4,
}); });
session.send(DRPdu); entity?.doSendPdu(DRPdu, session);
resolve(pdu); resolve(pdu);
} }

View File

@@ -16,7 +16,7 @@ export default class EchoPduProcessor extends Postprocessor {
destination_addr: pdu.source_addr, destination_addr: pdu.source_addr,
short_message: pdu.short_message short_message: pdu.short_message
}); });
session.send(sentPdu); entity?.doSendPdu(sentPdu, session);
resolve(sentPdu); resolve(sentPdu);
} }
}); });

View File

@@ -9,7 +9,7 @@ export default class EnquireLinkReplyProcessor extends Postprocessor {
processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!!pdu.command && pdu.command === 'enquire_link') { if (!!pdu.command && pdu.command === 'enquire_link') {
session.send(pdu.response()); entity?.doSendPdu(pdu.response(), session);
resolve(pdu); resolve(pdu);
} }
}); });

View File

@@ -16,7 +16,7 @@ export default class SubmitSmReplyProcessor extends Postprocessor {
let response = pdu.response(); let response = pdu.response();
response.message_id = this.messageIdIterator++; response.message_id = this.messageIdIterator++;
MessageIdManager.addMessageId(pdu, response.message_id); MessageIdManager.addMessageId(pdu, response.message_id);
session.send(response); entity?.doSendPdu(response, session);
resolve(pdu); resolve(pdu);
} }
}); });

View File

@@ -9,7 +9,7 @@ export default class DeliverSmReplyProcessor extends Postprocessor {
processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> { processPdu(session: any, pdu: any, entity?: SmppSession | undefined): Promise<any> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!!pdu.command && pdu.command === 'deliver_sm') { if (!!pdu.command && pdu.command === 'deliver_sm') {
session.send(pdu.response()); entity?.doSendPdu(pdu.response(), session);
resolve(pdu); resolve(pdu);
} }
}); });

View File

@@ -13,7 +13,8 @@ export default abstract class SmppSession {
readonly EVENT: any = { readonly EVENT: any = {
STATUS_CHANGED: "STATUS_CHANGED", STATUS_CHANGED: "STATUS_CHANGED",
STATE_CHANGED: "STATE_CHANGED", STATE_CHANGED: "STATE_CHANGED",
ANY_PDU: "ANY_PDU", ANY_PDU_TX: "ANY_PDU_TX",
ANY_PDU_RX: "ANY_PDU_RX",
MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT", MESSAGE_SEND_COUNTER_UPDATE_EVENT: "MESSAGE_SEND_COUNTER_UPDATE_EVENT",
}; };
abstract STATUSES: string[]; abstract STATUSES: string[];
@@ -21,7 +22,7 @@ export default abstract class SmppSession {
processors: { [key: string]: PduProcessor[] } = {}; processors: { [key: string]: PduProcessor[] } = {};
readonly UPDATE_WS: string = "UPDATE_WS"; readonly UPDATE_WS: string = "UPDATE_WS";
readonly eventEmitter: EventEmitter = new EventEmitter(); readonly eventEmitter: EventEmitter = new EventEmitter();
readonly logger: Logger = new Logger(`SmppSession`); readonly logger: Logger = new Logger(this.constructor.name);
readonly sendTimer: any = new NanoTimer(); readonly sendTimer: any = new NanoTimer();
readonly counterUpdateTimer: any = new NanoTimer(); readonly counterUpdateTimer: any = new NanoTimer();
readonly MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500; readonly MESSAGE_SEND_UPDATE_DELAY: number = Number(process.env.MESSAGE_SEND_UPDATE_DELAY) || 500;
@@ -29,7 +30,8 @@ export default abstract class SmppSession {
protected constructor() { protected constructor() {
this.eventEmitter.on(this.EVENT.STATE_CHANGED, () => this.updateWs(this.EVENT.STATE_CHANGED)); this.eventEmitter.on(this.EVENT.STATE_CHANGED, () => this.updateWs(this.EVENT.STATE_CHANGED));
this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED)); this.eventEmitter.on(this.EVENT.STATUS_CHANGED, () => this.updateWs(this.EVENT.STATUS_CHANGED));
this.eventEmitter.on(this.EVENT.ANY_PDU, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU, [pdu])); this.eventEmitter.on(this.EVENT.ANY_PDU_TX, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU_TX, [pdu]));
this.eventEmitter.on(this.EVENT.ANY_PDU_RX, (pdu: any) => this.updateWs(this.EVENT.ANY_PDU_RX, [pdu]));
this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count])); this.eventEmitter.on(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, (count: number) => this.updateWs(this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT, [count]));
this.processors[Preprocessor.name] = []; this.processors[Preprocessor.name] = [];
@@ -110,6 +112,13 @@ export default abstract class SmppSession {
abstract sendPdu(pdu: object, force?: boolean): Promise<object>; abstract sendPdu(pdu: object, force?: boolean): Promise<object>;
doSendPdu(pdu: PDU, session: any): Promise<any> {
return new Promise<any>((resolve, reject) => {
this.eventEmitter.emit(this.EVENT.ANY_PDU_TX, pdu);
session.send(pdu, (reply: any) => resolve(reply));
})
}
sendSingle(job: Job): Promise<object> { sendSingle(job: Job): Promise<object> {
return this.sendPdu(job.pdu); return this.sendPdu(job.pdu);
} }
@@ -153,7 +162,8 @@ export default abstract class SmppSession {
case this.EVENT.STATUS_CHANGED: case this.EVENT.STATUS_CHANGED:
message.data = this.status; message.data = this.status;
break; break;
case this.EVENT.ANY_PDU: case this.EVENT.ANY_PDU_RX:
case this.EVENT.ANY_PDU_TX:
message.data = args![0]; message.data = args![0];
break; break;
case this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT: case this.EVENT.MESSAGE_SEND_COUNTER_UPDATE_EVENT:
@@ -185,7 +195,7 @@ export default abstract class SmppSession {
eventAnyPdu(session: any, pdu: PDU): Promise<any> { eventAnyPdu(session: any, pdu: PDU): Promise<any> {
if (!!pdu) { if (!!pdu) {
this.eventEmitter.emit(this.EVENT.ANY_PDU, pdu); this.eventEmitter.emit(this.EVENT.ANY_PDU_RX, pdu);
// console.log("IS PDU TIME"); // console.log("IS PDU TIME");
this.logger.log6(pdu); this.logger.log6(pdu);
this.processors.Postprocessor.forEach((processor: PduProcessor) => processor.processPdu(session, pdu, this)); this.processors.Postprocessor.forEach((processor: PduProcessor) => processor.processPdu(session, pdu, this));