From 09b07297dd1489ead04a9ed601b38e9fc075d38d Mon Sep 17 00:00:00 2001 From: cheetah Date: Tue, 23 Mar 2021 16:49:16 +0000 Subject: [PATCH] improved structure and message routing, aswell adding a Dummy Connector for testing Duplex --- config.json | 17 +++- index.js | 1 + package.json | 3 +- types/ConnectorRegistry.js | 22 ++++- types/MessageManager.js | 134 +++++++++++++++++++++++----- types/connectors/Connector.js | 6 +- types/connectors/DummyConnector.js | 37 ++++++++ types/connectors/POCSAGConnector.js | 9 +- types/connectors/index.js | 1 + types/devices/BirdySlim.js | 8 +- types/devices/GenericPager.js | 3 - types/devices/Skyper.js | 3 - 12 files changed, 206 insertions(+), 38 deletions(-) create mode 100644 types/connectors/DummyConnector.js diff --git a/config.json b/config.json index e160fb4..11aad62 100644 --- a/config.json +++ b/config.json @@ -6,8 +6,13 @@ "port": 3000 }, "connectors": { + "dummy": { + "enabled": true, + "duplexTimeout": 30 + }, "pocsag": { - "enabled": true + "enabled": true, + "duplexTimeout": 30 }, "lorawan": { "enabled": false @@ -15,5 +20,15 @@ "dapnet": { "enabled": false } + }, + "pagers": { + "birdyslim": { + "enabled": true, + "formatRecvAck": "|5||S||V||G||D||C|>A", + "formatReadAck": "|5||S||V||G||D||C|>B", + "formatOperAck": "|5||S||V||G||D||C|$OP", + "formatStatus": "|S||V||G||D||C|", + "formatSOS": "|S||V||G||D||C|" + } } } \ No newline at end of file diff --git a/index.js b/index.js index f243721..baed2fb 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,7 @@ if (!!config.connectors.lorawan && config.connectors.lorawan.enabled === true) { if (!!config.connectors.dapnet && config.connectors.dapnet.enabled === true) { types.ConnectorRegistry.register(new types.Connectors.DAPNETConnector()) } +types.ConnectorRegistry.register(new types.Connectors.DummyConnector()) types.DeviceRegistry.register(new types.devices.GenericPager()) types.DeviceRegistry.register(new types.devices.BirdySlim()) diff --git a/package.json b/package.json index eeb7422..2c1a455 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "amqp-connection-manager": "^3.2.2", "amqplib": "^0.7.0", "body-parser": "^1.19.0", - "express": "^4.17.1" + "express": "^4.17.1", + "md5": "^2.3.0" } } diff --git a/types/ConnectorRegistry.js b/types/ConnectorRegistry.js index 978fd98..cb9d603 100644 --- a/types/ConnectorRegistry.js +++ b/types/ConnectorRegistry.js @@ -1,20 +1,40 @@ -const { POCSAGConnector } = require("./connectors") +const events = require('events') +const md5 = require('md5') class ConnectorRegistry { constructor() { this.Connectors = {} + this.events = new events.EventEmitter() + this.events.on('ping', (x) => console.log('connector event "ping" from', x)) } register(connector) { this.Connectors[ connector.name ] = connector + connector.Hook(this) } transmit(name, msg, params) { if (!this.Connectors[ name ]) throw "not registred" + //md5(JSON.stringify([name, ...params])), this.Connectors[ name ].transmitMessage(msg, params) } supportDuplex(name) { if (!this.Connectors[ name ]) throw "not registred" return this.Connectors[ name ].duplexCapable } + + reportState(msg, uuid, state) { + this.events.emit(`msg:status`, msg.id, uuid, state) + } + reportFail(msg, uuid) { + this.events.emit(`msg:status:${ msg.id }:failed`) + this.reportState(msg, uuid, 'failed') + } + reportDelivered(msg, uuid) { + this.events.emit(`msg:status:${ msg.id }:delivered`) + this.reportState(msg, uuid, 'delivered') + } + + + //W.I.P receive(id, data) { } } diff --git a/types/MessageManager.js b/types/MessageManager.js index 246873a..c944e78 100644 --- a/types/MessageManager.js +++ b/types/MessageManager.js @@ -1,8 +1,11 @@ const ConnectorRegistry = require("./ConnectorRegistry") +const config = require('../config.json') +const md5 = require('md5') class MessageManager { constructor() { this.messages = {} + ConnectorRegistry.events.on('msg:status', this.msgStatus.bind(this)) } async New(type, routingParams, payload) { if (!routingParams.device) { @@ -15,37 +18,126 @@ class MessageManager { } await require("./DeviceRegistry").Devices[ routingParams.device ].formatTX(msgObj) // console.log('finished msg obj is ', msgObj) - this.messages[ msgObj.id ]._routerData = this.messages[ msgObj.id ]._routerData || { + this.messages[ msgObj.id ]._routerData = this.messages[ msgObj.id ]._routerData || {} + const hwDuplexSupport = require("./DeviceRegistry").Devices[ routingParams.device ].duplex || false + Object.assign(this.messages[ msgObj.id ]._routerData, hwDuplexSupport ? { + duplexCapable: true, recvAck: false, readAck: false, response: false, - } + deliveryLog: {}, + } : { + duplexCapable: false, + deliveryLog: {}, + }) console.log('finished msg obj is ', msgObj) return msgObj.id } + async msgStatus(msgId, uuid, status) { + this.messages[ msgId ]._routerData.deliveryLog[ uuid ] = status + //this.Deliver(msgId) + console.log(msgId, uuid, 'status is', status) + } async Deliver(msgId) { + if (this.messages[ msgId ].type === 'duplex') + this.DeliverDuplex(msgId) + else + this.DeliverOneWay(msgId) + } + _clearEventHandlers4MsgID(msgId) { + ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:delivered`) + ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`) + } + async DeliverDuplex(msgId) { const msg = this.messages[ msgId ] - // Throw error, because wtf if (!msg._routerData) throw `No Routerdata attached to msg with id ${ msgId }` - // attach Start Delivery Unix Timestamp if not already there - if (!msg._routerData.startDelivery) msg._routerData.startDelivery = Math.floor(new Date().valueOf()/1e3) - if (!msg._routerData.deliveryLog) msg._routerData.deliveryLog = [] - let logLength = msg._routerData.deliveryLog.length - if (logLength < msg.routingParams.connectors.length) { - let deliveryConnector = msg.routingParams.connectors[ logLength ] - console.log('delivering with ', deliveryConnector) - const connectorName = deliveryConnector[0] - await ConnectorRegistry.transmit(connectorName, msg, deliveryConnector.slice(1)) - msg._routerData.deliveryLog.push(deliveryConnector) - if (msg._routerData.duplexCapable === true || ConnectorRegistry.supportDuplex(connectorName) === true) { - // we support duplex on connector AND the hardware, lets use it - - } else { // if we dont support duplex on the pager or on the connector, we just keep delivering - await this.Deliver(msgId) + if (!!msg.locked) throw 'message is locked' + + console.log(msg.routingParams.connectors) + let deliveryChain = msg.routingParams.connectors.map((connectorDeliveryTry) => { + const connectorName = connectorDeliveryTry[0], + connectorArgs = connectorDeliveryTry.slice(1), + connectorConfig = config.connectors[connectorName] + //const UUID = md5(JSON.stringify(connectorDeliveryTry)) + const chainPromise = (res) => { + ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:delivered`) + ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`) + setTimeout(() => { + res([false, 'timeout']) + }, !!connectorConfig && !!connectorConfig.duplexTimeout ? connectorConfig.duplexTimeout*1e3 : 30e3) + ConnectorRegistry.events.once(`msg:status:${ msgId }:failed`, () => { + console.log(`${ msgId } failed, continuing`) + res([false, 'failed']) + }) + ConnectorRegistry.events.once(`msg:status:${ msgId }:delivered`, () => { + console.log(`${ msgId } delivered`) + res([true, 'delivered']) + }) + console.log(`Trying to deliver msg#${ msg.id } with ${ JSON.stringify(connectorDeliveryTry) }`) + console.log(this.messages[ msgId ].deliveryLog) + ConnectorRegistry.transmit(connectorName, msg, connectorArgs) } - // msg._routerData.duplexCapable // if PagerHardware supports Duplex - // ConnectorRegistry.supportDuplex(connectorName) // if Connector supports Duplex - } + return chainPromise + }) + new Promise(async (res, rej) => { + for(let deliveryFunction of deliveryChain) { + let result = await new Promise(deliveryFunction) + if (result[0] === true) { res(result); break; } + // TODO: handle case, when a different verification channel is used for ACK + } + rej() + }) + .then(($) => { + this._clearEventHandlers4MsgID(msgId) + console.log('DELIVERY WAS A SUCCESS', $, this.messages[ msgId ]._routerData.deliveryLog) + }) + .catch(($) => { + this._clearEventHandlers4MsgID(msgId) + console.log('DELIVERY WAS A TOTAL FAILURE , FUCK THIS PLANET', $) + }) + return true + } + async DeliverOneWay(msgId) { + const msg = this.messages[ msgId ] + if (!msg._routerData) throw `No Routerdata attached to msg with id ${ msgId }` + if (!!msg.locked) throw 'message is locked' + + //console.log(msg.routingParams.connectors) + let deliveryChain = msg.routingParams.connectors.map((connectorDeliveryTry) => { + const connectorName = connectorDeliveryTry[0], + connectorArgs = connectorDeliveryTry.slice(1), + connectorConfig = config.connectors[connectorName] + const chainPromise = (res, rej) => { + this._clearEventHandlers4MsgID(msgId) + setTimeout(() => { + res([false, 'timeout']) + }, !!connectorConfig && !!connectorConfig.simplexTimeout ? connectorConfig.simplexTimeout*1e3 : 2e3) + ConnectorRegistry.events.once(`msg:status:${ msgId }:failed`, () => { + console.log(`${ msgId } failed, continuing`) + res([false, 'failed']) + }) + console.log(`Trying to deliver msg#${ msg.id } with ${ JSON.stringify(connectorDeliveryTry) }`) + //console.log(this.messages[ msgId ].deliveryLog) + ConnectorRegistry.transmit(connectorName, msg, connectorArgs) + } + return chainPromise + }) + //console.log(deliveryChain) + new Promise(async (res, rej) => { + for(let deliveryFunction of deliveryChain) { + let result = await new Promise(deliveryFunction) + if (result[0] === true) { res(result); break; } + } + rej() + }) + .then(($) => { + this._clearEventHandlers4MsgID(msgId) + console.log('ROUTING WAS A SUCCESS', $, this.messages[ msgId ]._routerData.deliveryLog) + }) + .catch(($) => { + this._clearEventHandlers4MsgID(msgId) + console.log('ROUTING WAS A TOTAL FAILURE , FUCK THIS PLANET', $) + }) } async BindMsg(msg) { this.messages[ msg.id ] = msg diff --git a/types/connectors/Connector.js b/types/connectors/Connector.js index 61d7bdf..b107d75 100644 --- a/types/connectors/Connector.js +++ b/types/connectors/Connector.js @@ -4,7 +4,11 @@ class Connector { this.name = "_base" this.duplexCapable = false } - async transmitMessage(msg, params) { + Hook (connectorRegistry) { + this.connectorRegistry = connectorRegistry + this.connectorRegistry.events.emit('ping', this.name) + } + async transmitMessage(uuid, msg, params) { throw "not implemented" } } diff --git a/types/connectors/DummyConnector.js b/types/connectors/DummyConnector.js new file mode 100644 index 0000000..422cb8e --- /dev/null +++ b/types/connectors/DummyConnector.js @@ -0,0 +1,37 @@ +const Connector = require("./Connector") +const md5 = require('md5') + +class DummyConnector extends Connector { + constructor (amqpConnMngr) { + super(amqpConnMngr) + this.name = "dummy" + this.duplexCapable = true + } + async test(msg, UUID) { + let msgId = msg.payload.substr(0, 5) + await new Promise((res)=>setTimeout(res,3e3)) + // set Routed + this.connectorRegistry.reportState(msg, UUID, 'routed') + await new Promise((res)=>setTimeout(res,6e3)) + // Pager Configuration Response Format "|5||S||V||G||D||C" + let virtGPS = "+001.38207+43.7717105" + let virtSerial = "1337" + let virtRSSI = "22" + let virtDate = "01/01/2024" + let virtBattery = 0x28.toString(16) + this.connectorRegistry.events.emit('response', `${ msgId }|${ virtSerial }|${ virtBattery }|${ virtGPS }|${ virtDate }|${ virtRSSI }`) + // this will happen somewhere else, down the response processing chain + this.connectorRegistry.events.emit(`msg:status:${ msgId }:delivered`) + this.connectorRegistry.reportDelivered(msg, UUID) // cheating lol + } + async transmitMessage(msg, params) { // lets pretend to be a Birdy Slim + const UUID = this.name+':'+md5(JSON.stringify([ this.name, ...params ])) // uuid=name+hash of name+args + if (params[0] == 'fail') { + await new Promise((res)=>setTimeout(res,3e3)) + this.connectorRegistry.reportFail(msg, UUID) + } else { + this.test(msg, UUID) + } + } +} +module.exports = DummyConnector \ No newline at end of file diff --git a/types/connectors/POCSAGConnector.js b/types/connectors/POCSAGConnector.js index 4d64db3..a50bad0 100644 --- a/types/connectors/POCSAGConnector.js +++ b/types/connectors/POCSAGConnector.js @@ -1,4 +1,6 @@ +const { uuid } = require("@supercharge/strings/dist") const Connector = require("./Connector") +const md5 = require('md5') class POCSAGConnector extends Connector { constructor (amqpConnMngr) { @@ -13,6 +15,7 @@ class POCSAGConnector extends Connector { }) } async transmitMessage(msg, params) { + const UUID = this.name+':'+md5(JSON.stringify([this.name,...params])) if (params.length < 1) return false const RIC = params[0] const lastChar = RIC[RIC.length - 1].charCodeAt(0) - 65 @@ -33,9 +36,11 @@ class POCSAGConnector extends Connector { this.channelWrapper.sendToQueue('tx_pocsag', Buffer.from(msg.payload), { headers }) - .then(function() { + .then(() => { + this.connectorRegistry.reportState(msg, UUID, 'routed') return true - }).catch(function(err) { + }).catch((err) => { + this.connectorRegistry.reportFail(msg, UUID) return false }) } diff --git a/types/connectors/index.js b/types/connectors/index.js index 2f6ab3d..eb943f3 100644 --- a/types/connectors/index.js +++ b/types/connectors/index.js @@ -2,5 +2,6 @@ module.exports = { DAPNETConnector: require("./DAPNETConnector"), LoRaWANConnector: require("./LoRaWANConnector"), POCSAGConnector: require("./POCSAGConnector"), + DummyConnector: require("./DummyConnector"), Connector: require("./Connector"), } \ No newline at end of file diff --git a/types/devices/BirdySlim.js b/types/devices/BirdySlim.js index 4690a64..8b0d9bc 100644 --- a/types/devices/BirdySlim.js +++ b/types/devices/BirdySlim.js @@ -1,3 +1,4 @@ +const MessageManager = require("../MessageManager") const PagerDevice = require("./Device") const Str = require('@supercharge/strings') @@ -7,17 +8,14 @@ class BirdySlim extends PagerDevice { super() this.duplex = true this.name = "birdyslim" - } + } RandID() { return `B${ Str.random(4) }` } async formatTX(msg) { msg.id = this.RandID() await MessageManager.BindMsg(msg) - msg.payload = `${ msg.id }${ msg.payload }` - msg._routerData = { - duplexCapable: true, - } + msg.payload = msg.type === 'duplex' ? `${ msg.id }${ msg.payload }` : msg.payload // only if duplex wanted we add the id } } module.exports = BirdySlim \ No newline at end of file diff --git a/types/devices/GenericPager.js b/types/devices/GenericPager.js index ab1ba08..b51f365 100644 --- a/types/devices/GenericPager.js +++ b/types/devices/GenericPager.js @@ -14,9 +14,6 @@ class GenericPager extends PagerDevice { async formatTX(msg) { msg.id = this.RandID() await MessageManager.BindMsg(msg) - msg._routerData = { - duplexCapable: false, - } // return msg } } diff --git a/types/devices/Skyper.js b/types/devices/Skyper.js index 2aeda06..99d4d53 100644 --- a/types/devices/Skyper.js +++ b/types/devices/Skyper.js @@ -17,9 +17,6 @@ class Skyper extends PagerDevice { async formatTX(msg) { msg.id = this.RandID() await MessageManager.BindMsg(msg) - msg._routerData = { - duplexCapable: false, - } if (msg.routingParams.skyperNetwork === true) { let newPayload = msg.payload.substring(0, 3) for(let chr of msg.payload.substring(3).split('')) {