You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
518 lines
12 KiB
Go
518 lines
12 KiB
Go
package client
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
type ZelloClient struct {
|
|
Connection *websocket.Conn
|
|
AuthSuccess bool
|
|
JoinedChannels []string
|
|
|
|
PlatformType string
|
|
PlatformName string
|
|
URL string
|
|
Username string
|
|
Password string
|
|
DefaultTimeout time.Duration
|
|
|
|
SeqNum int64
|
|
seqNumLock sync.Mutex
|
|
|
|
txQueueLock sync.Mutex
|
|
|
|
// Event Map for responses
|
|
SeqResponseChans map[int64]chan map[string]interface{}
|
|
// Event Chans
|
|
GeneralEventChan chan ZelloResponsePacked
|
|
BinaryDataChan chan []byte
|
|
ErrorEventChan chan error
|
|
}
|
|
|
|
func NewZelloClient() (zelloClient *ZelloClient) {
|
|
return &ZelloClient{
|
|
seqNumLock: sync.Mutex{},
|
|
txQueueLock: sync.Mutex{},
|
|
|
|
DefaultTimeout: time.Millisecond * 1e3,
|
|
SeqResponseChans: make(map[int64]chan map[string]interface{}),
|
|
GeneralEventChan: make(chan ZelloResponsePacked),
|
|
ErrorEventChan: make(chan error),
|
|
BinaryDataChan: make(chan []byte),
|
|
}
|
|
}
|
|
|
|
func (zc *ZelloClient) Connect(url string) (err error) {
|
|
zc.URL = url
|
|
zc.Connection, _, err = websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("connected to %s\n", url)
|
|
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) Disconnect() (err error) {
|
|
zc.AuthSuccess = false
|
|
return zc.Connection.Close()
|
|
}
|
|
|
|
func (zc *ZelloClient) GetNextSequence() int64 {
|
|
zc.seqNumLock.Lock()
|
|
defer zc.seqNumLock.Unlock()
|
|
zc.SeqNum++
|
|
return zc.SeqNum
|
|
}
|
|
|
|
// non-public raw pdu sending functions
|
|
func (zc *ZelloClient) sendLoginWork(seq int64, channels []string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(ZelloLogonReq{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "logon",
|
|
Sequence: seq,
|
|
},
|
|
Username: zc.Username,
|
|
Password: zc.Password,
|
|
Channels: channels,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) sendLoginConsumer(seq int64, channel string, authToken string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(ZelloLogonReq{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "logon",
|
|
Sequence: seq,
|
|
},
|
|
Username: zc.Username,
|
|
Password: zc.Password,
|
|
Channels: []string{channel},
|
|
AuthToken: authToken,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (zc *ZelloClient) sendStartStream(seq int64, channel string, codecParams ZelloCodecHeader, recipient string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
codecHeader := make([]byte, 4)
|
|
codecHeader[0] = byte(codecParams.SampleRate & 0xff)
|
|
codecHeader[1] = byte(codecParams.SampleRate >> 8 & 0xff)
|
|
codecHeader[2] = byte(codecParams.FramesPerPaket & 0xff)
|
|
codecHeader[3] = byte(codecParams.FrameSize & 0xff)
|
|
codecHeaderB64 := base64.StdEncoding.EncodeToString(codecHeader)
|
|
|
|
byt, err := json.Marshal(ZelloStartStreamReq{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "start_stream",
|
|
Sequence: seq,
|
|
},
|
|
Channel: channel,
|
|
Type: "audio",
|
|
Codec: "opus",
|
|
For: recipient,
|
|
CodecHeader: codecHeaderB64,
|
|
PacketDuration: float64(codecParams.PacketDuration),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) sendStopStream(seq int64, streamID int64, channel string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(ZelloStopStreamReq{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "stop_stream",
|
|
Sequence: seq,
|
|
},
|
|
Channel: channel,
|
|
StreamID: streamID,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) sendTextMessage(seq int64, channel string, message string, recipient string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(ZelloSendTextMessage{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "send_text_message",
|
|
Sequence: seq,
|
|
},
|
|
Channel: channel,
|
|
Text: message,
|
|
For: recipient,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) sendLocation(seq int64, channel string, location ZelloLocation, recipient string) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(ZelloSendLocation{
|
|
ZelloCommand: ZelloCommand{
|
|
Command: "send_location",
|
|
Sequence: seq,
|
|
},
|
|
Channel: channel,
|
|
ZelloLocation: location,
|
|
For: recipient,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (zc *ZelloClient) sendRaw(command map[string]interface{}) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
byt, err := json.Marshal(command)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
|
|
if err != nil {
|
|
log.Println("write close:", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// public functions with timeout and error handling
|
|
func (zc *ZelloClient) SendLoginWork(channels []string) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendLoginWork(seq, channels)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
delete(zc.SeqResponseChans, seq)
|
|
if res["success"].(bool) {
|
|
zc.AuthSuccess = true
|
|
zc.JoinedChannels = channels
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) SendLoginConsmer(channel string, authToken string) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendLoginConsumer(seq, channel, authToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
delete(zc.SeqResponseChans, seq)
|
|
if res["success"].(bool) {
|
|
zc.AuthSuccess = true
|
|
zc.JoinedChannels = []string{channel}
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) StartStream(channel string, codecParams ZelloCodecHeader, indRecipient string) (streamID int64, err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendStartStream(seq, channel, codecParams, indRecipient)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return -1, fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
if _, ok := res["success"]; !ok {
|
|
return -1, fmt.Errorf(res["error"].(string))
|
|
}
|
|
if res["success"].(bool) {
|
|
streamID = int64(res["stream_id"].(float64))
|
|
return streamID, nil
|
|
} else {
|
|
return -1, fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) StopStream(channel string, streamID int64) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendStopStream(seq, streamID, channel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
if _, ok := res["success"]; !ok {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
if res["success"].(bool) {
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) SendTextMessage(channel string, message string, indRecipient string) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendTextMessage(seq, channel, message, indRecipient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
if _, ok := res["success"]; !ok {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
if res["success"].(bool) {
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) SendLocation(channel string, location ZelloLocation, indRecipient string) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
err = zc.sendLocation(seq, channel, location, indRecipient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
if _, ok := res["success"]; !ok {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
if res["success"].(bool) {
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) SendRaw(command map[string]interface{}) (err error) {
|
|
seq := zc.GetNextSequence()
|
|
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
|
|
|
|
command["seq"] = seq
|
|
err = zc.sendRaw(command)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
timeout := time.NewTimer(zc.DefaultTimeout)
|
|
defer timeout.Stop()
|
|
defer delete(zc.SeqResponseChans, seq)
|
|
|
|
select {
|
|
case <-timeout.C:
|
|
return fmt.Errorf("timeout")
|
|
case res := <-zc.SeqResponseChans[seq]:
|
|
delete(zc.SeqResponseChans, seq)
|
|
if res["success"].(bool) {
|
|
return nil
|
|
} else {
|
|
return fmt.Errorf(res["error"].(string))
|
|
}
|
|
}
|
|
}
|
|
func (zc *ZelloClient) SendRawAsync(command map[string]interface{}) (err error, seq int64) {
|
|
seq = zc.GetNextSequence()
|
|
command["seq"] = float64(seq)
|
|
err = zc.sendRaw(command)
|
|
if err != nil {
|
|
return err, seq
|
|
}
|
|
return nil, seq
|
|
}
|
|
func (zc *ZelloClient) SendRawAsyncDirect(command map[string]interface{}, seq int64) (err error) {
|
|
command["seq"] = float64(seq)
|
|
err = zc.sendRaw(command)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) SendBinary(data []byte) (err error) {
|
|
zc.txQueueLock.Lock()
|
|
defer zc.txQueueLock.Unlock()
|
|
|
|
err = zc.Connection.WriteMessage(websocket.BinaryMessage, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (zc *ZelloClient) handleRXjson(message []byte) (err error) {
|
|
zres := ZelloResponse{}
|
|
err = json.Unmarshal(message, &zres)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
rawJSON := make(map[string]interface{})
|
|
err = json.Unmarshal(message, &rawJSON)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
if _, ok := zc.SeqResponseChans[zres.Sequence]; ok {
|
|
zc.SeqResponseChans[zres.Sequence] <- rawJSON
|
|
} else {
|
|
zresPacked := ZelloResponsePacked{
|
|
ZelloResponse: zres,
|
|
Raw: rawJSON,
|
|
}
|
|
zc.GeneralEventChan <- zresPacked
|
|
}
|
|
return nil
|
|
}
|
|
func (zc *ZelloClient) handleRXbinary(data []byte) (err error) {
|
|
//log.Printf("handleRXbinary: %s", data)
|
|
zc.BinaryDataChan <- data
|
|
return nil
|
|
}
|
|
|
|
func (zc *ZelloClient) Work() (err error) {
|
|
for {
|
|
msgType, message, err := zc.Connection.ReadMessage()
|
|
if err != nil {
|
|
//log.Println("read:", err)
|
|
zc.ErrorEventChan <- err
|
|
return err
|
|
}
|
|
if msgType == websocket.TextMessage {
|
|
err = zc.handleRXjson(message)
|
|
if err != nil {
|
|
log.Println("handleRXjson:", err)
|
|
return err
|
|
}
|
|
}
|
|
if msgType == websocket.BinaryMessage {
|
|
err = zc.handleRXbinary(message)
|
|
if err != nil {
|
|
log.Println("handleRXbinary:", err)
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO: reconsider this
|
|
func (zc *ZelloClient) Close() error {
|
|
return zc.Connection.Close()
|
|
}
|