diff --git a/README.md b/README.md index d3adb7e..0b26b42 100644 --- a/README.md +++ b/README.md @@ -1 +1,3 @@ # pocsag-gateway + +This WIP Software aims to provide an Interface for transmitting Messages sent over AMQP via an RFM69 Transceiver. \ No newline at end of file diff --git a/main.go b/main.go index 0f1dc3b..c1fcf0d 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,7 @@ package main import ( "fmt" "time" - //"sync" + "sync" "log" "github.com/kgolding/go-pocsagencode" "github.com/RPIZeroDuplexPOCSAG/rfm69" @@ -17,11 +17,16 @@ import ( ) var ( conf *settings.App + queueMutex sync.Mutex ) -var messageQueue []*pocsagencode.Message +var messageQueue []*pocsagencode.Message +var lastTransmitBatch int64 func queueMessage(msg *pocsagencode.Message) { + queueMutex.Lock() + lastTransmitBatch = time.Now().Unix() messageQueue = append(messageQueue, msg) + queueMutex.Unlock() } func resetModem() { @@ -61,6 +66,9 @@ func main() { if err != nil { log.Panic(err) } + if _, err := consumeCh.QueueDeclare("tx_pocsag", true, false, false, false, nil); err != nil { + log.Panic(err) + } go func() { d, err := consumeCh.Consume("tx_pocsag", "", false, false, false, false, nil) if err != nil { @@ -69,10 +77,10 @@ func main() { for msg := range d { if msg.Headers["ric"] == nil { continue } - log.Printf("ric: %s", string(msg.Headers["ric"].(string))) + log.Printf("ric: %s", string(msg.Headers["ric"].(int64))) log.Printf("msg: %s", string(msg.Body)) queueMessage(&pocsagencode.Message { - Addr: uint32(msg.Headers["ric"].(int)), + Addr: uint32(msg.Headers["ric"].(int64)), Content: string(msg.Body), IsNumeric: (msg.Headers["numeric"] != nil), }) @@ -80,10 +88,6 @@ func main() { } }() } - err = publishCh.ExchangeDeclare("rx", amqp.ExchangeFanout, true, false, false, false, nil) - if err != nil { - log.Panic(err) - } /***** RABBITMQ END**/ if rfm, err = rfm69.NewDevice(true); err != nil { @@ -110,30 +114,12 @@ func main() { log.Println("Frequency Offset(Correction): ", conf.FreqOffset, "Hz") log.Println("Transmit Freq: ", conf.TXFreq, "Hz @", conf.TXBaud, "bps") log.Println("Receive Freq: ", conf.RXFreq, "Hz @", conf.RXBaud, "bps") - /* - messages := []*pocsagencode.Message{ - &pocsagencode.Message{133701, "Hello 1234567890!", false}, - //&pocsagencode.Message{133702, "Hello d2efa947-7618-440c-8f79-fab32762af8ed2bb9c62-007e-4b2c-93d5-3124a247032eefe71db4-ef8d-46fb-9cf8-dac70db000bc12067966-da61-447c-a9ce-c0c24be17df5 Pager!", false}, - //&pocsagencode.Message{133703, "Hello c41554ca-7372-4975-b0c3-3e2eaccc1e8b Pager!", false}, - //&pocsagencode.Message{133704, "Hello Pager!", false}, - } - log.Println("Sending", len(messages), "messages") - - var burst pocsagencode.Burst - for len(messages) > 220 { - burst, messages = pocsagencode.Generate(messages, pocsagencode.OptionMaxLen(6000)) - // Options can be set as below for MaxLen and PreambleBits - // burst, messages = pocsagencode.Generate(messages, pocsagencode.OptionPreambleBits(250)) - //log.Println("% X\n\n", burst.Bytes()) - log.Println(len(burst.Bytes())) - data := &rfm69.Data{ - Data: burst.Bytes(), - } - rfm.Send(data) - } - */ if conf.RXFreq != 0 { + err = publishCh.ExchangeDeclare("rx_pocsag", amqp.ExchangeFanout, true, false, false, false, nil) + if err != nil { + log.Panic(err) + } rfm.OnReceive = func(stream *rfm69.RXStream) { rssiMeasurementArray := make([]int, 5) rssiStart := -0 @@ -164,7 +150,7 @@ func main() { data[i] = <-stream.ByteStream //fmt.Printf("%x", data[i]) } - err := publishCh.Publish("rx", "", false, false, amqp.Publishing{ + err := publishCh.Publish("rx_pocsag", "", false, false, amqp.Publishing{ ContentType: "application/octet-stream", Body: data, Headers: map[string]interface{} { @@ -184,6 +170,31 @@ func main() { rfm.PrepareRX() } + // Only if we allow TX, we process Messages + if conf.TXFreq != 0 { + go func() { + lastTransmitBatch = time.Now().Unix() + for { + log.Println(time.Now().Unix(), lastTransmitBatch + 5) + + time.Sleep(500 * time.Millisecond) + if len(messageQueue) > 0 && (time.Now().Unix() > lastTransmitBatch + 5) { + lastTransmitBatch = time.Now().Unix() + log.Println("transmitting pocsag batch") + queueMutex.Lock() + burst, _ := pocsagencode.Generate(messageQueue, pocsagencode.OptionMaxLen(6000)) + log.Println("Transmitting", len(burst.Bytes()), "bytes") + data := &rfm69.Data{ + Data: burst.Bytes(), + } + rfm.Send(data) + messageQueue = make([]*pocsagencode.Message, 0) + queueMutex.Unlock() + // Make Pocsag Burst + } + } + }() + } rfm.Loop() log.Println("Done") }