improved structure and message routing, aswell adding a Dummy Connector for testing Duplex

master
cheetah 4 years ago
parent 9e2a0e83f4
commit 09b07297dd

@ -6,8 +6,13 @@
"port": 3000 "port": 3000
}, },
"connectors": { "connectors": {
"dummy": {
"enabled": true,
"duplexTimeout": 30
},
"pocsag": { "pocsag": {
"enabled": true "enabled": true,
"duplexTimeout": 30
}, },
"lorawan": { "lorawan": {
"enabled": false "enabled": false
@ -15,5 +20,15 @@
"dapnet": { "dapnet": {
"enabled": false "enabled": false
} }
},
"pagers": {
"birdyslim": {
"enabled": true,
"formatRecvAck": "|5||S||V||G||D||C|>A",
"formatReadAck": "|5||S||V||G||D||C|>B",
"formatOperAck": "|5||S||V||G||D||C|$OP",
"formatStatus": "|S||V||G||D||C|",
"formatSOS": "|S||V||G||D||C|"
}
} }
} }

@ -17,6 +17,7 @@ if (!!config.connectors.lorawan && config.connectors.lorawan.enabled === true) {
if (!!config.connectors.dapnet && config.connectors.dapnet.enabled === true) { if (!!config.connectors.dapnet && config.connectors.dapnet.enabled === true) {
types.ConnectorRegistry.register(new types.Connectors.DAPNETConnector()) types.ConnectorRegistry.register(new types.Connectors.DAPNETConnector())
} }
types.ConnectorRegistry.register(new types.Connectors.DummyConnector())
types.DeviceRegistry.register(new types.devices.GenericPager()) types.DeviceRegistry.register(new types.devices.GenericPager())
types.DeviceRegistry.register(new types.devices.BirdySlim()) types.DeviceRegistry.register(new types.devices.BirdySlim())

@ -21,6 +21,7 @@
"amqp-connection-manager": "^3.2.2", "amqp-connection-manager": "^3.2.2",
"amqplib": "^0.7.0", "amqplib": "^0.7.0",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"express": "^4.17.1" "express": "^4.17.1",
"md5": "^2.3.0"
} }
} }

@ -1,20 +1,40 @@
const { POCSAGConnector } = require("./connectors") const events = require('events')
const md5 = require('md5')
class ConnectorRegistry { class ConnectorRegistry {
constructor() { constructor() {
this.Connectors = {} this.Connectors = {}
this.events = new events.EventEmitter()
this.events.on('ping', (x) => console.log('connector event "ping" from', x))
} }
register(connector) { register(connector) {
this.Connectors[ connector.name ] = connector this.Connectors[ connector.name ] = connector
connector.Hook(this)
} }
transmit(name, msg, params) { transmit(name, msg, params) {
if (!this.Connectors[ name ]) throw "not registred" if (!this.Connectors[ name ]) throw "not registred"
//md5(JSON.stringify([name, ...params])),
this.Connectors[ name ].transmitMessage(msg, params) this.Connectors[ name ].transmitMessage(msg, params)
} }
supportDuplex(name) { supportDuplex(name) {
if (!this.Connectors[ name ]) throw "not registred" if (!this.Connectors[ name ]) throw "not registred"
return this.Connectors[ name ].duplexCapable return this.Connectors[ name ].duplexCapable
} }
reportState(msg, uuid, state) {
this.events.emit(`msg:status`, msg.id, uuid, state)
}
reportFail(msg, uuid) {
this.events.emit(`msg:status:${ msg.id }:failed`)
this.reportState(msg, uuid, 'failed')
}
reportDelivered(msg, uuid) {
this.events.emit(`msg:status:${ msg.id }:delivered`)
this.reportState(msg, uuid, 'delivered')
}
//W.I.P
receive(id, data) { receive(id, data) {
} }
} }

@ -1,8 +1,11 @@
const ConnectorRegistry = require("./ConnectorRegistry") const ConnectorRegistry = require("./ConnectorRegistry")
const config = require('../config.json')
const md5 = require('md5')
class MessageManager { class MessageManager {
constructor() { constructor() {
this.messages = {} this.messages = {}
ConnectorRegistry.events.on('msg:status', this.msgStatus.bind(this))
} }
async New(type, routingParams, payload) { async New(type, routingParams, payload) {
if (!routingParams.device) { if (!routingParams.device) {
@ -15,37 +18,126 @@ class MessageManager {
} }
await require("./DeviceRegistry").Devices[ routingParams.device ].formatTX(msgObj) await require("./DeviceRegistry").Devices[ routingParams.device ].formatTX(msgObj)
// console.log('finished msg obj is ', msgObj) // console.log('finished msg obj is ', msgObj)
this.messages[ msgObj.id ]._routerData = this.messages[ msgObj.id ]._routerData || { this.messages[ msgObj.id ]._routerData = this.messages[ msgObj.id ]._routerData || {}
const hwDuplexSupport = require("./DeviceRegistry").Devices[ routingParams.device ].duplex || false
Object.assign(this.messages[ msgObj.id ]._routerData, hwDuplexSupport ? {
duplexCapable: true,
recvAck: false, recvAck: false,
readAck: false, readAck: false,
response: false, response: false,
} deliveryLog: {},
} : {
duplexCapable: false,
deliveryLog: {},
})
console.log('finished msg obj is ', msgObj) console.log('finished msg obj is ', msgObj)
return msgObj.id return msgObj.id
} }
async msgStatus(msgId, uuid, status) {
this.messages[ msgId ]._routerData.deliveryLog[ uuid ] = status
//this.Deliver(msgId)
console.log(msgId, uuid, 'status is', status)
}
async Deliver(msgId) { async Deliver(msgId) {
if (this.messages[ msgId ].type === 'duplex')
this.DeliverDuplex(msgId)
else
this.DeliverOneWay(msgId)
}
_clearEventHandlers4MsgID(msgId) {
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:delivered`)
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`)
}
async DeliverDuplex(msgId) {
const msg = this.messages[ msgId ]
if (!msg._routerData) throw `No Routerdata attached to msg with id ${ msgId }`
if (!!msg.locked) throw 'message is locked'
console.log(msg.routingParams.connectors)
let deliveryChain = msg.routingParams.connectors.map((connectorDeliveryTry) => {
const connectorName = connectorDeliveryTry[0],
connectorArgs = connectorDeliveryTry.slice(1),
connectorConfig = config.connectors[connectorName]
//const UUID = md5(JSON.stringify(connectorDeliveryTry))
const chainPromise = (res) => {
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:delivered`)
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`)
setTimeout(() => {
res([false, 'timeout'])
}, !!connectorConfig && !!connectorConfig.duplexTimeout ? connectorConfig.duplexTimeout*1e3 : 30e3)
ConnectorRegistry.events.once(`msg:status:${ msgId }:failed`, () => {
console.log(`${ msgId } failed, continuing`)
res([false, 'failed'])
})
ConnectorRegistry.events.once(`msg:status:${ msgId }:delivered`, () => {
console.log(`${ msgId } delivered`)
res([true, 'delivered'])
})
console.log(`Trying to deliver msg#${ msg.id } with ${ JSON.stringify(connectorDeliveryTry) }`)
console.log(this.messages[ msgId ].deliveryLog)
ConnectorRegistry.transmit(connectorName, msg, connectorArgs)
}
return chainPromise
})
new Promise(async (res, rej) => {
for(let deliveryFunction of deliveryChain) {
let result = await new Promise(deliveryFunction)
if (result[0] === true) { res(result); break; }
// TODO: handle case, when a different verification channel is used for ACK
}
rej()
})
.then(($) => {
this._clearEventHandlers4MsgID(msgId)
console.log('DELIVERY WAS A SUCCESS', $, this.messages[ msgId ]._routerData.deliveryLog)
})
.catch(($) => {
this._clearEventHandlers4MsgID(msgId)
console.log('DELIVERY WAS A TOTAL FAILURE , FUCK THIS PLANET', $)
})
return true
}
async DeliverOneWay(msgId) {
const msg = this.messages[ msgId ] const msg = this.messages[ msgId ]
// Throw error, because wtf
if (!msg._routerData) throw `No Routerdata attached to msg with id ${ msgId }` if (!msg._routerData) throw `No Routerdata attached to msg with id ${ msgId }`
// attach Start Delivery Unix Timestamp if not already there if (!!msg.locked) throw 'message is locked'
if (!msg._routerData.startDelivery) msg._routerData.startDelivery = Math.floor(new Date().valueOf()/1e3)
if (!msg._routerData.deliveryLog) msg._routerData.deliveryLog = []
let logLength = msg._routerData.deliveryLog.length
if (logLength < msg.routingParams.connectors.length) {
let deliveryConnector = msg.routingParams.connectors[ logLength ]
console.log('delivering with ', deliveryConnector)
const connectorName = deliveryConnector[0]
await ConnectorRegistry.transmit(connectorName, msg, deliveryConnector.slice(1))
msg._routerData.deliveryLog.push(deliveryConnector)
if (msg._routerData.duplexCapable === true || ConnectorRegistry.supportDuplex(connectorName) === true) {
// we support duplex on connector AND the hardware, lets use it
} else { // if we dont support duplex on the pager or on the connector, we just keep delivering //console.log(msg.routingParams.connectors)
await this.Deliver(msgId) let deliveryChain = msg.routingParams.connectors.map((connectorDeliveryTry) => {
const connectorName = connectorDeliveryTry[0],
connectorArgs = connectorDeliveryTry.slice(1),
connectorConfig = config.connectors[connectorName]
const chainPromise = (res, rej) => {
this._clearEventHandlers4MsgID(msgId)
setTimeout(() => {
res([false, 'timeout'])
}, !!connectorConfig && !!connectorConfig.simplexTimeout ? connectorConfig.simplexTimeout*1e3 : 2e3)
ConnectorRegistry.events.once(`msg:status:${ msgId }:failed`, () => {
console.log(`${ msgId } failed, continuing`)
res([false, 'failed'])
})
console.log(`Trying to deliver msg#${ msg.id } with ${ JSON.stringify(connectorDeliveryTry) }`)
//console.log(this.messages[ msgId ].deliveryLog)
ConnectorRegistry.transmit(connectorName, msg, connectorArgs)
} }
// msg._routerData.duplexCapable // if PagerHardware supports Duplex return chainPromise
// ConnectorRegistry.supportDuplex(connectorName) // if Connector supports Duplex })
//console.log(deliveryChain)
new Promise(async (res, rej) => {
for(let deliveryFunction of deliveryChain) {
let result = await new Promise(deliveryFunction)
if (result[0] === true) { res(result); break; }
} }
rej()
})
.then(($) => {
this._clearEventHandlers4MsgID(msgId)
console.log('ROUTING WAS A SUCCESS', $, this.messages[ msgId ]._routerData.deliveryLog)
})
.catch(($) => {
this._clearEventHandlers4MsgID(msgId)
console.log('ROUTING WAS A TOTAL FAILURE , FUCK THIS PLANET', $)
})
} }
async BindMsg(msg) { async BindMsg(msg) {
this.messages[ msg.id ] = msg this.messages[ msg.id ] = msg

@ -4,7 +4,11 @@ class Connector {
this.name = "_base" this.name = "_base"
this.duplexCapable = false this.duplexCapable = false
} }
async transmitMessage(msg, params) { Hook (connectorRegistry) {
this.connectorRegistry = connectorRegistry
this.connectorRegistry.events.emit('ping', this.name)
}
async transmitMessage(uuid, msg, params) {
throw "not implemented" throw "not implemented"
} }
} }

@ -0,0 +1,37 @@
const Connector = require("./Connector")
const md5 = require('md5')
class DummyConnector extends Connector {
constructor (amqpConnMngr) {
super(amqpConnMngr)
this.name = "dummy"
this.duplexCapable = true
}
async test(msg, UUID) {
let msgId = msg.payload.substr(0, 5)
await new Promise((res)=>setTimeout(res,3e3))
// set Routed
this.connectorRegistry.reportState(msg, UUID, 'routed')
await new Promise((res)=>setTimeout(res,6e3))
// Pager Configuration Response Format "|5||S||V||G||D||C"
let virtGPS = "+001.38207+43.7717105"
let virtSerial = "1337"
let virtRSSI = "22"
let virtDate = "01/01/2024"
let virtBattery = 0x28.toString(16)
this.connectorRegistry.events.emit('response', `${ msgId }|${ virtSerial }|${ virtBattery }|${ virtGPS }|${ virtDate }|${ virtRSSI }`)
// this will happen somewhere else, down the response processing chain
this.connectorRegistry.events.emit(`msg:status:${ msgId }:delivered`)
this.connectorRegistry.reportDelivered(msg, UUID) // cheating lol
}
async transmitMessage(msg, params) { // lets pretend to be a Birdy Slim
const UUID = this.name+':'+md5(JSON.stringify([ this.name, ...params ])) // uuid=name+hash of name+args
if (params[0] == 'fail') {
await new Promise((res)=>setTimeout(res,3e3))
this.connectorRegistry.reportFail(msg, UUID)
} else {
this.test(msg, UUID)
}
}
}
module.exports = DummyConnector

@ -1,4 +1,6 @@
const { uuid } = require("@supercharge/strings/dist")
const Connector = require("./Connector") const Connector = require("./Connector")
const md5 = require('md5')
class POCSAGConnector extends Connector { class POCSAGConnector extends Connector {
constructor (amqpConnMngr) { constructor (amqpConnMngr) {
@ -13,6 +15,7 @@ class POCSAGConnector extends Connector {
}) })
} }
async transmitMessage(msg, params) { async transmitMessage(msg, params) {
const UUID = this.name+':'+md5(JSON.stringify([this.name,...params]))
if (params.length < 1) return false if (params.length < 1) return false
const RIC = params[0] const RIC = params[0]
const lastChar = RIC[RIC.length - 1].charCodeAt(0) - 65 const lastChar = RIC[RIC.length - 1].charCodeAt(0) - 65
@ -33,9 +36,11 @@ class POCSAGConnector extends Connector {
this.channelWrapper.sendToQueue('tx_pocsag', Buffer.from(msg.payload), { this.channelWrapper.sendToQueue('tx_pocsag', Buffer.from(msg.payload), {
headers headers
}) })
.then(function() { .then(() => {
this.connectorRegistry.reportState(msg, UUID, 'routed')
return true return true
}).catch(function(err) { }).catch((err) => {
this.connectorRegistry.reportFail(msg, UUID)
return false return false
}) })
} }

@ -2,5 +2,6 @@ module.exports = {
DAPNETConnector: require("./DAPNETConnector"), DAPNETConnector: require("./DAPNETConnector"),
LoRaWANConnector: require("./LoRaWANConnector"), LoRaWANConnector: require("./LoRaWANConnector"),
POCSAGConnector: require("./POCSAGConnector"), POCSAGConnector: require("./POCSAGConnector"),
DummyConnector: require("./DummyConnector"),
Connector: require("./Connector"), Connector: require("./Connector"),
} }

@ -1,3 +1,4 @@
const MessageManager = require("../MessageManager")
const PagerDevice = require("./Device") const PagerDevice = require("./Device")
const Str = require('@supercharge/strings') const Str = require('@supercharge/strings')
@ -14,10 +15,7 @@ class BirdySlim extends PagerDevice {
async formatTX(msg) { async formatTX(msg) {
msg.id = this.RandID() msg.id = this.RandID()
await MessageManager.BindMsg(msg) await MessageManager.BindMsg(msg)
msg.payload = `${ msg.id }${ msg.payload }` msg.payload = msg.type === 'duplex' ? `${ msg.id }${ msg.payload }` : msg.payload // only if duplex wanted we add the id
msg._routerData = {
duplexCapable: true,
}
} }
} }
module.exports = BirdySlim module.exports = BirdySlim

@ -14,9 +14,6 @@ class GenericPager extends PagerDevice {
async formatTX(msg) { async formatTX(msg) {
msg.id = this.RandID() msg.id = this.RandID()
await MessageManager.BindMsg(msg) await MessageManager.BindMsg(msg)
msg._routerData = {
duplexCapable: false,
}
// return msg // return msg
} }
} }

@ -17,9 +17,6 @@ class Skyper extends PagerDevice {
async formatTX(msg) { async formatTX(msg) {
msg.id = this.RandID() msg.id = this.RandID()
await MessageManager.BindMsg(msg) await MessageManager.BindMsg(msg)
msg._routerData = {
duplexCapable: false,
}
if (msg.routingParams.skyperNetwork === true) { if (msg.routingParams.skyperNetwork === true) {
let newPayload = msg.payload.substring(0, 3) let newPayload = msg.payload.substring(0, 3)
for(let chr of msg.payload.substring(3).split('')) { for(let chr of msg.payload.substring(3).split('')) {

Loading…
Cancel
Save