You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

207 lines
5.4 KiB
Go

package main
import (
"fmt"
"time"
"sync"
"log"
"github.com/RPIZeroDuplexPOCSAG/go-pocsagencode"
// "github.com/kgolding/go-pocsagencode"
"github.com/RPIZeroDuplexPOCSAG/rfm69"
"github.com/davecheney/gpio"
"github.com/streadway/amqp"
"github.com/sirius1024/go-amqp-reconnect/rabbitmq"
"github.com/RPIZeroDuplexPOCSAG/pocsag-gateway/settings"
)
var (
conf *settings.App
queueMutex sync.Mutex
)
var messageQueue []*pocsagencode.Message
var lastTransmitBatch int64
func queueMessage(msg *pocsagencode.Message) {
queueMutex.Lock()
lastTransmitBatch = time.Now().Unix()
messageQueue = append(messageQueue, msg)
queueMutex.Unlock()
}
func resetModem() {
pin, err := gpio.OpenPin(29, gpio.ModeOutput)
defer pin.Close()
if err != nil {
panic(err)
}
pin.Set()
time.Sleep(100 * time.Millisecond)
pin.Clear()
time.Sleep(100 * time.Millisecond)
}
func main() {
resetModem()
var (
err error
rfm *rfm69.Device
)
if conf, err = settings.LoadSettings(); err != nil {
fmt.Errorf("Please check your .env File or Environment Vars")
return
}
/*** RABBITMQ ***/
rabbitmq.Debug = true
connection, err := rabbitmq.Dial(conf.AMQPURL)
if err != nil {
panic("could not establish connection with RabbitMQ:" + err.Error())
}
publishCh, err := connection.Channel()
if err != nil {
log.Panic(err)
}
// Only if we allow TX, we process Messages
if conf.TXFreq != 0 {
consumeCh, err := connection.Channel()
if err != nil {
log.Panic(err)
}
if _, err := consumeCh.QueueDeclare("tx_pocsag", true, false, false, false, nil); err != nil {
log.Panic(err)
}
go func() {
d, err := consumeCh.Consume("tx_pocsag", "", false, false, false, false, nil)
if err != nil {
log.Panic(err)
}
for msg := range d {
if msg.Headers["ric"] == nil { continue }
if msg.Headers["function"] == nil { continue }
log.Printf("ric: %d", uint32(msg.Headers["ric"].(int64)))
log.Printf("function %d", byte(msg.Headers["function"].(int64)))
log.Printf("msg: %s", string(msg.Body))
queueMessage(&pocsagencode.Message {
Addr: uint32(msg.Headers["ric"].(int64)),
Function: uint8(msg.Headers["function"].(int64)),
Content: string(msg.Body),
IsNumeric: (msg.Headers["numeric"] != nil),
})
msg.Ack(true)
}
}()
}
/***** RABBITMQ END**/
if rfm, err = rfm69.NewDevice(true); err != nil {
log.Fatal(err)
}
rfm.FreqOffset = conf.FreqOffset
rfm.TXFreq = conf.TXFreq
rfm.TXBaud = conf.TXBaud
rfm.RXFreq = conf.RXFreq
rfm.RXBaud = conf.RXBaud
if err = rfm.SetModeAndWait(rfm69.RF_OPMODE_STANDBY); err != nil {
panic(err)
}
if err = rfm.SetInvert(conf.InvertBits); err != nil {
panic(err)
}
log.Println("Running with following Config:")
log.Println("AMQP Server: ", conf.AMQPURL)
log.Println("Frequency Offset(Correction): ", conf.FreqOffset, "Hz")
log.Println("Transmit Freq: ", conf.TXFreq, "Hz @", conf.TXBaud, "bps")
log.Println("Receive Freq: ", conf.RXFreq, "Hz @", conf.RXBaud, "bps")
if conf.RXFreq != 0 {
err = publishCh.ExchangeDeclare("rx_pocsag", amqp.ExchangeFanout, true, false, false, false, nil)
if err != nil {
log.Panic(err)
}
rfm.OnReceive = func(stream *rfm69.RXStream) {
rssiMeasurementArray := make([]int, 5)
rssiStart := -0
for {
select {
case rssi := <-stream.RSSI:
//fmt.Printf("RSSI:%d\n", rssiStart - rssi)
if (stream.ByteCounter < 20) {
rssiMeasurementArray[int(stream.ByteCounter / 4)] = rssi
rssiStart = 0
for i := 0; i<5; i++ {
rssiStart += rssiMeasurementArray[i]
}
rssiStart = rssiStart / 5
} else {
if rssiStart - rssi > 30 {
stream.Cancel = true
}
if stream.ByteCounter > 1024e2 {
stream.Cancel = true
}
}
break
case <-stream.Process:
data := make([]byte, len(stream.ByteStream))
//log.Println("--PROCESS=", len(stream.ByteStream), " bytes--")
for i:= 0; i < len(stream.ByteStream); i++ {
data[i] = <-stream.ByteStream
//fmt.Printf("%x", data[i])
}
err := publishCh.Publish("rx_pocsag", "", false, false, amqp.Publishing{
ContentType: "application/octet-stream",
Body: data,
Headers: map[string]interface{} {
"rssi": rssiStart,
"len": len(data),
},
})
if err != nil {
log.Panic(err)
}
//fmt.Print("\n")
log.Println("--RX SENT--")
break
}
}
}
rfm.PrepareRX()
}
// Only if we allow TX, we process Messages
if conf.TXFreq != 0 {
go func() {
lastTransmitBatch = time.Now().Unix()
for {
//log.Println(time.Now().Unix(), lastTransmitBatch + 5)
time.Sleep(500 * time.Millisecond)
if len(messageQueue) > 0 && (time.Now().Unix() > lastTransmitBatch + 5) {
lastTransmitBatch = time.Now().Unix()
log.Println("transmitting pocsag batch")
queueMutex.Lock()
burst, _ := pocsagencode.Generate(messageQueue, pocsagencode.OptionMaxLen(6000))
log.Println("Transmitting", len(burst.Bytes()), "bytes")
data := &rfm69.Data{
Data: burst.Bytes(),
}
// Make Pocsag Burst
rfm.Send(data)
messageQueue = make([]*pocsagencode.Message, 0)
queueMutex.Unlock()
}
}
}()
}
rfm.Loop()
log.Println("Done")
}