From a5276bd3c168b40e81dcb5a773cf4ccb082170c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Grome=C5=A1?= Date: Mon, 9 Jul 2018 17:41:31 +0200 Subject: [PATCH] ESP8266 - Implemented MQTT subscribe and unsubscribe --- src/modules/ESP8266.cpp | 145 +++++++++++++++++++++++++++++++++++----- src/modules/ESP8266.h | 4 ++ 2 files changed, 133 insertions(+), 16 deletions(-) diff --git a/src/modules/ESP8266.cpp b/src/modules/ESP8266.cpp index ad319de8..2ed5f23a 100644 --- a/src/modules/ESP8266.cpp +++ b/src/modules/ESP8266.cpp @@ -4,6 +4,7 @@ ESP8266::ESP8266(Module* module) { portHttp = 80; portMqtt = 1883; _mod = module; + _MqttPacketId = 1; } uint8_t ESP8266::begin(long speed) { @@ -298,24 +299,24 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char* size_t passwordLen = strlen(password); size_t willTopicLen = strlen(willTopic); size_t willMessageLen = strlen(willMessage); - uint32_t len = 10 + 2 + clientIdLen; + uint32_t remainingLength = 10 + (2 + clientIdLen); if(userNameLen > 0) { - len += 2 + userNameLen; + remainingLength += (2 + userNameLen); } if(passwordLen > 0) { - len += 2 + passwordLen; + remainingLength += (2 + passwordLen); } if((willTopicLen > 0) && (willMessageLen > 0)) { - len += 4 + willTopicLen + willMessageLen; + remainingLength += (2 + willTopicLen) + (2 + willMessageLen); } uint8_t encoded[] = {0, 0, 0, 0}; - size_t encodedBytes = MqttEncodeLength(len, encoded); + size_t encodedBytes = MqttEncodeLength(remainingLength, encoded); // build the CONNECT packet - uint8_t* packet = new uint8_t[len + encodedBytes + 2]; + uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength]; // fixed header - packet[0] = (MQTT_CONNECT << 4) & 0xFF; + packet[0] = (MQTT_CONNECT << 4) | 0b0000; memcpy(packet + 1, encoded, encodedBytes); // variable header @@ -387,13 +388,13 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char* } // send MQTT packet - state = send(packet, len + 2); + state = send(packet, 1 + encodedBytes + remainingLength); delete[] packet; if(state != ERR_NONE) { return(state); } - // get the response length (MQTT response has to be 4 bytes long) + // get the response length (MQTT CONNACK response has to be 4 bytes long) uint16_t numBytes = getNumBytes(); if(numBytes != 4) { return(ERR_RESPONSE_MALFORMED_AT); @@ -402,7 +403,7 @@ uint8_t ESP8266::MqttConnect(const char* host, const char* clientId, const char* // read the response uint8_t* response = new uint8_t[numBytes]; receive(response); - if((response[0x00] == MQTT_CONNACK << 4) && (response[0x01] == 2)) { + if((response[0] == MQTT_CONNACK << 4) && (response[1] == 2)) { uint8_t returnCode = response[0x03]; delete[] response; return(returnCode); @@ -417,7 +418,7 @@ uint8_t ESP8266::MqttDisconnect() { uint8_t packet[2]; // fixed header - packet[0] = (MQTT_DISCONNECT << 4); + packet[0] = (MQTT_DISCONNECT << 4) | 0b0000; packet[1] = 0x00; // send MQTT packet @@ -429,15 +430,15 @@ uint8_t ESP8266::MqttPublish(const char* topic, const char* message) { // encode packet length size_t topicLen = strlen(topic); size_t messageLen = strlen(message); - uint32_t len = 2 + topicLen + messageLen; + uint32_t remainingLength = (2 + topicLen) + messageLen; uint8_t encoded[] = {0, 0, 0, 0}; - size_t encodedBytes = MqttEncodeLength(len, encoded); + size_t encodedBytes = MqttEncodeLength(remainingLength, encoded); // build the PUBLISH packet - uint8_t* packet = new uint8_t[len + 2]; + uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength]; // fixed header - packet[0] = (MQTT_PUBLISH << 4); + packet[0] = (MQTT_PUBLISH << 4) | 0b0000; memcpy(packet + 1, encoded, encodedBytes); // variable header @@ -448,18 +449,130 @@ uint8_t ESP8266::MqttPublish(const char* topic, const char* message) { memcpy(packet + pos, topic, topicLen); pos += topicLen; + // packet ID + + // payload // message memcpy(packet + pos, message, messageLen); pos += messageLen; // send MQTT packet - uint8_t state = send(packet, len + 2); + uint8_t state = send(packet, 1 + encodedBytes + remainingLength); delete[] packet; return(state); //TODO: implement QoS > 0 and PUBACK response checking } +uint8_t ESP8266::MqttSubscribe(const char* topicFilter) { + // encode packet length + size_t topicFilterLen = strlen(topicFilter); + uint32_t remainingLength = 2 + (2 + topicFilterLen + 1); + uint8_t encoded[] = {0, 0, 0, 0}; + size_t encodedBytes = MqttEncodeLength(remainingLength, encoded); + + // build the SUBSCRIBE packet + uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength]; + + // fixed header + packet[0] = (MQTT_SUBSCRIBE << 4) | 0b0010; + memcpy(packet + 1, encoded, encodedBytes); + + // variable header + // packet ID + size_t pos = encodedBytes + 1; + uint16_t packetId = _MqttPacketId++; + packet[pos++] = (packetId & 0xFF00) >> 8; + packet[pos++] = packetId & 0x00FF; + + // payload + // topic filter + packet[pos++] = (topicFilterLen & 0xFF00) >> 8;; + packet[pos++] = topicFilterLen & 0x00FF; + memcpy(packet + pos, topicFilter, topicFilterLen); + pos += topicFilterLen; + packet[pos++] = 0x00; // QoS 0 + + // send MQTT packet + uint8_t state = send(packet, 1 + encodedBytes + remainingLength); + delete[] packet; + return(state); + + // get the response length (MQTT SUBACK response has to be 5 bytes long) + uint16_t numBytes = getNumBytes(); + if(numBytes != 5) { + return(ERR_RESPONSE_MALFORMED_AT); + } + + // read the response + uint8_t* response = new uint8_t[numBytes]; + receive(response); + if((response[0] == MQTT_SUBACK << 4) && (response[2] == packetId)) { + uint8_t returnCode = response[0x04]; + delete[] response; + return(returnCode); + } + + delete[] response; + return(ERR_RESPONSE_MALFORMED); +} + +uint8_t ESP8266::MqttUnsubscribe(const char* topicFilter) { + // encode packet length + size_t topicFilterLen = strlen(topicFilter); + uint32_t remainingLength = 2 + (2 + topicFilterLen); + uint8_t encoded[] = {0, 0, 0, 0}; + size_t encodedBytes = MqttEncodeLength(remainingLength, encoded); + + // build the UNSUBSCRIBE packet + uint8_t* packet = new uint8_t[1 + encodedBytes + remainingLength]; + + // fixed header + packet[0] = (MQTT_UNSUBSCRIBE << 4) | 0b0010; + memcpy(packet + 1, encoded, encodedBytes); + + // variable header + // packet ID + size_t pos = encodedBytes + 1; + uint16_t packetId = _MqttPacketId++; + packet[pos++] = (packetId & 0xFF00) >> 8; + packet[pos++] = packetId & 0x00FF; + + // payload + // topic filter + packet[pos++] = (topicFilterLen & 0xFF00) >> 8;; + packet[pos++] = topicFilterLen & 0x00FF; + memcpy(packet + pos, topicFilter, topicFilterLen); + pos += topicFilterLen; + + // send MQTT packet + uint8_t state = send(packet, 1 + encodedBytes + remainingLength); + delete[] packet; + return(state); + + // get the response length (MQTT UNSUBACK response has to be 4 bytes long) + uint16_t numBytes = getNumBytes(); + if(numBytes != 4) { + return(ERR_RESPONSE_MALFORMED_AT); + } + + // read the response + uint8_t* response = new uint8_t[numBytes]; + receive(response); + if((response[0] == MQTT_UNSUBACK << 4) && (response[1] == 2)) { + // check packet ID + uint16_t receivedId = response[2] | response[3] << 8; + delete[] response; + if(receivedId != packetId) { + return(ERR_RESPONSE_MALFORMED); + } + return(ERR_NONE); + } + + delete[] response; + return(ERR_RESPONSE_MALFORMED); +} + uint8_t ESP8266::send(const char* data) { // build AT command char lenStr[8]; diff --git a/src/modules/ESP8266.h b/src/modules/ESP8266.h index a57c5e8c..e97c94bd 100644 --- a/src/modules/ESP8266.h +++ b/src/modules/ESP8266.h @@ -46,6 +46,8 @@ class ESP8266 { uint8_t MqttConnect(const char* host, const char* clientId, const char* userName = "", const char* password = "", uint16_t keepAlive = 60, bool cleanSession = true, const char* willTopic = "", const char* willMessage = ""); uint8_t MqttDisconnect(); uint8_t MqttPublish(const char* topic, const char* message); + uint8_t MqttSubscribe(const char* topicFilter); + uint8_t MqttUnsubscribe(const char* topicFilter); // Transport layer methods uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive = 0); @@ -57,6 +59,8 @@ class ESP8266 { private: Module* _mod; + uint16_t _MqttPacketId; + size_t MqttEncodeLength(uint32_t len, uint8_t* encoded); uint32_t MqttDecodeLength(uint8_t* encoded);