commit 4b4c7eefe654d18d9e49acf7d98121790a225f4c Author: cheetah.cat Date: Mon Aug 5 13:11:19 2024 +0200 initial commit diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..0e66271 --- /dev/null +++ b/client/client.go @@ -0,0 +1,514 @@ +package client + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type ZelloClient struct { + Connection *websocket.Conn + AuthSuccess bool + JoinedChannels []string + + PlatformType string + PlatformName string + URL string + Username string + Password string + DefaultTimeout time.Duration + + SeqNum int64 + seqNumLock sync.Mutex + + txQueueLock sync.Mutex + + // Event Map for responses + SeqResponseChans map[int64]chan map[string]interface{} + // Event Chans + GeneralEventChan chan ZelloResponsePacked + BinaryDataChan chan []byte +} + +func NewZelloClient() (zelloClient *ZelloClient) { + return &ZelloClient{ + seqNumLock: sync.Mutex{}, + txQueueLock: sync.Mutex{}, + + DefaultTimeout: time.Millisecond * 1e3, + SeqResponseChans: make(map[int64]chan map[string]interface{}), + GeneralEventChan: make(chan ZelloResponsePacked), + BinaryDataChan: make(chan []byte), + } +} + +func (zc *ZelloClient) Connect(url string) (err error) { + zc.URL = url + zc.Connection, _, err = websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return err + } + fmt.Printf("connected to %s\n", url) + + return nil +} +func (zc *ZelloClient) Disconnect() (err error) { + zc.AuthSuccess = false + return zc.Connection.Close() +} + +func (zc *ZelloClient) GetNextSequence() int64 { + zc.seqNumLock.Lock() + defer zc.seqNumLock.Unlock() + zc.SeqNum++ + return zc.SeqNum +} + +// non-public raw pdu sending functions +func (zc *ZelloClient) sendLoginWork(seq int64, channels []string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(ZelloLogonReq{ + ZelloCommand: ZelloCommand{ + Command: "logon", + Sequence: seq, + }, + Username: zc.Username, + Password: zc.Password, + Channels: channels, + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} +func (zc *ZelloClient) sendLoginConsumer(seq int64, channel string, authToken string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(ZelloLogonReq{ + ZelloCommand: ZelloCommand{ + Command: "logon", + Sequence: seq, + }, + Username: zc.Username, + Password: zc.Password, + Channels: []string{channel}, + AuthToken: authToken, + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} + +func (zc *ZelloClient) sendStartStream(seq int64, channel string, codecParams ZelloCodecHeader, recipient string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + codecHeader := make([]byte, 4) + codecHeader[0] = byte(codecParams.SampleRate & 0xff) + codecHeader[1] = byte(codecParams.SampleRate >> 8 & 0xff) + codecHeader[2] = byte(codecParams.FramesPerPaket & 0xff) + codecHeader[3] = byte(codecParams.FrameSize & 0xff) + codecHeaderB64 := base64.StdEncoding.EncodeToString(codecHeader) + + byt, err := json.Marshal(ZelloStartStreamReq{ + ZelloCommand: ZelloCommand{ + Command: "start_stream", + Sequence: seq, + }, + Channel: channel, + Type: "audio", + Codec: "opus", + For: recipient, + CodecHeader: codecHeaderB64, + PacketDuration: float64(codecParams.PacketDuration), + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} +func (zc *ZelloClient) sendStopStream(seq int64, streamID int64, channel string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(ZelloStopStreamReq{ + ZelloCommand: ZelloCommand{ + Command: "stop_stream", + Sequence: seq, + }, + Channel: channel, + StreamID: streamID, + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} +func (zc *ZelloClient) sendTextMessage(seq int64, channel string, message string, recipient string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(ZelloSendTextMessage{ + ZelloCommand: ZelloCommand{ + Command: "send_text_message", + Sequence: seq, + }, + Channel: channel, + Text: message, + For: recipient, + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} +func (zc *ZelloClient) sendLocation(seq int64, channel string, location ZelloLocation, recipient string) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(ZelloSendLocation{ + ZelloCommand: ZelloCommand{ + Command: "send_location", + Sequence: seq, + }, + Channel: channel, + ZelloLocation: location, + For: recipient, + }) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} + +func (zc *ZelloClient) sendRaw(command map[string]interface{}) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + byt, err := json.Marshal(command) + if err != nil { + return err + } + err = zc.Connection.WriteMessage(websocket.TextMessage, byt) + if err != nil { + log.Println("write close:", err) + return err + } + return nil +} + +// public functions with timeout and error handlng +func (zc *ZelloClient) SendLoginWork(channels []string) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendLoginWork(seq, channels) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + delete(zc.SeqResponseChans, seq) + if res["success"].(bool) { + zc.AuthSuccess = true + zc.JoinedChannels = channels + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) SendLoginConsmer(channel string, authToken string) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendLoginConsumer(seq, channel, authToken) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + delete(zc.SeqResponseChans, seq) + if res["success"].(bool) { + zc.AuthSuccess = true + zc.JoinedChannels = []string{channel} + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) StartStream(channel string, codecParams ZelloCodecHeader, indRecipient string) (streamID int64, err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendStartStream(seq, channel, codecParams, indRecipient) + if err != nil { + return -1, err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return -1, fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + if _, ok := res["success"]; !ok { + return -1, fmt.Errorf(res["error"].(string)) + } + if res["success"].(bool) { + streamID = int64(res["stream_id"].(float64)) + return streamID, nil + } else { + return -1, fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) StopStream(channel string, streamID int64) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendStopStream(seq, streamID, channel) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + if _, ok := res["success"]; !ok { + return fmt.Errorf(res["error"].(string)) + } + if res["success"].(bool) { + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) SendTextMessage(channel string, message string, indRecipient string) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendTextMessage(seq, channel, message, indRecipient) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + if _, ok := res["success"]; !ok { + return fmt.Errorf(res["error"].(string)) + } + if res["success"].(bool) { + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) SendLocation(channel string, location ZelloLocation, indRecipient string) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + err = zc.sendLocation(seq, channel, location, indRecipient) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + if _, ok := res["success"]; !ok { + return fmt.Errorf(res["error"].(string)) + } + if res["success"].(bool) { + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) SendRaw(command map[string]interface{}) (err error) { + seq := zc.GetNextSequence() + zc.SeqResponseChans[seq] = make(chan map[string]interface{}) + + command["seq"] = seq + err = zc.sendRaw(command) + if err != nil { + return err + } + + timeout := time.NewTimer(zc.DefaultTimeout) + defer timeout.Stop() + defer delete(zc.SeqResponseChans, seq) + + select { + case <-timeout.C: + return fmt.Errorf("timeout") + case res := <-zc.SeqResponseChans[seq]: + delete(zc.SeqResponseChans, seq) + if res["success"].(bool) { + return nil + } else { + return fmt.Errorf(res["error"].(string)) + } + } +} +func (zc *ZelloClient) SendRawAsync(command map[string]interface{}) (err error, seq int64) { + seq = zc.GetNextSequence() + command["seq"] = float64(seq) + err = zc.sendRaw(command) + if err != nil { + return err, seq + } + return nil, seq +} +func (zc *ZelloClient) SendRawAsyncDirect(command map[string]interface{}, seq int64) (err error) { + command["seq"] = float64(seq) + err = zc.sendRaw(command) + if err != nil { + return err + } + return nil +} +func (zc *ZelloClient) SendBinary(data []byte) (err error) { + zc.txQueueLock.Lock() + defer zc.txQueueLock.Unlock() + + err = zc.Connection.WriteMessage(websocket.BinaryMessage, data) + if err != nil { + return err + } + return nil +} + +func (zc *ZelloClient) handleRXjson(message []byte) (err error) { + zres := ZelloResponse{} + err = json.Unmarshal(message, &zres) + if err != nil { + log.Println(err) + return + } + rawJSON := make(map[string]interface{}) + err = json.Unmarshal(message, &rawJSON) + if err != nil { + log.Println(err) + return + } + if _, ok := zc.SeqResponseChans[zres.Sequence]; ok { + zc.SeqResponseChans[zres.Sequence] <- rawJSON + } else { + zresPacked := ZelloResponsePacked{ + ZelloResponse: zres, + Raw: rawJSON, + } + zc.GeneralEventChan <- zresPacked + } + return nil +} +func (zc *ZelloClient) handleRXbinary(data []byte) (err error) { + //log.Printf("handleRXbinary: %s", data) + zc.BinaryDataChan <- data + return nil +} + +func (zc *ZelloClient) Work() { + for { + msgType, message, err := zc.Connection.ReadMessage() + if err != nil { + log.Println("read:", err) + return + } + if msgType == websocket.TextMessage { + err = zc.handleRXjson(message) + if err != nil { + log.Println("handleRXjson:", err) + return + } + } + if msgType == websocket.BinaryMessage { + err = zc.handleRXbinary(message) + if err != nil { + log.Println("handleRXbinary:", err) + return + } + } + } +} + +// TODO : overthink this +func (zc *ZelloClient) Close() error { + return zc.Connection.Close() +} diff --git a/client/types.go b/client/types.go new file mode 100644 index 0000000..a180f65 --- /dev/null +++ b/client/types.go @@ -0,0 +1,124 @@ +package client + +type ( + ZelloCodecHeader struct { + SampleRate int16 + FramesPerPaket int + FrameSize int + + PacketDuration int + } + ZelloLocation struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + FormatedAddress string `json:"formatted_address"` + Accuracy float64 `json:"accuracy"` + } + ZelloCommand struct { + Command string `json:"command"` + Sequence int64 `json:"seq"` + } + ZelloResponse struct { + Sequence int64 `json:"seq"` + Command string `json:"command"` + } + ZelloResponsePacked struct { + ZelloResponse ZelloResponse + Raw map[string]interface{} + } + // Actual Cmds + ZelloLogonReq struct { + ZelloCommand + Username string `json:"username"` + Password string `json:"password"` + + Channels []string `json:"channels,omitempty"` + AuthToken string `json:"auth_token,omitempty"` + ListenOnly bool `json:"listen_only,omitempty"` + + PlatformType string `json:"platform_type,omitempty"` + PlatformName string `json:"platform_name,omitempty"` + } + ZelloLogonResp struct { + ZelloResponse + Success bool `json:"success"` + ErrorMsg string `json:"error,omitempty"` + } + // start_stream + ZelloStartStreamReq struct { + ZelloCommand + Channel string `json:"channel"` + Type string `json:"type"` + Codec string `json:"codec"` + CodecHeader string `json:"codec_header"` + PacketDuration float64 `json:"packet_duration"` + For string `json:"for,omitempty"` + } + ZelloStartStreamResp struct { + ZelloResponse + Success bool `json:"success"` + ErrorMsg string `json:"error,omitempty"` + StreamID int64 `json:"stream_id"` + } + // stop_stream + ZelloStopStreamReq struct { + ZelloCommand + Channel string `json:"channel"` + StreamID int64 `json:"stream_id"` + } + + //Messages + ZelloSendTextMessage struct { + ZelloCommand + Channel string `json:"channel"` + Text string `json:"text"` + For string `json:"for,omitempty"` + } + ZelloSendLocation struct { + ZelloCommand + Channel string `json:"channel"` + ZelloLocation + For string `json:"for,omitempty"` + } + + // Events + ZelloOnStreamStartEvent struct { + ZelloCommand + Channel string `json:"channel"` + StreamID int64 `json:"stream_id"` + Codec string `json:"codec"` + Type string `json:"type"` + From string `json:"from"` + CodecHeader string `json:"codec_header"` + PacketDuration float64 `json:"packet_duration"` + } + ZelloOnChannelStatusEvent struct { + ZelloCommand + Channel string `json:"channel"` + Status string `json:"status"` + UsersOnline float64 `json:"users_online"` + ImagesSupported bool `json:"images_supported"` + TextingSupported bool `json:"texting_supported"` + LocationsSupported bool `json:"locations_supported"` + } + ZelloOnStreamStopEvent struct { + ZelloCommand + StreamID int64 `json:"stream_id"` + } + ZelloOnTextMessageEvent struct { + ZelloCommand + Channel string `json:"channel"` + From string `json:"from"` + For string `json:"for,omitempty"` + MessageID int64 `json:"message_id"` + Text string `json:"text"` + } + ZelloOnLocationEvent struct { + ZelloCommand + Channel string `json:"channel"` + From string `json:"from"` + For string `json:"for,omitempty"` + MessageID int64 `json:"message_id"` + ZelloLocation + } +) diff --git a/client/types_test.go b/client/types_test.go new file mode 100644 index 0000000..ddc6122 --- /dev/null +++ b/client/types_test.go @@ -0,0 +1,39 @@ +package client_test + +import ( + "encoding/json" + "fmt" + "testing" + + "git.cheetah.cat/tetrapack/go-zello-client/client" +) + +func TestLoginMarshalling(t *testing.T) { + var err error + + workLoginReq := client.ZelloLogonReq{ + ZelloCommand: client.ZelloCommand{Command: "logon", Sequence: 1337}, + Username: "username", + Password: "password", + Channels: []string{"chan1", "chan2"}, + } + workBytes, err := json.Marshal(workLoginReq) + if err != nil { + t.Fatal(err) + } + fmt.Println(string(workBytes)) + + consLoginReq := client.ZelloLogonReq{ + ZelloCommand: client.ZelloCommand{Command: "logon", Sequence: 1337}, + Username: "username", + Password: "password", + Channels: []string{"consumer-chan1"}, + AuthToken: "authToken", + } + consBytes, err := json.Marshal(consLoginReq) + if err != nil { + t.Fatal(err) + } + fmt.Println(string(consBytes)) + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..db8e284 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.cheetah.cat/tetrapack/go-zello-client + +go 1.21.1 + +require github.com/gorilla/websocket v1.5.3 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=