You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zello-client/client/client.go

518 lines
12 KiB
Go

package client
import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
type ZelloClient struct {
Connection *websocket.Conn
AuthSuccess bool
JoinedChannels []string
PlatformType string
PlatformName string
URL string
Username string
Password string
DefaultTimeout time.Duration
SeqNum int64
seqNumLock sync.Mutex
txQueueLock sync.Mutex
// Event Map for responses
SeqResponseChans map[int64]chan map[string]interface{}
// Event Chans
GeneralEventChan chan ZelloResponsePacked
BinaryDataChan chan []byte
ErrorEventChan chan error
}
func NewZelloClient() (zelloClient *ZelloClient) {
return &ZelloClient{
seqNumLock: sync.Mutex{},
txQueueLock: sync.Mutex{},
DefaultTimeout: time.Millisecond * 1e3,
SeqResponseChans: make(map[int64]chan map[string]interface{}),
GeneralEventChan: make(chan ZelloResponsePacked),
ErrorEventChan: make(chan error),
BinaryDataChan: make(chan []byte),
}
}
func (zc *ZelloClient) Connect(url string) (err error) {
zc.URL = url
zc.Connection, _, err = websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return err
}
fmt.Printf("connected to %s\n", url)
return nil
}
func (zc *ZelloClient) Disconnect() (err error) {
zc.AuthSuccess = false
return zc.Connection.Close()
}
func (zc *ZelloClient) GetNextSequence() int64 {
zc.seqNumLock.Lock()
defer zc.seqNumLock.Unlock()
zc.SeqNum++
return zc.SeqNum
}
// non-public raw pdu sending functions
func (zc *ZelloClient) sendLoginWork(seq int64, channels []string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloLogonReq{
ZelloCommand: ZelloCommand{
Command: "logon",
Sequence: seq,
},
Username: zc.Username,
Password: zc.Password,
Channels: channels,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendLoginConsumer(seq int64, channel string, authToken string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloLogonReq{
ZelloCommand: ZelloCommand{
Command: "logon",
Sequence: seq,
},
Username: zc.Username,
Password: zc.Password,
Channels: []string{channel},
AuthToken: authToken,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendStartStream(seq int64, channel string, codecParams ZelloCodecHeader, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
codecHeader := make([]byte, 4)
codecHeader[0] = byte(codecParams.SampleRate & 0xff)
codecHeader[1] = byte(codecParams.SampleRate >> 8 & 0xff)
codecHeader[2] = byte(codecParams.FramesPerPaket & 0xff)
codecHeader[3] = byte(codecParams.FrameSize & 0xff)
codecHeaderB64 := base64.StdEncoding.EncodeToString(codecHeader)
byt, err := json.Marshal(ZelloStartStreamReq{
ZelloCommand: ZelloCommand{
Command: "start_stream",
Sequence: seq,
},
Channel: channel,
Type: "audio",
Codec: "opus",
For: recipient,
CodecHeader: codecHeaderB64,
PacketDuration: float64(codecParams.PacketDuration),
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendStopStream(seq int64, streamID int64, channel string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloStopStreamReq{
ZelloCommand: ZelloCommand{
Command: "stop_stream",
Sequence: seq,
},
Channel: channel,
StreamID: streamID,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendTextMessage(seq int64, channel string, message string, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloSendTextMessage{
ZelloCommand: ZelloCommand{
Command: "send_text_message",
Sequence: seq,
},
Channel: channel,
Text: message,
For: recipient,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendLocation(seq int64, channel string, location ZelloLocation, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloSendLocation{
ZelloCommand: ZelloCommand{
Command: "send_location",
Sequence: seq,
},
Channel: channel,
ZelloLocation: location,
For: recipient,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendRaw(command map[string]interface{}) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(command)
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
// public functions with timeout and error handlng
func (zc *ZelloClient) SendLoginWork(channels []string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLoginWork(seq, channels)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
zc.AuthSuccess = true
zc.JoinedChannels = channels
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendLoginConsmer(channel string, authToken string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLoginConsumer(seq, channel, authToken)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
zc.AuthSuccess = true
zc.JoinedChannels = []string{channel}
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) StartStream(channel string, codecParams ZelloCodecHeader, indRecipient string) (streamID int64, err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendStartStream(seq, channel, codecParams, indRecipient)
if err != nil {
return -1, err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return -1, fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return -1, fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
streamID = int64(res["stream_id"].(float64))
return streamID, nil
} else {
return -1, fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) StopStream(channel string, streamID int64) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendStopStream(seq, streamID, channel)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendTextMessage(channel string, message string, indRecipient string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendTextMessage(seq, channel, message, indRecipient)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendLocation(channel string, location ZelloLocation, indRecipient string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLocation(seq, channel, location, indRecipient)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendRaw(command map[string]interface{}) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
command["seq"] = seq
err = zc.sendRaw(command)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendRawAsync(command map[string]interface{}) (err error, seq int64) {
seq = zc.GetNextSequence()
command["seq"] = float64(seq)
err = zc.sendRaw(command)
if err != nil {
return err, seq
}
return nil, seq
}
func (zc *ZelloClient) SendRawAsyncDirect(command map[string]interface{}, seq int64) (err error) {
command["seq"] = float64(seq)
err = zc.sendRaw(command)
if err != nil {
return err
}
return nil
}
func (zc *ZelloClient) SendBinary(data []byte) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
err = zc.Connection.WriteMessage(websocket.BinaryMessage, data)
if err != nil {
return err
}
return nil
}
func (zc *ZelloClient) handleRXjson(message []byte) (err error) {
zres := ZelloResponse{}
err = json.Unmarshal(message, &zres)
if err != nil {
log.Println(err)
return
}
rawJSON := make(map[string]interface{})
err = json.Unmarshal(message, &rawJSON)
if err != nil {
log.Println(err)
return
}
if _, ok := zc.SeqResponseChans[zres.Sequence]; ok {
zc.SeqResponseChans[zres.Sequence] <- rawJSON
} else {
zresPacked := ZelloResponsePacked{
ZelloResponse: zres,
Raw: rawJSON,
}
zc.GeneralEventChan <- zresPacked
}
return nil
}
func (zc *ZelloClient) handleRXbinary(data []byte) (err error) {
//log.Printf("handleRXbinary: %s", data)
zc.BinaryDataChan <- data
return nil
}
func (zc *ZelloClient) Work() (err error) {
for {
msgType, message, err := zc.Connection.ReadMessage()
if err != nil {
//log.Println("read:", err)
zc.ErrorEventChan <- err
return err
}
if msgType == websocket.TextMessage {
err = zc.handleRXjson(message)
if err != nil {
log.Println("handleRXjson:", err)
return err
}
}
if msgType == websocket.BinaryMessage {
err = zc.handleRXbinary(message)
if err != nil {
log.Println("handleRXbinary:", err)
return err
}
}
}
}
// TODO : overthink this
func (zc *ZelloClient) Close() error {
return zc.Connection.Close()
}