ESP8266 - Implemented MQTT subscribe and unsubscribe
This commit is contained in:
parent
146822d84e
commit
a5276bd3c1
2 changed files with 133 additions and 16 deletions
|
@ -4,6 +4,7 @@ ESP8266::ESP8266(Module* module) {
|
|||
portHttp = 80;
|
||||
portMqtt = 1883;
|
||||
_mod = module;
|
||||
_MqttPacketId = 1;
|
||||
}
|
||||
|
||||
uint8_t ESP8266::begin(long speed) {
|
||||
|
@ -298,24 +299,24 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char*
|
|||
size_t passwordLen = strlen(password);
|
||||
size_t willTopicLen = strlen(willTopic);
|
||||
size_t willMessageLen = strlen(willMessage);
|
||||
uint32_t len = 10 + 2 + clientIdLen;
|
||||
uint32_t remainingLength = 10 + (2 + clientIdLen);
|
||||
if(userNameLen > 0) {
|
||||
len += 2 + userNameLen;
|
||||
remainingLength += (2 + userNameLen);
|
||||
}
|
||||
if(passwordLen > 0) {
|
||||
len += 2 + passwordLen;
|
||||
remainingLength += (2 + passwordLen);
|
||||
}
|
||||
if((willTopicLen > 0) && (willMessageLen > 0)) {
|
||||
len += 4 + willTopicLen + willMessageLen;
|
||||
remainingLength += (2 + willTopicLen) + (2 + willMessageLen);
|
||||
}
|
||||
uint8_t encoded[] = {0, 0, 0, 0};
|
||||
size_t encodedBytes = MqttEncodeLength(len, encoded);
|
||||
size_t encodedBytes = MqttEncodeLength(remainingLength, encoded);
|
||||
|
||||
// build the CONNECT packet
|
||||
uint8_t* packet = new uint8_t[len + encodedBytes + 2];
|
||||
uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength];
|
||||
|
||||
// fixed header
|
||||
packet[0] = (MQTT_CONNECT << 4) & 0xFF;
|
||||
packet[0] = (MQTT_CONNECT << 4) | 0b0000;
|
||||
memcpy(packet + 1, encoded, encodedBytes);
|
||||
|
||||
// variable header
|
||||
|
@ -387,13 +388,13 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char*
|
|||
}
|
||||
|
||||
// send MQTT packet
|
||||
state = send(packet, len + 2);
|
||||
state = send(packet, 1 + encodedBytes + remainingLength);
|
||||
delete[] packet;
|
||||
if(state != ERR_NONE) {
|
||||
return(state);
|
||||
}
|
||||
|
||||
// get the response length (MQTT response has to be 4 bytes long)
|
||||
// get the response length (MQTT CONNACK response has to be 4 bytes long)
|
||||
uint16_t numBytes = getNumBytes();
|
||||
if(numBytes != 4) {
|
||||
return(ERR_RESPONSE_MALFORMED_AT);
|
||||
|
@ -402,7 +403,7 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char*
|
|||
// read the response
|
||||
uint8_t* response = new uint8_t[numBytes];
|
||||
receive(response);
|
||||
if((response[0x00] == MQTT_CONNACK << 4) && (response[0x01] == 2)) {
|
||||
if((response[0] == MQTT_CONNACK << 4) && (response[1] == 2)) {
|
||||
uint8_t returnCode = response[0x03];
|
||||
delete[] response;
|
||||
return(returnCode);
|
||||
|
@ -417,7 +418,7 @@ uint8_t ESP8266::MqttDisconnect() {
|
|||
uint8_t packet[2];
|
||||
|
||||
// fixed header
|
||||
packet[0] = (MQTT_DISCONNECT << 4);
|
||||
packet[0] = (MQTT_DISCONNECT << 4) | 0b0000;
|
||||
packet[1] = 0x00;
|
||||
|
||||
// send MQTT packet
|
||||
|
@ -429,15 +430,15 @@ uint8_t ESP8266::MqttPublish(const char* topic, const char* message) {
|
|||
// encode packet length
|
||||
size_t topicLen = strlen(topic);
|
||||
size_t messageLen = strlen(message);
|
||||
uint32_t len = 2 + topicLen + messageLen;
|
||||
uint32_t remainingLength = (2 + topicLen) + messageLen;
|
||||
uint8_t encoded[] = {0, 0, 0, 0};
|
||||
size_t encodedBytes = MqttEncodeLength(len, encoded);
|
||||
size_t encodedBytes = MqttEncodeLength(remainingLength, encoded);
|
||||
|
||||
// build the PUBLISH packet
|
||||
uint8_t* packet = new uint8_t[len + 2];
|
||||
uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength];
|
||||
|
||||
// fixed header
|
||||
packet[0] = (MQTT_PUBLISH << 4);
|
||||
packet[0] = (MQTT_PUBLISH << 4) | 0b0000;
|
||||
memcpy(packet + 1, encoded, encodedBytes);
|
||||
|
||||
// variable header
|
||||
|
@ -448,18 +449,130 @@ uint8_t ESP8266::MqttPublish(const char* topic, const char* message) {
|
|||
memcpy(packet + pos, topic, topicLen);
|
||||
pos += topicLen;
|
||||
|
||||
// packet ID
|
||||
|
||||
// payload
|
||||
// message
|
||||
memcpy(packet + pos, message, messageLen);
|
||||
pos += messageLen;
|
||||
|
||||
// send MQTT packet
|
||||
uint8_t state = send(packet, len + 2);
|
||||
uint8_t state = send(packet, 1 + encodedBytes + remainingLength);
|
||||
delete[] packet;
|
||||
return(state);
|
||||
|
||||
//TODO: implement QoS > 0 and PUBACK response checking
|
||||
}
|
||||
|
||||
uint8_t ESP8266::MqttSubscribe(const char* topicFilter) {
|
||||
// encode packet length
|
||||
size_t topicFilterLen = strlen(topicFilter);
|
||||
uint32_t remainingLength = 2 + (2 + topicFilterLen + 1);
|
||||
uint8_t encoded[] = {0, 0, 0, 0};
|
||||
size_t encodedBytes = MqttEncodeLength(remainingLength, encoded);
|
||||
|
||||
// build the SUBSCRIBE packet
|
||||
uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength];
|
||||
|
||||
// fixed header
|
||||
packet[0] = (MQTT_SUBSCRIBE << 4) | 0b0010;
|
||||
memcpy(packet + 1, encoded, encodedBytes);
|
||||
|
||||
// variable header
|
||||
// packet ID
|
||||
size_t pos = encodedBytes + 1;
|
||||
uint16_t packetId = _MqttPacketId++;
|
||||
packet[pos++] = (packetId & 0xFF00) >> 8;
|
||||
packet[pos++] = packetId & 0x00FF;
|
||||
|
||||
// payload
|
||||
// topic filter
|
||||
packet[pos++] = (topicFilterLen & 0xFF00) >> 8;;
|
||||
packet[pos++] = topicFilterLen & 0x00FF;
|
||||
memcpy(packet + pos, topicFilter, topicFilterLen);
|
||||
pos += topicFilterLen;
|
||||
packet[pos++] = 0x00; // QoS 0
|
||||
|
||||
// send MQTT packet
|
||||
uint8_t state = send(packet, 1 + encodedBytes + remainingLength);
|
||||
delete[] packet;
|
||||
return(state);
|
||||
|
||||
// get the response length (MQTT SUBACK response has to be 5 bytes long)
|
||||
uint16_t numBytes = getNumBytes();
|
||||
if(numBytes != 5) {
|
||||
return(ERR_RESPONSE_MALFORMED_AT);
|
||||
}
|
||||
|
||||
// read the response
|
||||
uint8_t* response = new uint8_t[numBytes];
|
||||
receive(response);
|
||||
if((response[0] == MQTT_SUBACK << 4) && (response[2] == packetId)) {
|
||||
uint8_t returnCode = response[0x04];
|
||||
delete[] response;
|
||||
return(returnCode);
|
||||
}
|
||||
|
||||
delete[] response;
|
||||
return(ERR_RESPONSE_MALFORMED);
|
||||
}
|
||||
|
||||
uint8_t ESP8266::MqttUnsubscribe(const char* topicFilter) {
|
||||
// encode packet length
|
||||
size_t topicFilterLen = strlen(topicFilter);
|
||||
uint32_t remainingLength = 2 + (2 + topicFilterLen);
|
||||
uint8_t encoded[] = {0, 0, 0, 0};
|
||||
size_t encodedBytes = MqttEncodeLength(remainingLength, encoded);
|
||||
|
||||
// build the UNSUBSCRIBE packet
|
||||
uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength];
|
||||
|
||||
// fixed header
|
||||
packet[0] = (MQTT_UNSUBSCRIBE << 4) | 0b0010;
|
||||
memcpy(packet + 1, encoded, encodedBytes);
|
||||
|
||||
// variable header
|
||||
// packet ID
|
||||
size_t pos = encodedBytes + 1;
|
||||
uint16_t packetId = _MqttPacketId++;
|
||||
packet[pos++] = (packetId & 0xFF00) >> 8;
|
||||
packet[pos++] = packetId & 0x00FF;
|
||||
|
||||
// payload
|
||||
// topic filter
|
||||
packet[pos++] = (topicFilterLen & 0xFF00) >> 8;;
|
||||
packet[pos++] = topicFilterLen & 0x00FF;
|
||||
memcpy(packet + pos, topicFilter, topicFilterLen);
|
||||
pos += topicFilterLen;
|
||||
|
||||
// send MQTT packet
|
||||
uint8_t state = send(packet, 1 + encodedBytes + remainingLength);
|
||||
delete[] packet;
|
||||
return(state);
|
||||
|
||||
// get the response length (MQTT UNSUBACK response has to be 4 bytes long)
|
||||
uint16_t numBytes = getNumBytes();
|
||||
if(numBytes != 4) {
|
||||
return(ERR_RESPONSE_MALFORMED_AT);
|
||||
}
|
||||
|
||||
// read the response
|
||||
uint8_t* response = new uint8_t[numBytes];
|
||||
receive(response);
|
||||
if((response[0] == MQTT_UNSUBACK << 4) && (response[1] == 2)) {
|
||||
// check packet ID
|
||||
uint16_t receivedId = response[2] | response[3] << 8;
|
||||
delete[] response;
|
||||
if(receivedId != packetId) {
|
||||
return(ERR_RESPONSE_MALFORMED);
|
||||
}
|
||||
return(ERR_NONE);
|
||||
}
|
||||
|
||||
delete[] response;
|
||||
return(ERR_RESPONSE_MALFORMED);
|
||||
}
|
||||
|
||||
uint8_t ESP8266::send(const char* data) {
|
||||
// build AT command
|
||||
char lenStr[8];
|
||||
|
|
|
@ -46,6 +46,8 @@ class ESP8266 {
|
|||
uint8_t MqttConnect(const char* host, const char* clientId, const char* userName = "", const char* password = "", uint16_t keepAlive = 60, bool cleanSession = true, const char* willTopic = "", const char* willMessage = "");
|
||||
uint8_t MqttDisconnect();
|
||||
uint8_t MqttPublish(const char* topic, const char* message);
|
||||
uint8_t MqttSubscribe(const char* topicFilter);
|
||||
uint8_t MqttUnsubscribe(const char* topicFilter);
|
||||
|
||||
// Transport layer methods
|
||||
uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive = 0);
|
||||
|
@ -57,6 +59,8 @@ class ESP8266 {
|
|||
private:
|
||||
Module* _mod;
|
||||
|
||||
uint16_t _MqttPacketId;
|
||||
|
||||
size_t MqttEncodeLength(uint32_t len, uint8_t* encoded);
|
||||
uint32_t MqttDecodeLength(uint8_t* encoded);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue