Added basic MQTT support (CONNECT and PUBLISH)

This commit is contained in:
Jan Gromeš 2018-04-10 20:49:44 +02:00
parent 79b4f6da20
commit 923ddd029d
3 changed files with 242 additions and 10 deletions

View file

@ -51,6 +51,7 @@
#define ERR_URL_MALFORMED 0x02
#define ERR_RESPONSE_MALFORMED_AT 0x03
#define ERR_RESPONSE_MALFORMED 0x04
#define ERR_MQTT_CONNECTION_REFUSED 0x05
// XBee error codes
#define ERR_CMD_MODE_FAILED 0x02

View file

@ -3,6 +3,7 @@
ESP8266::ESP8266(Module* module) {
portTcp = 80; // Default HTTP port (TCP application)
portUdp = 53; // Default DNS port (UDP application)
portMqtt = 1883;
_mod = module;
}
@ -52,7 +53,7 @@ uint8_t ESP8266::join(const char* ssid, const char* password) {
}
// join AP
String cmd = "AT+CWJAP_CUR=\"";
String cmd = "AT+CWJAP_CUR=\"";
cmd += ssid;
cmd += "\",\"";
cmd += password;
@ -184,20 +185,159 @@ uint16_t ESP8266::HttpPost(const char* url, String content, String& response, co
return(statusString.toInt());
}
uint8_t ESP8266::startTcp(const char* host) {
openTransportConnection(host, "TCP", portTcp);
uint8_t ESP8266::MqttConnect(String host, String clientId, String username, String password) {
_MqttHost = host;
// encode packet length
uint32_t len = 16 + clientId.length() + username.length() + password.length();
/*uint8_t encoded[] = {0, 0, 0, 0};
MqttEncodeLength(len, encoded);*/
// build the CONNECT packet
uint8_t* packet = new uint8_t[len + 2];
packet[0] = (MQTT_CONNECT << 4) & 0xFF;
packet[1] = len;
/*for(uint8_t i = 1; i < 4; i++) {
packet[i] = encoded[i];
}*/
packet[2] = 0x00;
packet[3] = 0x04;
packet[4] = 'M';
packet[5] = 'Q';
packet[6] = 'T';
packet[7] = 'T';
packet[8] = 0x04; //protocol level
packet[9] = 0b11000010; //flags: user name + password + clean session
packet[10] = 0x00; //keep-alive interval MSB
packet[11] = 0x3C; //keep-alive interval LSB
packet[12] = 0x00;
packet[13] = clientId.length();
for(uint8_t i = 0; i < clientId.length(); i++) {
packet[i + 14] = (uint8_t)clientId.charAt(i);
}
packet[14 + clientId.length()] = 0x00;
packet[15 + clientId.length()] = username.length();
for(uint8_t i = 0; i < username.length(); i++) {
packet[i + 16 + clientId.length()] = (uint8_t)username.charAt(i);
}
packet[16 + clientId.length() + username.length()] = 0x00;
packet[17 + clientId.length() + username.length()] = password.length();
for(uint8_t i = 0; i < password.length(); i++) {
packet[i + 18 + clientId.length() + username.length()] = (uint8_t)password.charAt(i);
}
/*for(uint8_t i = 0; i < len + 2; i++) {
Serial.print(i);
Serial.print('\t');
Serial.write(packet[i]);
Serial.print("\t0x");
Serial.println(packet[i], HEX);
}*/
// create TCP connection
uint8_t state = openTransportConnection(_MqttHost.c_str(), "TCP", portMqtt, 7200);
if(state != ERR_NONE) {
return(state);
}
// send MQTT packet
state = send(packet, len + 2);
if(state != ERR_NONE) {
return(state);
}
// read the response
/*uint8_t response[] = {0, 0, 0, 0, 0};
receive(response);*/
String raw = receive();
/*for(uint8_t i = 0; i < raw.length(); i++) {
Serial.print(i);
Serial.print('\t');
Serial.write(raw.charAt(i));
Serial.print("\t0x");
Serial.println(raw.charAt(i), HEX);
}*/
// parse the response
int32_t numBytesIndex = raw.indexOf(":");
if(numBytesIndex == -1) {
return(ERR_RESPONSE_MALFORMED_AT);
}
uint8_t response[] = {0, 0, 0, 0};
for(uint8_t i = 0; i < 4; i++) {
response[i] = raw.charAt(i + numBytesIndex + 1);
}
/*for(uint8_t i = 0; i < 4; i++) {
Serial.print(i);
Serial.print('\t');
Serial.write(response[i]);
Serial.print("\t0x");
Serial.println(response[i], HEX);
}*/
if(response[3] != 0x00) {
return(ERR_MQTT_CONNECTION_REFUSED);
}
return(ERR_NONE);
}
uint8_t ESP8266::MqttPublish(String topic, String message) {
// encode packet length
uint32_t len = 2 + topic.length() + message.length();
// build the PUBLISH packet
uint8_t* packet = new uint8_t[len + 2];
packet[0] = (MQTT_PUBLISH << 4) & 0xFF;
packet[1] = len;
packet[2] = 0x00;
packet[3] = topic.length();
for(uint8_t i = 0; i < topic.length(); i++) {
packet[i + 4] = (uint8_t)topic.charAt(i);
}
for(uint8_t i = 0; i < message.length(); i++) {
packet[i + 4 + topic.length()] = (uint8_t)message.charAt(i);
}
/*for(uint8_t i = 0; i < len + 2; i++) {
Serial.print(i);
Serial.print('\t');
Serial.write(packet[i]);
Serial.print("\t0x");
Serial.println(packet[i], HEX);
}*/
// send MQTT packet
uint8_t state = send(packet, len + 2);
if(state != ERR_NONE) {
return(state);
}
return(ERR_NONE);
}
uint8_t ESP8266::startTcp(const char* host, uint16_t tcpKeepAlive) {
return(openTransportConnection(host, "TCP", portTcp, tcpKeepAlive));
}
uint8_t ESP8266::closeTcp() {
closeTransportConnection();
return(closeTransportConnection());
}
uint8_t ESP8266::startUdp(const char* host) {
openTransportConnection(host, "UDP", portUdp);
return(openTransportConnection(host, "UDP", portUdp));
}
uint8_t ESP8266::closeUdp() {
closeTransportConnection();
return(closeTransportConnection());
}
uint8_t ESP8266::send(String data) {
@ -216,6 +356,22 @@ uint8_t ESP8266::send(String data) {
return(ERR_NONE);
}
uint8_t ESP8266::send(uint8_t* data, uint32_t len) {
// send data length in bytes
String cmd = "AT+CIPSEND=";
cmd += len;
if(!_mod->ATsendCommand(cmd)) {
return(ERR_AT_FAILED);
}
// send data
if(!_mod->ATsendData(data, len)) {
return(ERR_AT_FAILED);
}
return(ERR_NONE);
}
String ESP8266::receive(uint32_t timeout) {
String data;
uint32_t start = millis();
@ -231,13 +387,36 @@ String ESP8266::receive(uint32_t timeout) {
return(data);
}
uint8_t ESP8266::openTransportConnection(const char* host, const char* protocol, uint16_t port) {
uint32_t ESP8266::receive(uint8_t* data, uint32_t timeout) {
uint8_t i = 0;
uint32_t start = millis();
while(millis() - start < timeout) {
while(_mod->ModuleSerial->available() > 0) {
uint8_t b = _mod->ModuleSerial->read();
/*Serial.write(b);
Serial.print("\t0x");
Serial.println(b, HEX);*/
#ifdef DEBUG
Serial.print(b);
#endif
data[i] = b;
i++;
}
}
return(i);
}
uint8_t ESP8266::openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive) {
String cmd = "AT+CIPSTART=\"";
cmd += protocol;
cmd += "\",\"";
cmd += host;
cmd += "\",";
cmd += port;
if((protocol == "TCP") && (tcpKeepAlive > 0)) {
cmd += ",";
cmd += tcpKeepAlive;
}
if(!_mod->ATsendCommand(cmd)) {
return(ERR_AT_FAILED);
}
@ -250,3 +429,30 @@ uint8_t ESP8266::closeTransportConnection() {
}
return(ERR_NONE);
}
void ESP8266::MqttEncodeLength(uint32_t len, uint8_t* encoded) {
uint8_t i = 0;
do {
encoded[i] = len % 128;
len /= 128;
if(len > 0) {
encoded[i] |= 128;
}
i++;
} while(len > 0);
}
uint32_t ESP8266::MqttDecodeLength(uint8_t* encoded) {
uint32_t mult = 1;
uint32_t len = 0;
uint8_t i = 0;
do {
len += (encoded[i] & 127) * mult;
mult *= 128;
if(mult > 2097152) {
// malformed remaining length
return(0);
}
} while((encoded[i] & 128) != 0);
return len;
}

View file

@ -3,11 +3,26 @@
#include "Module.h"
#define MQTT_CONNECT 0x01
#define MQTT_CONNACK 0x02
#define MQTT_PUBLISH 0x03
#define MQTT_PUBACK 0x04
#define MQTT_PUBREC 0x05
#define MQTT_PUBREL 0x06
#define MQTT_PUBCOMP 0x07
#define MQTT_SUBSCRIBE 0x08
#define MQTT_SUBACK 0x09
#define MQTT_UNSUBSCRIBE 0x0A
#define MQTT_UNSUBACK 0x0B
#define MQTT_PINGREQ 0x0C
#define MQTT_PINGRESP 0x0D
#define MQTT_DISCONNECT 0x0E
class ESP8266 {
public:
ESP8266(Module* module);
uint16_t portTdp, portUdp;
uint16_t portTcp, portUdp, portMqtt;
// Basic methods
uint8_t begin(long speed);
@ -18,19 +33,29 @@ class ESP8266 {
uint16_t HttpGet(const char* url, String& response);
uint16_t HttpPost(const char* url, String content, String& response, const char* contentType = "");
// MQTT methods
uint8_t MqttConnect(String host, String clientId, String username, String password);
uint8_t MqttPublish(String topic, String message);
// Transport layer methods
uint8_t startTcp(const char* host);
uint8_t startTcp(const char* host, uint16_t tcpKeepAlive = 0);
uint8_t closeTcp();
uint8_t startUdp(const char* host);
uint8_t closeUdp();
uint8_t send(String data);
uint8_t send(uint8_t* data, uint32_t len);
String receive(uint32_t timeout = 10000);
uint32_t receive(uint8_t* data, uint32_t timeout = 10000);
private:
Module* _mod;
uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port);
uint8_t openTransportConnection(const char* host, const char* protocol, uint16_t port, uint16_t tcpKeepAlive = 0);
uint8_t closeTransportConnection();
String _MqttHost;
void MqttEncodeLength(uint32_t len, uint8_t* encoded);
uint32_t MqttDecodeLength(uint8_t* encoded);
};
#endif