added http delivery api
made delivery process easier and optimized it
This commit is contained in:
parent
87f209a62c
commit
733819ad4e
3 changed files with 28 additions and 18 deletions
6
index.js
6
index.js
|
@ -37,8 +37,14 @@ app.post('/api/message/advanced', async (req, res) => {
|
||||||
await types.MessageManager.Deliver(id)
|
await types.MessageManager.Deliver(id)
|
||||||
return res.json(id)
|
return res.json(id)
|
||||||
})
|
})
|
||||||
|
|
||||||
app.get('/api/message/status/:id', async (req, res) => { //TODO: make this fancy
|
app.get('/api/message/status/:id', async (req, res) => { //TODO: make this fancy
|
||||||
return res.json(types.MessageManager.messages[ req.params.id ])
|
return res.json(types.MessageManager.messages[ req.params.id ])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
app.get('/api/message/ack/recv/:id', async (req, res) => { //TODO: make this fancy
|
||||||
|
types.ConnectorRegistry.reportDelivered({ id: req.params.id }, 'http')
|
||||||
|
return res.json(true)
|
||||||
|
})
|
||||||
|
|
||||||
app.listen(3000)
|
app.listen(3000)
|
|
@ -29,7 +29,7 @@ class ConnectorRegistry {
|
||||||
this.reportState(msg, uuid, 'failed')
|
this.reportState(msg, uuid, 'failed')
|
||||||
}
|
}
|
||||||
reportDelivered(msg, uuid) {
|
reportDelivered(msg, uuid) {
|
||||||
this.events.emit(`msg:status:${ msg.id }:delivered`)
|
this.events.emit(`msg:status:${ msg.id }:delivered`, msg, uuid)
|
||||||
this.reportState(msg, uuid, 'delivered')
|
this.reportState(msg, uuid, 'delivered')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,9 +23,9 @@ class MessageManager {
|
||||||
const hwDuplexSupport = require("./DeviceRegistry").Devices[ routingParams.device ].duplex || false
|
const hwDuplexSupport = require("./DeviceRegistry").Devices[ routingParams.device ].duplex || false
|
||||||
Object.assign(this.messages[ msgObj.id ]._routerData, hwDuplexSupport ? {
|
Object.assign(this.messages[ msgObj.id ]._routerData, hwDuplexSupport ? {
|
||||||
duplexCapable: true,
|
duplexCapable: true,
|
||||||
recvAck: false,
|
recvAck: false, // aka "delivered"
|
||||||
readAck: false,
|
readAck: false, // "read"
|
||||||
response: false,
|
response: false, // "resp"
|
||||||
deliveryLog: {},
|
deliveryLog: {},
|
||||||
} : {
|
} : {
|
||||||
duplexCapable: false,
|
duplexCapable: false,
|
||||||
|
@ -40,6 +40,7 @@ class MessageManager {
|
||||||
}
|
}
|
||||||
async msgStatus(msgId, uuid, status) {
|
async msgStatus(msgId, uuid, status) {
|
||||||
this.messages[ msgId ]._routerData.deliveryLog[ uuid ] = status
|
this.messages[ msgId ]._routerData.deliveryLog[ uuid ] = status
|
||||||
|
if (status === 'delivered') this.messages[ msgId ]._routerData.recvAck = true
|
||||||
//this.Deliver(msgId)
|
//this.Deliver(msgId)
|
||||||
console.log(msgId, uuid, 'status is', status)
|
console.log(msgId, uuid, 'status is', status)
|
||||||
}
|
}
|
||||||
|
@ -65,7 +66,6 @@ class MessageManager {
|
||||||
connectorConfig = config.connectors[connectorName]
|
connectorConfig = config.connectors[connectorName]
|
||||||
//const UUID = md5(JSON.stringify(connectorDeliveryTry))
|
//const UUID = md5(JSON.stringify(connectorDeliveryTry))
|
||||||
const chainPromise = (res) => {
|
const chainPromise = (res) => {
|
||||||
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:delivered`)
|
|
||||||
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`)
|
ConnectorRegistry.events.removeAllListeners(`msg:status:${ msgId }:failed`)
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
res([false, 'timeout'])
|
res([false, 'timeout'])
|
||||||
|
@ -74,24 +74,28 @@ class MessageManager {
|
||||||
console.log(`${ msgId } failed via ${ connectorName }:${ connectorArgs.join(',') }, continuing...`)
|
console.log(`${ msgId } failed via ${ connectorName }:${ connectorArgs.join(',') }, continuing...`)
|
||||||
res([false, 'failed'])
|
res([false, 'failed'])
|
||||||
})
|
})
|
||||||
ConnectorRegistry.events.once(`msg:status:${ msgId }:delivered`, () => {
|
|
||||||
console.log(`${ msgId } delivered via ${ connectorName }:${ connectorArgs.join(',') }`)
|
|
||||||
res([true, 'delivered'])
|
|
||||||
})
|
|
||||||
console.log(`Trying to deliver msg#${ msg.id } with ${ connectorName }:${ connectorArgs.join(',') }`)
|
console.log(`Trying to deliver msg#${ msg.id } with ${ connectorName }:${ connectorArgs.join(',') }`)
|
||||||
//console.log(this.messages[ msgId ].deliveryLog)
|
|
||||||
ConnectorRegistry.transmit(connectorName, msg, connectorArgs)
|
ConnectorRegistry.transmit(connectorName, msg, connectorArgs)
|
||||||
}
|
}
|
||||||
return chainPromise
|
return chainPromise
|
||||||
})
|
})
|
||||||
new Promise(async (res, rej) => {
|
Promise.race([
|
||||||
for(let deliveryFunction of deliveryChain) {
|
new Promise(res => { // Delivery Event for this message
|
||||||
let result = await new Promise(deliveryFunction)
|
ConnectorRegistry.events.once(`msg:status:${ msgId }:delivered`, (_, uuid) => {
|
||||||
if (result[0] === true) { res(result); break; }
|
console.log(`${ msgId } delivered via ${ uuid }`)
|
||||||
// TODO: handle case, when a different verification channel is used for ACK
|
return res()
|
||||||
}
|
})
|
||||||
rej()
|
}),
|
||||||
})
|
new Promise(async (res, rej) => {
|
||||||
|
for(let deliveryFunction of deliveryChain) {
|
||||||
|
//when a different verification channel is used for ACK
|
||||||
|
if (this.messages[ msgId ]._routerData.recvAck === true) break;
|
||||||
|
let result = await new Promise(deliveryFunction)
|
||||||
|
if (result[0] === true) { res(); break; }
|
||||||
|
}
|
||||||
|
rej()
|
||||||
|
})
|
||||||
|
])
|
||||||
.then(($) => {
|
.then(($) => {
|
||||||
this._clearEventHandlers4MsgID(msgId)
|
this._clearEventHandlers4MsgID(msgId)
|
||||||
const dLog = this.messages[ msgId ]._routerData.deliveryLog
|
const dLog = this.messages[ msgId ]._routerData.deliveryLog
|
||||||
|
|
Loading…
Add table
Reference in a new issue