initial commit

master
cheetah.cat 5 months ago
commit 4b4c7eefe6

@ -0,0 +1,514 @@
package client
import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
)
type ZelloClient struct {
Connection *websocket.Conn
AuthSuccess bool
JoinedChannels []string
PlatformType string
PlatformName string
URL string
Username string
Password string
DefaultTimeout time.Duration
SeqNum int64
seqNumLock sync.Mutex
txQueueLock sync.Mutex
// Event Map for responses
SeqResponseChans map[int64]chan map[string]interface{}
// Event Chans
GeneralEventChan chan ZelloResponsePacked
BinaryDataChan chan []byte
}
func NewZelloClient() (zelloClient *ZelloClient) {
return &ZelloClient{
seqNumLock: sync.Mutex{},
txQueueLock: sync.Mutex{},
DefaultTimeout: time.Millisecond * 1e3,
SeqResponseChans: make(map[int64]chan map[string]interface{}),
GeneralEventChan: make(chan ZelloResponsePacked),
BinaryDataChan: make(chan []byte),
}
}
func (zc *ZelloClient) Connect(url string) (err error) {
zc.URL = url
zc.Connection, _, err = websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return err
}
fmt.Printf("connected to %s\n", url)
return nil
}
func (zc *ZelloClient) Disconnect() (err error) {
zc.AuthSuccess = false
return zc.Connection.Close()
}
func (zc *ZelloClient) GetNextSequence() int64 {
zc.seqNumLock.Lock()
defer zc.seqNumLock.Unlock()
zc.SeqNum++
return zc.SeqNum
}
// non-public raw pdu sending functions
func (zc *ZelloClient) sendLoginWork(seq int64, channels []string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloLogonReq{
ZelloCommand: ZelloCommand{
Command: "logon",
Sequence: seq,
},
Username: zc.Username,
Password: zc.Password,
Channels: channels,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendLoginConsumer(seq int64, channel string, authToken string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloLogonReq{
ZelloCommand: ZelloCommand{
Command: "logon",
Sequence: seq,
},
Username: zc.Username,
Password: zc.Password,
Channels: []string{channel},
AuthToken: authToken,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendStartStream(seq int64, channel string, codecParams ZelloCodecHeader, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
codecHeader := make([]byte, 4)
codecHeader[0] = byte(codecParams.SampleRate & 0xff)
codecHeader[1] = byte(codecParams.SampleRate >> 8 & 0xff)
codecHeader[2] = byte(codecParams.FramesPerPaket & 0xff)
codecHeader[3] = byte(codecParams.FrameSize & 0xff)
codecHeaderB64 := base64.StdEncoding.EncodeToString(codecHeader)
byt, err := json.Marshal(ZelloStartStreamReq{
ZelloCommand: ZelloCommand{
Command: "start_stream",
Sequence: seq,
},
Channel: channel,
Type: "audio",
Codec: "opus",
For: recipient,
CodecHeader: codecHeaderB64,
PacketDuration: float64(codecParams.PacketDuration),
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendStopStream(seq int64, streamID int64, channel string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloStopStreamReq{
ZelloCommand: ZelloCommand{
Command: "stop_stream",
Sequence: seq,
},
Channel: channel,
StreamID: streamID,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendTextMessage(seq int64, channel string, message string, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloSendTextMessage{
ZelloCommand: ZelloCommand{
Command: "send_text_message",
Sequence: seq,
},
Channel: channel,
Text: message,
For: recipient,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendLocation(seq int64, channel string, location ZelloLocation, recipient string) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(ZelloSendLocation{
ZelloCommand: ZelloCommand{
Command: "send_location",
Sequence: seq,
},
Channel: channel,
ZelloLocation: location,
For: recipient,
})
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
func (zc *ZelloClient) sendRaw(command map[string]interface{}) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
byt, err := json.Marshal(command)
if err != nil {
return err
}
err = zc.Connection.WriteMessage(websocket.TextMessage, byt)
if err != nil {
log.Println("write close:", err)
return err
}
return nil
}
// public functions with timeout and error handlng
func (zc *ZelloClient) SendLoginWork(channels []string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLoginWork(seq, channels)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
zc.AuthSuccess = true
zc.JoinedChannels = channels
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendLoginConsmer(channel string, authToken string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLoginConsumer(seq, channel, authToken)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
zc.AuthSuccess = true
zc.JoinedChannels = []string{channel}
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) StartStream(channel string, codecParams ZelloCodecHeader, indRecipient string) (streamID int64, err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendStartStream(seq, channel, codecParams, indRecipient)
if err != nil {
return -1, err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return -1, fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return -1, fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
streamID = int64(res["stream_id"].(float64))
return streamID, nil
} else {
return -1, fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) StopStream(channel string, streamID int64) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendStopStream(seq, streamID, channel)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendTextMessage(channel string, message string, indRecipient string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendTextMessage(seq, channel, message, indRecipient)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendLocation(channel string, location ZelloLocation, indRecipient string) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
err = zc.sendLocation(seq, channel, location, indRecipient)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
if _, ok := res["success"]; !ok {
return fmt.Errorf(res["error"].(string))
}
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendRaw(command map[string]interface{}) (err error) {
seq := zc.GetNextSequence()
zc.SeqResponseChans[seq] = make(chan map[string]interface{})
command["seq"] = seq
err = zc.sendRaw(command)
if err != nil {
return err
}
timeout := time.NewTimer(zc.DefaultTimeout)
defer timeout.Stop()
defer delete(zc.SeqResponseChans, seq)
select {
case <-timeout.C:
return fmt.Errorf("timeout")
case res := <-zc.SeqResponseChans[seq]:
delete(zc.SeqResponseChans, seq)
if res["success"].(bool) {
return nil
} else {
return fmt.Errorf(res["error"].(string))
}
}
}
func (zc *ZelloClient) SendRawAsync(command map[string]interface{}) (err error, seq int64) {
seq = zc.GetNextSequence()
command["seq"] = float64(seq)
err = zc.sendRaw(command)
if err != nil {
return err, seq
}
return nil, seq
}
func (zc *ZelloClient) SendRawAsyncDirect(command map[string]interface{}, seq int64) (err error) {
command["seq"] = float64(seq)
err = zc.sendRaw(command)
if err != nil {
return err
}
return nil
}
func (zc *ZelloClient) SendBinary(data []byte) (err error) {
zc.txQueueLock.Lock()
defer zc.txQueueLock.Unlock()
err = zc.Connection.WriteMessage(websocket.BinaryMessage, data)
if err != nil {
return err
}
return nil
}
func (zc *ZelloClient) handleRXjson(message []byte) (err error) {
zres := ZelloResponse{}
err = json.Unmarshal(message, &zres)
if err != nil {
log.Println(err)
return
}
rawJSON := make(map[string]interface{})
err = json.Unmarshal(message, &rawJSON)
if err != nil {
log.Println(err)
return
}
if _, ok := zc.SeqResponseChans[zres.Sequence]; ok {
zc.SeqResponseChans[zres.Sequence] <- rawJSON
} else {
zresPacked := ZelloResponsePacked{
ZelloResponse: zres,
Raw: rawJSON,
}
zc.GeneralEventChan <- zresPacked
}
return nil
}
func (zc *ZelloClient) handleRXbinary(data []byte) (err error) {
//log.Printf("handleRXbinary: %s", data)
zc.BinaryDataChan <- data
return nil
}
func (zc *ZelloClient) Work() {
for {
msgType, message, err := zc.Connection.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
if msgType == websocket.TextMessage {
err = zc.handleRXjson(message)
if err != nil {
log.Println("handleRXjson:", err)
return
}
}
if msgType == websocket.BinaryMessage {
err = zc.handleRXbinary(message)
if err != nil {
log.Println("handleRXbinary:", err)
return
}
}
}
}
// TODO : overthink this
func (zc *ZelloClient) Close() error {
return zc.Connection.Close()
}

@ -0,0 +1,124 @@
package client
type (
ZelloCodecHeader struct {
SampleRate int16
FramesPerPaket int
FrameSize int
PacketDuration int
}
ZelloLocation struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
FormatedAddress string `json:"formatted_address"`
Accuracy float64 `json:"accuracy"`
}
ZelloCommand struct {
Command string `json:"command"`
Sequence int64 `json:"seq"`
}
ZelloResponse struct {
Sequence int64 `json:"seq"`
Command string `json:"command"`
}
ZelloResponsePacked struct {
ZelloResponse ZelloResponse
Raw map[string]interface{}
}
// Actual Cmds
ZelloLogonReq struct {
ZelloCommand
Username string `json:"username"`
Password string `json:"password"`
Channels []string `json:"channels,omitempty"`
AuthToken string `json:"auth_token,omitempty"`
ListenOnly bool `json:"listen_only,omitempty"`
PlatformType string `json:"platform_type,omitempty"`
PlatformName string `json:"platform_name,omitempty"`
}
ZelloLogonResp struct {
ZelloResponse
Success bool `json:"success"`
ErrorMsg string `json:"error,omitempty"`
}
// start_stream
ZelloStartStreamReq struct {
ZelloCommand
Channel string `json:"channel"`
Type string `json:"type"`
Codec string `json:"codec"`
CodecHeader string `json:"codec_header"`
PacketDuration float64 `json:"packet_duration"`
For string `json:"for,omitempty"`
}
ZelloStartStreamResp struct {
ZelloResponse
Success bool `json:"success"`
ErrorMsg string `json:"error,omitempty"`
StreamID int64 `json:"stream_id"`
}
// stop_stream
ZelloStopStreamReq struct {
ZelloCommand
Channel string `json:"channel"`
StreamID int64 `json:"stream_id"`
}
//Messages
ZelloSendTextMessage struct {
ZelloCommand
Channel string `json:"channel"`
Text string `json:"text"`
For string `json:"for,omitempty"`
}
ZelloSendLocation struct {
ZelloCommand
Channel string `json:"channel"`
ZelloLocation
For string `json:"for,omitempty"`
}
// Events
ZelloOnStreamStartEvent struct {
ZelloCommand
Channel string `json:"channel"`
StreamID int64 `json:"stream_id"`
Codec string `json:"codec"`
Type string `json:"type"`
From string `json:"from"`
CodecHeader string `json:"codec_header"`
PacketDuration float64 `json:"packet_duration"`
}
ZelloOnChannelStatusEvent struct {
ZelloCommand
Channel string `json:"channel"`
Status string `json:"status"`
UsersOnline float64 `json:"users_online"`
ImagesSupported bool `json:"images_supported"`
TextingSupported bool `json:"texting_supported"`
LocationsSupported bool `json:"locations_supported"`
}
ZelloOnStreamStopEvent struct {
ZelloCommand
StreamID int64 `json:"stream_id"`
}
ZelloOnTextMessageEvent struct {
ZelloCommand
Channel string `json:"channel"`
From string `json:"from"`
For string `json:"for,omitempty"`
MessageID int64 `json:"message_id"`
Text string `json:"text"`
}
ZelloOnLocationEvent struct {
ZelloCommand
Channel string `json:"channel"`
From string `json:"from"`
For string `json:"for,omitempty"`
MessageID int64 `json:"message_id"`
ZelloLocation
}
)

@ -0,0 +1,39 @@
package client_test
import (
"encoding/json"
"fmt"
"testing"
"git.cheetah.cat/tetrapack/go-zello-client/client"
)
func TestLoginMarshalling(t *testing.T) {
var err error
workLoginReq := client.ZelloLogonReq{
ZelloCommand: client.ZelloCommand{Command: "logon", Sequence: 1337},
Username: "username",
Password: "password",
Channels: []string{"chan1", "chan2"},
}
workBytes, err := json.Marshal(workLoginReq)
if err != nil {
t.Fatal(err)
}
fmt.Println(string(workBytes))
consLoginReq := client.ZelloLogonReq{
ZelloCommand: client.ZelloCommand{Command: "logon", Sequence: 1337},
Username: "username",
Password: "password",
Channels: []string{"consumer-chan1"},
AuthToken: "authToken",
}
consBytes, err := json.Marshal(consLoginReq)
if err != nil {
t.Fatal(err)
}
fmt.Println(string(consBytes))
}

@ -0,0 +1,5 @@
module git.cheetah.cat/tetrapack/go-zello-client
go 1.21.1
require github.com/gorilla/websocket v1.5.3 // indirect

@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Loading…
Cancel
Save