// Package client implements a small websocket client that exchanges
// sequence-numbered JSON commands and binary frames with a Zello websocket
// endpoint.
package client

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// errZelloNotConnected is returned when an operation needs the websocket
// connection but ZelloClient.Connection is nil.
var errZelloNotConnected = errors.New("not connected")

// ZelloClient holds the websocket connection, the login state and the
// channels used to hand received data to the application.
type ZelloClient struct {
	Connection     *websocket.Conn
	AuthSuccess    bool
	JoinedChannels []string
	PlatformType   string
	PlatformName   string
	URL            string
	Username       string
	Password       string
	DefaultTimeout time.Duration // how long a request waits for its response
	SeqNum         int64

	seqNumLock  sync.Mutex // guards SeqNum
	txQueueLock sync.Mutex // serializes writes on Connection
	seqRespLock sync.Mutex // guards SeqResponseChans for all accesses made by this file

	// SeqResponseChans maps a request sequence number to the channel that
	// receives the raw decoded response carrying that sequence number.
	SeqResponseChans map[int64]chan map[string]interface{}

	// Event channels. All three are unbuffered, so the read loop in Work
	// blocks until the application receives from them.
	GeneralEventChan chan ZelloResponsePacked
	BinaryDataChan   chan []byte
	ErrorEventChan   chan error
}

// NewZelloClient returns a client with a one second default timeout and all
// maps and channels allocated. It does not open a connection.
func NewZelloClient() (zelloClient *ZelloClient) {
	return &ZelloClient{
		DefaultTimeout:   time.Second,
		SeqResponseChans: make(map[int64]chan map[string]interface{}),
		GeneralEventChan: make(chan ZelloResponsePacked),
		ErrorEventChan:   make(chan error),
		BinaryDataChan:   make(chan []byte),
	}
}

// Connect dials url with the default websocket dialer and stores the
// resulting connection in zc.Connection. The URL is remembered in zc.URL even
// when dialing fails.
func (zc *ZelloClient) Connect(url string) (err error) {
	zc.URL = url
	zc.Connection, _, err = websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		return err
	}
	fmt.Printf("connected to %s\n", url)
	return nil
}

// Disconnect clears AuthSuccess and closes the connection. It returns nil
// when there is no connection to close.
func (zc *ZelloClient) Disconnect() (err error) {
	zc.AuthSuccess = false
	if zc.Connection == nil {
		return nil
	}
	return zc.Connection.Close()
}

// GetNextSequence increments SeqNum under its lock and returns the new value.
func (zc *ZelloClient) GetNextSequence() int64 {
	zc.seqNumLock.Lock()
	defer zc.seqNumLock.Unlock()
	zc.SeqNum++
	return zc.SeqNum
}

// registerSeqChan creates the response channel for seq and stores it in
// SeqResponseChans, allocating the map first if the client was not built with
// NewZelloClient. The channel has capacity one so that the read loop can
// deliver a response without blocking even if the waiter has just timed out.
func (zc *ZelloClient) registerSeqChan(seq int64) chan map[string]interface{} {
	respChan := make(chan map[string]interface{}, 1)
	zc.seqRespLock.Lock()
	defer zc.seqRespLock.Unlock()
	if zc.SeqResponseChans == nil {
		zc.SeqResponseChans = make(map[int64]chan map[string]interface{})
	}
	zc.SeqResponseChans[seq] = respChan
	return respChan
}

// unregisterSeqChan removes the response channel registered for seq, if any.
func (zc *ZelloClient) unregisterSeqChan(seq int64) {
	zc.seqRespLock.Lock()
	defer zc.seqRespLock.Unlock()
	delete(zc.SeqResponseChans, seq)
}

// lookupSeqChan returns the response channel registered for seq.
func (zc *ZelloClient) lookupSeqChan(seq int64) (chan map[string]interface{}, bool) {
	zc.seqRespLock.Lock()
	defer zc.seqRespLock.Unlock()
	respChan, ok := zc.SeqResponseChans[seq]
	return respChan, ok
}

// writeJSON marshals v and writes it as one text message while holding
// txQueueLock. Write errors are logged and returned unwrapped so callers can
// still inspect them with the websocket package helpers.
func (zc *ZelloClient) writeJSON(v interface{}) error {
	byt, err := json.Marshal(v)
	if err != nil {
		return err
	}

	zc.txQueueLock.Lock()
	defer zc.txQueueLock.Unlock()

	if zc.Connection == nil {
		return errZelloNotConnected
	}
	if err := zc.Connection.WriteMessage(websocket.TextMessage, byt); err != nil {
		log.Println("write close:", err)
		return err
	}
	return nil
}

// request registers a response channel for seq, runs send, and then waits up
// to DefaultTimeout for the response delivered by handleRXjson. The channel
// is unregistered on every return path, including a failed send.
func (zc *ZelloClient) request(seq int64, send func() error) (map[string]interface{}, error) {
	respChan := zc.registerSeqChan(seq)
	defer zc.unregisterSeqChan(seq)

	if err := send(); err != nil {
		return nil, err
	}

	timeout := time.NewTimer(zc.DefaultTimeout)
	defer timeout.Stop()

	select {
	case res := <-respChan:
		return res, nil
	case <-timeout.C:
		return nil, errors.New("timeout")
	}
}

// zelloResponseError converts a raw response into an error. It returns nil
// only when "success" is present and true; otherwise it returns the "error"
// string verbatim, or a generic error when that field is missing or is not a
// string.
func zelloResponseError(res map[string]interface{}) error {
	if success, ok := res["success"].(bool); ok && success {
		return nil
	}
	if msg, ok := res["error"].(string); ok {
		// errors.New instead of fmt.Errorf: the text comes from the peer and
		// must not be interpreted as a format string.
		return errors.New(msg)
	}
	return errors.New("request failed: response has neither success nor error message")
}

// non-public raw pdu sending functions

// sendLoginWork writes a "logon" command carrying the client's username and
// password and the given channel list.
func (zc *ZelloClient) sendLoginWork(seq int64, channels []string) (err error) {
	return zc.writeJSON(ZelloLogonReq{
		ZelloCommand: ZelloCommand{
			Command:  "logon",
			Sequence: seq,
		},
		Username: zc.Username,
		Password: zc.Password,
		Channels: channels,
	})
}

// sendLoginConsumer writes a "logon" command for a single channel that
// additionally carries authToken.
func (zc *ZelloClient) sendLoginConsumer(seq int64, channel string, authToken string) (err error) {
	return zc.writeJSON(ZelloLogonReq{
		ZelloCommand: ZelloCommand{
			Command:  "logon",
			Sequence: seq,
		},
		Username:  zc.Username,
		Password:  zc.Password,
		Channels:  []string{channel},
		AuthToken: authToken,
	})
}

// sendStartStream writes a "start_stream" command for an opus audio stream.
// The codec header is four bytes, base64 encoded: sample rate low byte,
// sample rate high byte, frames per packet, frame size.
func (zc *ZelloClient) sendStartStream(seq int64, channel string, codecParams ZelloCodecHeader, recipient string) (err error) {
	codecHeader := []byte{
		byte(codecParams.SampleRate & 0xff),
		byte((codecParams.SampleRate >> 8) & 0xff),
		byte(codecParams.FramesPerPaket & 0xff),
		byte(codecParams.FrameSize & 0xff),
	}

	return zc.writeJSON(ZelloStartStreamReq{
		ZelloCommand: ZelloCommand{
			Command:  "start_stream",
			Sequence: seq,
		},
		Channel:        channel,
		Type:           "audio",
		Codec:          "opus",
		For:            recipient,
		CodecHeader:    base64.StdEncoding.EncodeToString(codecHeader),
		PacketDuration: float64(codecParams.PacketDuration),
	})
}

// sendStopStream writes a "stop_stream" command for streamID on channel.
func (zc *ZelloClient) sendStopStream(seq int64, streamID int64, channel string) (err error) {
	return zc.writeJSON(ZelloStopStreamReq{
		ZelloCommand: ZelloCommand{
			Command:  "stop_stream",
			Sequence: seq,
		},
		Channel:  channel,
		StreamID: streamID,
	})
}

// sendTextMessage writes a "send_text_message" command.
func (zc *ZelloClient) sendTextMessage(seq int64, channel string, message string, recipient string) (err error) {
	return zc.writeJSON(ZelloSendTextMessage{
		ZelloCommand: ZelloCommand{
			Command:  "send_text_message",
			Sequence: seq,
		},
		Channel: channel,
		Text:    message,
		For:     recipient,
	})
}

// sendLocation writes a "send_location" command.
func (zc *ZelloClient) sendLocation(seq int64, channel string, location ZelloLocation, recipient string) (err error) {
	return zc.writeJSON(ZelloSendLocation{
		ZelloCommand: ZelloCommand{
			Command:  "send_location",
			Sequence: seq,
		},
		Channel:       channel,
		ZelloLocation: location,
		For:           recipient,
	})
}

// sendRaw marshals command as-is and writes it as a text message.
func (zc *ZelloClient) sendRaw(command map[string]interface{}) (err error) {
	return zc.writeJSON(command)
}

// public functions with timeout and error handling

// SendLoginWork logs on with the client's username and password and the given
// channels, then waits for the response. On success it sets AuthSuccess and
// records channels in JoinedChannels. It returns an error carrying the
// server's message on failure, or "timeout" if no response arrives within
// DefaultTimeout.
func (zc *ZelloClient) SendLoginWork(channels []string) (err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendLoginWork(seq, channels)
	})
	if err != nil {
		return err
	}
	if err = zelloResponseError(res); err != nil {
		return err
	}
	zc.AuthSuccess = true
	zc.JoinedChannels = channels
	return nil
}

// SendLoginConsmer logs on to a single channel using authToken in addition to
// the client's username and password, then waits for the response. On success
// it sets AuthSuccess and records the channel in JoinedChannels.
//
// The method name is misspelled; it is kept for compatibility with existing
// callers.
func (zc *ZelloClient) SendLoginConsmer(channel string, authToken string) (err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendLoginConsumer(seq, channel, authToken)
	})
	if err != nil {
		return err
	}
	if err = zelloResponseError(res); err != nil {
		return err
	}
	zc.AuthSuccess = true
	zc.JoinedChannels = []string{channel}
	return nil
}

// StartStream requests a new audio stream on channel and returns the stream
// ID from the response. On any failure it returns -1 and an error.
func (zc *ZelloClient) StartStream(channel string, codecParams ZelloCodecHeader, indRecipient string) (streamID int64, err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendStartStream(seq, channel, codecParams, indRecipient)
	})
	if err != nil {
		return -1, err
	}
	if err = zelloResponseError(res); err != nil {
		return -1, err
	}
	// The response was decoded into a map[string]interface{}, so JSON numbers
	// arrive as float64.
	id, ok := res["stream_id"].(float64)
	if !ok {
		return -1, errors.New("start_stream response has no numeric stream_id")
	}
	return int64(id), nil
}

// StopStream asks to stop streamID on channel and waits for the response.
func (zc *ZelloClient) StopStream(channel string, streamID int64) (err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendStopStream(seq, streamID, channel)
	})
	if err != nil {
		return err
	}
	return zelloResponseError(res)
}

// SendTextMessage sends message to channel, addressed to indRecipient, and
// waits for the response.
func (zc *ZelloClient) SendTextMessage(channel string, message string, indRecipient string) (err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendTextMessage(seq, channel, message, indRecipient)
	})
	if err != nil {
		return err
	}
	return zelloResponseError(res)
}

// SendLocation sends location to channel, addressed to indRecipient, and
// waits for the response.
func (zc *ZelloClient) SendLocation(channel string, location ZelloLocation, indRecipient string) (err error) {
	seq := zc.GetNextSequence()
	res, err := zc.request(seq, func() error {
		return zc.sendLocation(seq, channel, location, indRecipient)
	})
	if err != nil {
		return err
	}
	return zelloResponseError(res)
}

// SendRaw stores a fresh sequence number under command["seq"] (modifying the
// caller's map when it is non-nil), sends the command and waits for the
// response.
func (zc *ZelloClient) SendRaw(command map[string]interface{}) (err error) {
	if command == nil {
		command = make(map[string]interface{}, 1)
	}
	seq := zc.GetNextSequence()
	command["seq"] = seq
	res, err := zc.request(seq, func() error {
		return zc.sendRaw(command)
	})
	if err != nil {
		return err
	}
	return zelloResponseError(res)
}

// SendRawAsync stores a fresh sequence number under command["seq"] and sends
// the command without waiting for a response. The sequence number is returned
// even when sending fails.
func (zc *ZelloClient) SendRawAsync(command map[string]interface{}) (err error, seq int64) {
	if command == nil {
		command = make(map[string]interface{}, 1)
	}
	seq = zc.GetNextSequence()
	command["seq"] = float64(seq)
	return zc.sendRaw(command), seq
}

// SendRawAsyncDirect stores the caller-chosen seq under command["seq"] and
// sends the command without waiting for a response.
func (zc *ZelloClient) SendRawAsyncDirect(command map[string]interface{}, seq int64) (err error) {
	if command == nil {
		command = make(map[string]interface{}, 1)
	}
	command["seq"] = float64(seq)
	return zc.sendRaw(command)
}

// SendBinary writes data as one binary message while holding txQueueLock.
func (zc *ZelloClient) SendBinary(data []byte) (err error) {
	zc.txQueueLock.Lock()
	defer zc.txQueueLock.Unlock()

	if zc.Connection == nil {
		return errZelloNotConnected
	}
	return zc.Connection.WriteMessage(websocket.BinaryMessage, data)
}

// handleRXjson decodes a received text message. When a response channel is
// registered for the message's sequence number the raw decoded map is sent to
// that channel; otherwise the message is forwarded on GeneralEventChan.
// Decoding errors are logged and returned.
func (zc *ZelloClient) handleRXjson(message []byte) (err error) {
	zres := ZelloResponse{}
	if err = json.Unmarshal(message, &zres); err != nil {
		log.Println(err)
		return err
	}

	rawJSON := make(map[string]interface{})
	if err = json.Unmarshal(message, &rawJSON); err != nil {
		log.Println(err)
		return err
	}

	// Look the channel up under the lock but send outside of it, so a slow
	// receiver cannot block goroutines that register or unregister channels.
	if respChan, ok := zc.lookupSeqChan(zres.Sequence); ok {
		respChan <- rawJSON
		return nil
	}

	zc.GeneralEventChan <- ZelloResponsePacked{
		ZelloResponse: zres,
		Raw:           rawJSON,
	}
	return nil
}

// handleRXbinary forwards a received binary message on BinaryDataChan.
func (zc *ZelloClient) handleRXbinary(data []byte) (err error) {
	zc.BinaryDataChan <- data
	return nil
}

// Work reads messages from the connection until an error occurs and
// dispatches text messages to handleRXjson and binary messages to
// handleRXbinary. A read error is sent on ErrorEventChan before it is
// returned; a handler error is logged and returned without being sent there.
func (zc *ZelloClient) Work() (err error) {
	if zc.Connection == nil {
		return errZelloNotConnected
	}

	for {
		msgType, message, readErr := zc.Connection.ReadMessage()
		if readErr != nil {
			zc.ErrorEventChan <- readErr
			return readErr
		}

		switch msgType {
		case websocket.TextMessage:
			if rxErr := zc.handleRXjson(message); rxErr != nil {
				log.Println("handleRXjson:", rxErr)
				return rxErr
			}
		case websocket.BinaryMessage:
			if rxErr := zc.handleRXbinary(message); rxErr != nil {
				log.Println("handleRXbinary:", rxErr)
				return rxErr
			}
		}
	}
}

// Close closes the connection without touching AuthSuccess. It returns nil
// when there is no connection to close.
//
// TODO: reconsider how this should differ from Disconnect.
func (zc *ZelloClient) Close() error {
	if zc.Connection == nil {
		return nil
	}
	return zc.Connection.Close()
}