From 923ddd029d1f4dd5341ac36ac1bf9f07bf0f38ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Grome=C5=A1?= Date: Tue, 10 Apr 2018 20:49:44 +0200 Subject: [PATCH] Added basic MQTT support (CONNECT and PUBLISH) --- src/TypeDef.h | 1 + src/modules/ESP8266.cpp | 220 ++++++++++++++++++++++++++++++++++++++-- src/modules/ESP8266.h | 31 +++++- 3 files changed, 242 insertions(+), 10 deletions(-) diff --git a/src/TypeDef.h b/src/TypeDef.h index 9ebe6988..66ac4f64 100644 --- a/src/TypeDef.h +++ b/src/TypeDef.h @@ -51,6 +51,7 @@ #define ERR_URL_MALFORMED 0x02 #define ERR_RESPONSE_MALFORMED_AT 0x03 #define ERR_RESPONSE_MALFORMED 0x04 +#define ERR_MQTT_CONNECTION_REFUSED 0x05 // XBee error codes #define ERR_CMD_MODE_FAILED 0x02 diff --git a/src/modules/ESP8266.cpp b/src/modules/ESP8266.cpp index a26ada95..55edc2ea 100644 --- a/src/modules/ESP8266.cpp +++ b/src/modules/ESP8266.cpp @@ -3,6 +3,7 @@ ESP8266::ESP8266(Module* module) { portTcp = 80; // Default HTTP port (TCP application) portUdp = 53; // Default DNS port (UDP application) + portMqtt = 1883; _mod = module; } @@ -52,7 +53,7 @@ uint8_t ESP8266::join(const char* ssid, const char* password) { } // join AP - String cmd = "AT+CWJAP_CUR=\""; + String cmd = "AT+CWJAP_CUR=\""; cmd += ssid; cmd += "\",\""; cmd += password; @@ -184,20 +185,159 @@ uint16_t ESP8266::HttpPost(const char* url, String content, String& response, co return(statusString.toInt()); } -uint8_t ESP8266::startTcp(const char* host) { - openTransportConnection(host, "TCP", portTcp); +uint8_t ESP8266::MqttConnect(String host, String clientId, String username, String password) { + _MqttHost = host; + + // encode packet length + uint32_t len = 16 + clientId.length() + username.length() + password.length(); + /*uint8_t encoded[] = {0, 0, 0, 0}; + MqttEncodeLength(len, encoded);*/ + + // build the CONNECT packet + uint8_t* packet = new uint8_t[len + 2]; + packet[0] = (MQTT_CONNECT << 4) & 0xFF; + packet[1] = len; + /*for(uint8_t i = 1; i < 4; i++) { + packet[i] = encoded[i]; + }*/ + + packet[2] = 0x00; + packet[3] = 0x04; + packet[4] = 'M'; + packet[5] = 'Q'; + packet[6] = 'T'; + packet[7] = 'T'; + packet[8] = 0x04; //protocol level + packet[9] = 0b11000010; //flags: user name + password + clean session + packet[10] = 0x00; //keep-alive interval MSB + packet[11] = 0x3C; //keep-alive interval LSB + + packet[12] = 0x00; + packet[13] = clientId.length(); + for(uint8_t i = 0; i < clientId.length(); i++) { + packet[i + 14] = (uint8_t)clientId.charAt(i); + } + + packet[14 + clientId.length()] = 0x00; + packet[15 + clientId.length()] = username.length(); + for(uint8_t i = 0; i < username.length(); i++) { + packet[i + 16 + clientId.length()] = (uint8_t)username.charAt(i); + } + + packet[16 + clientId.length() + username.length()] = 0x00; + packet[17 + clientId.length() + username.length()] = password.length(); + for(uint8_t i = 0; i < password.length(); i++) { + packet[i + 18 + clientId.length() + username.length()] = (uint8_t)password.charAt(i); + } + + /*for(uint8_t i = 0; i < len + 2; i++) { + Serial.print(i); + Serial.print('\t'); + Serial.write(packet[i]); + Serial.print("\t0x"); + Serial.println(packet[i], HEX); + }*/ + + // create TCP connection + uint8_t state = openTransportConnection(_MqttHost.c_str(), "TCP", portMqtt, 7200); + if(state != ERR_NONE) { + return(state); + } + + // send MQTT packet + state = send(packet, len + 2); + if(state != ERR_NONE) { + return(state); + } + + // read the response + /*uint8_t response[] = {0, 0, 0, 0, 0}; + receive(response);*/ + String raw = receive(); + + /*for(uint8_t i = 0; i < raw.length(); i++) { + Serial.print(i); + Serial.print('\t'); + Serial.write(raw.charAt(i)); + Serial.print("\t0x"); + Serial.println(raw.charAt(i), HEX); + }*/ + + // parse the response + int32_t numBytesIndex = raw.indexOf(":"); + if(numBytesIndex == -1) { + return(ERR_RESPONSE_MALFORMED_AT); + } + + uint8_t response[] = {0, 0, 0, 0}; + for(uint8_t i = 0; i < 4; i++) { + response[i] = raw.charAt(i + numBytesIndex + 1); + } + + /*for(uint8_t i = 0; i < 4; i++) { + Serial.print(i); + Serial.print('\t'); + Serial.write(response[i]); + Serial.print("\t0x"); + Serial.println(response[i], HEX); + }*/ + if(response[3] != 0x00) { + return(ERR_MQTT_CONNECTION_REFUSED); + } + + return(ERR_NONE); +} + +uint8_t ESP8266::MqttPublish(String topic, String message) { + // encode packet length + uint32_t len = 2 + topic.length() + message.length(); + + // build the PUBLISH packet + uint8_t* packet = new uint8_t[len + 2]; + packet[0] = (MQTT_PUBLISH << 4) & 0xFF; + packet[1] = len; + + packet[2] = 0x00; + packet[3] = topic.length(); + for(uint8_t i = 0; i < topic.length(); i++) { + packet[i + 4] = (uint8_t)topic.charAt(i); + } + + for(uint8_t i = 0; i < message.length(); i++) { + packet[i + 4 + topic.length()] = (uint8_t)message.charAt(i); + } + + /*for(uint8_t i = 0; i < len + 2; i++) { + Serial.print(i); + Serial.print('\t'); + Serial.write(packet[i]); + Serial.print("\t0x"); + Serial.println(packet[i], HEX); + }*/ + + // send MQTT packet + uint8_t state = send(packet, len + 2); + if(state != ERR_NONE) { + return(state); + } + + return(ERR_NONE); +} + +uint8_t ESP8266::startTcp(const char* host, uint16_t tcpKeepAlive) { + return(openTransportConnection(host, "TCP", portTcp, tcpKeepAlive)); } uint8_t ESP8266::closeTcp() { - closeTransportConnection(); + return(closeTransportConnection()); } uint8_t ESP8266::startUdp(const char* host) { - openTransportConnection(host, "UDP", portUdp); + return(openTransportConnection(host, "UDP", portUdp)); } uint8_t ESP8266::closeUdp() { - closeTransportConnection(); + return(closeTransportConnection()); } uint8_t ESP8266::send(String data) { @@ -216,6 +356,22 @@ uint8_t ESP8266::send(String data) { return(ERR_NONE); } +uint8_t ESP8266::send(uint8_t* data, uint32_t len) { + // send data length in bytes + String cmd = "AT+CIPSEND="; + cmd += len; + if(!_mod->ATsendCommand(cmd)) { + return(ERR_AT_FAILED); + } + + // send data + if(!_mod->ATsendData(data, len)) { + return(ERR_AT_FAILED); + } + + return(ERR_NONE); +} + String ESP8266::receive(uint32_t timeout) { String data; uint32_t start = millis(); @@ -231,13 +387,36 @@ String ESP8266::receive(uint32_t timeout) { return(data); } -uint8_t ESP8266::openTransportConnection(const char* host, const char* protocol, uint16_t port) { +uint32_t ESP8266::receive(uint8_t* data, uint32_t timeout) { + uint8_t i = 0; + uint32_t start = millis(); + while(millis() - start < timeout) { + while(_mod->ModuleSerial->available() > 0) { + uint8_t b = _mod->ModuleSerial->read(); + /*Serial.write(b); + Serial.print("\t0x"); + Serial.println(b, HEX);*/ + #ifdef DEBUG + Serial.print(b); + #endif + data[i] = b; + i++; + } + } + return(i); +} + +uint8_t ESP8266::openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive) { String cmd = "AT+CIPSTART=\""; cmd += protocol; cmd += "\",\""; cmd += host; cmd += "\","; cmd += port; + if((protocol == "TCP") && (tcpKeepAlive > 0)) { + cmd += ","; + cmd += tcpKeepAlive; + } if(!_mod->ATsendCommand(cmd)) { return(ERR_AT_FAILED); } @@ -250,3 +429,30 @@ uint8_t ESP8266::closeTransportConnection() { } return(ERR_NONE); } + +void ESP8266::MqttEncodeLength(uint32_t len, uint8_t* encoded) { + uint8_t i = 0; + do { + encoded[i] = len % 128; + len /= 128; + if(len > 0) { + encoded[i] |= 128; + } + i++; + } while(len > 0); +} + +uint32_t ESP8266::MqttDecodeLength(uint8_t* encoded) { + uint32_t mult = 1; + uint32_t len = 0; + uint8_t i = 0; + do { + len += (encoded[i] & 127) * mult; + mult *= 128; + if(mult > 2097152) { + // malformed remaining length + return(0); + } + } while((encoded[i] & 128) != 0); + return len; +} diff --git a/src/modules/ESP8266.h b/src/modules/ESP8266.h index 41387a6d..77a93007 100644 --- a/src/modules/ESP8266.h +++ b/src/modules/ESP8266.h @@ -3,11 +3,26 @@ #include "Module.h" +#define MQTT_CONNECT 0x01 +#define MQTT_CONNACK 0x02 +#define MQTT_PUBLISH 0x03 +#define MQTT_PUBACK 0x04 +#define MQTT_PUBREC 0x05 +#define MQTT_PUBREL 0x06 +#define MQTT_PUBCOMP 0x07 +#define MQTT_SUBSCRIBE 0x08 +#define MQTT_SUBACK 0x09 +#define MQTT_UNSUBSCRIBE 0x0A +#define MQTT_UNSUBACK 0x0B +#define MQTT_PINGREQ 0x0C +#define MQTT_PINGRESP 0x0D +#define MQTT_DISCONNECT 0x0E + class ESP8266 { public: ESP8266(Module* module); - uint16_t portTdp, portUdp; + uint16_t portTcp, portUdp, portMqtt; // Basic methods uint8_t begin(long speed); @@ -18,19 +33,29 @@ class ESP8266 { uint16_t HttpGet(const char* url, String& response); uint16_t HttpPost(const char* url, String content, String& response, const char* contentType = ""); + // MQTT methods + uint8_t MqttConnect(String host, String clientId, String username, String password); + uint8_t MqttPublish(String topic, String message); + // Transport layer methods - uint8_t startTcp(const char* host); + uint8_t startTcp(const char* host, uint16_t tcpKeepAlive = 0); uint8_t closeTcp(); uint8_t startUdp(const char* host); uint8_t closeUdp(); uint8_t send(String data); + uint8_t send(uint8_t* data, uint32_t len); String receive(uint32_t timeout = 10000); + uint32_t receive(uint8_t* data, uint32_t timeout = 10000); private: Module* _mod; - uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port); + uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive = 0); uint8_t closeTransportConnection(); + + String _MqttHost; + void MqttEncodeLength(uint32_t len, uint8_t* encoded); + uint32_t MqttDecodeLength(uint8_t* encoded); }; #endif