diff --git a/.gitignore b/.gitignore index 1c3ba189..a42cc406 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ tests/bin +.pioenvs +.piolibdeps +.clang_complete +.gcc-flags.json diff --git a/CHANGES.txt b/CHANGES.txt index 8c8bef64..ff4da62a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,8 +1,16 @@ +2.7 + * Fix remaining-length handling to prevent buffer overrun + * Add large-payload API - beginPublish/write/publish/endPublish + * Add yield call to improve reliability on ESP + * Add Clean Session flag to connect options + * Add ESP32 support for functional callback signature + * Various other fixes + 2.4 * Add MQTT_SOCKET_TIMEOUT to prevent it blocking indefinitely whilst waiting for inbound data * Fixed return code when publishing >256 bytes - + 2.3 * Add publish(topic,payload,retained) function diff --git a/README.md b/README.md index c3b5cce4..2751cb52 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,14 @@ a server that supports MQTT. The library comes with a number of example sketches. See File > Examples > PubSubClient within the Arduino application. -Full API documentation is available here: http://pubsubclient.knolleary.net +Full API documentation is available here: https://pubsubclient.knolleary.net ## Limitations - It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1. - - The maximum message size, including header, is **128 bytes** by default. This - is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h` or at runtime - using PubSubClient::setBufferSize(). + - The maximum message size, including header, is **256 bytes** by default. This + is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h` or can be changed + by calling `PubSubClient::setBufferSize(size)`. - The keepalive interval is set to 15 seconds by default. This is configurable via `MQTT_KEEPALIVE` in `PubSubClient.h`. - The client uses MQTT 3.1.1 by default. It can be changed to use MQTT 3.1 by @@ -38,6 +38,7 @@ boards and shields, including: - TI CC3000 WiFi - [library](https://github.com/sparkfun/SFE_CC3000_Library) - Intel Galileo/Edison - ESP8266 + - ESP32 The library cannot currently be used with hardware based on the ENC28J60 chip – such as the Nanode or the Nuelectronics Ethernet Shield. For those, there is an diff --git a/examples/mqtt_esp8266/mqtt_esp8266.ino b/examples/mqtt_esp8266/mqtt_esp8266.ino index 34333c9c..7e6effd2 100644 --- a/examples/mqtt_esp8266/mqtt_esp8266.ino +++ b/examples/mqtt_esp8266/mqtt_esp8266.ino @@ -1,26 +1,21 @@ /* Basic ESP8266 MQTT example - This sketch demonstrates the capabilities of the pubsub library in combination with the ESP8266 board/library. - It connects to an MQTT server then: - publishes "hello world" to the topic "outTopic" every two seconds - subscribes to the topic "inTopic", printing out any messages it receives. NB - it assumes the received payloads are strings not binary - If the first character of the topic "inTopic" is an 1, switch ON the ESP Led, else switch it off - It will reconnect to the server if the connection is lost using a blocking reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to achieve the same result without blocking the main loop. - To install the ESP8266 board, (using Arduino 1.6.4+): - Add the following 3rd party board manager under "File -> Preferences -> Additional Boards Manager URLs": http://arduino.esp8266.com/stable/package_esp8266com_index.json - Open the "Tools -> Board -> Board Manager" and click install for the ESP8266" - Select your ESP8266 in "Tools -> Board" - */ #include @@ -34,8 +29,9 @@ const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; PubSubClient client(espClient); -long lastMsg = 0; -char msg[50]; +unsigned long lastMsg = 0; +#define MSG_BUFFER_SIZE (50) +char msg[MSG_BUFFER_SIZE]; int value = 0; void setup_wifi() { @@ -46,6 +42,7 @@ void setup_wifi() { Serial.print("Connecting to "); Serial.println(ssid); + WiFi.mode(WIFI_STA); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { @@ -74,7 +71,7 @@ void callback(char* topic, byte* payload, unsigned int length) { if ((char)payload[0] == '1') { digitalWrite(BUILTIN_LED, LOW); // Turn the LED on (Note that LOW is the voltage level // but actually the LED is on; this is because - // it is acive low on the ESP-01) + // it is active low on the ESP-01) } else { digitalWrite(BUILTIN_LED, HIGH); // Turn the LED off by making the voltage HIGH } @@ -120,11 +117,11 @@ void loop() { } client.loop(); - long now = millis(); + unsigned long now = millis(); if (now - lastMsg > 2000) { lastMsg = now; ++value; - snprintf (msg, 75, "hello world #%ld", value); + snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld", value); Serial.print("Publish message: "); Serial.println(msg); client.publish("outTopic", msg); diff --git a/examples/mqtt_large_message/mqtt_large_message.ino b/examples/mqtt_large_message/mqtt_large_message.ino new file mode 100644 index 00000000..e048c3ed --- /dev/null +++ b/examples/mqtt_large_message/mqtt_large_message.ino @@ -0,0 +1,179 @@ +/* + Long message ESP8266 MQTT example + + This sketch demonstrates sending arbitrarily large messages in combination + with the ESP8266 board/library. + + It connects to an MQTT server then: + - publishes "hello world" to the topic "outTopic" + - subscribes to the topic "greenBottles/#", printing out any messages + it receives. NB - it assumes the received payloads are strings not binary + - If the sub-topic is a number, it publishes a "greenBottles/lyrics" message + with a payload consisting of the lyrics to "10 green bottles", replacing + 10 with the number given in the sub-topic. + + It will reconnect to the server if the connection is lost using a blocking + reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to + achieve the same result without blocking the main loop. + + To install the ESP8266 board, (using Arduino 1.6.4+): + - Add the following 3rd party board manager under "File -> Preferences -> Additional Boards Manager URLs": + http://arduino.esp8266.com/stable/package_esp8266com_index.json + - Open the "Tools -> Board -> Board Manager" and click install for the ESP8266" + - Select your ESP8266 in "Tools -> Board" + +*/ + +#include +#include + +// Update these with values suitable for your network. + +const char* ssid = "........"; +const char* password = "........"; +const char* mqtt_server = "broker.mqtt-dashboard.com"; + +WiFiClient espClient; +PubSubClient client(espClient); +long lastMsg = 0; +char msg[50]; +int value = 0; + +void setup_wifi() { + + delay(10); + // We start by connecting to a WiFi network + Serial.println(); + Serial.print("Connecting to "); + Serial.println(ssid); + + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { + delay(500); + Serial.print("."); + } + + randomSeed(micros()); + + Serial.println(""); + Serial.println("WiFi connected"); + Serial.println("IP address: "); + Serial.println(WiFi.localIP()); +} + +void callback(char* topic, byte* payload, unsigned int length) { + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (int i = 0; i < length; i++) { + Serial.print((char)payload[i]); + } + Serial.println(); + + // Find out how many bottles we should generate lyrics for + String topicStr(topic); + int bottleCount = 0; // assume no bottles unless we correctly parse a value from the topic + if (topicStr.indexOf('/') >= 0) { + // The topic includes a '/', we'll try to read the number of bottles from just after that + topicStr.remove(0, topicStr.indexOf('/')+1); + // Now see if there's a number of bottles after the '/' + bottleCount = topicStr.toInt(); + } + + if (bottleCount > 0) { + // Work out how big our resulting message will be + int msgLen = 0; + for (int i = bottleCount; i > 0; i--) { + String numBottles(i); + msgLen += 2*numBottles.length(); + if (i == 1) { + msgLen += 2*String(" green bottle, standing on the wall\n").length(); + } else { + msgLen += 2*String(" green bottles, standing on the wall\n").length(); + } + msgLen += String("And if one green bottle should accidentally fall\nThere'll be ").length(); + switch (i) { + case 1: + msgLen += String("no green bottles, standing on the wall\n\n").length(); + break; + case 2: + msgLen += String("1 green bottle, standing on the wall\n\n").length(); + break; + default: + numBottles = i-1; + msgLen += numBottles.length(); + msgLen += String(" green bottles, standing on the wall\n\n").length(); + break; + }; + } + + // Now we can start to publish the message + client.beginPublish("greenBottles/lyrics", msgLen, false); + for (int i = bottleCount; i > 0; i--) { + for (int j = 0; j < 2; j++) { + client.print(i); + if (i == 1) { + client.print(" green bottle, standing on the wall\n"); + } else { + client.print(" green bottles, standing on the wall\n"); + } + } + client.print("And if one green bottle should accidentally fall\nThere'll be "); + switch (i) { + case 1: + client.print("no green bottles, standing on the wall\n\n"); + break; + case 2: + client.print("1 green bottle, standing on the wall\n\n"); + break; + default: + client.print(i-1); + client.print(" green bottles, standing on the wall\n\n"); + break; + }; + } + // Now we're done! + client.endPublish(); + } +} + +void reconnect() { + // Loop until we're reconnected + while (!client.connected()) { + Serial.print("Attempting MQTT connection..."); + // Create a random client ID + String clientId = "ESP8266Client-"; + clientId += String(random(0xffff), HEX); + // Attempt to connect + if (client.connect(clientId.c_str())) { + Serial.println("connected"); + // Once connected, publish an announcement... + client.publish("outTopic", "hello world"); + // ... and resubscribe + client.subscribe("greenBottles/#"); + } else { + Serial.print("failed, rc="); + Serial.print(client.state()); + Serial.println(" try again in 5 seconds"); + // Wait 5 seconds before retrying + delay(5000); + } + } +} + +void setup() { + pinMode(BUILTIN_LED, OUTPUT); // Initialize the BUILTIN_LED pin as an output + Serial.begin(115200); + setup_wifi(); + client.setServer(mqtt_server, 1883); + client.setCallback(callback); +} + +void loop() { + + if (!client.connected()) { + reconnect(); + } + client.loop(); +} diff --git a/keywords.txt b/keywords.txt index b979588f..1ee23d0f 100755 --- a/keywords.txt +++ b/keywords.txt @@ -16,6 +16,9 @@ connect KEYWORD2 disconnect KEYWORD2 publish KEYWORD2 publish_P KEYWORD2 +beginPublish KEYWORD2 +endPublish KEYWORD2 +write KEYWORD2 subscribe KEYWORD2 unsubscribe KEYWORD2 loop KEYWORD2 diff --git a/library.json b/library.json index b9673907..8a36a1c5 100644 --- a/library.json +++ b/library.json @@ -6,7 +6,7 @@ "type": "git", "url": "https://github.com/knolleary/pubsubclient.git" }, - "version": "2.6", + "version": "2.7", "exclude": "tests", "examples": "examples/*/*.ino", "frameworks": "arduino", diff --git a/library.properties b/library.properties index 3ceeda81..1ae97882 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=PubSubClient -version=2.6 +version=2.7 author=Nick O'Leary maintainer=Nick O'Leary sentence=A client library for MQTT messaging. diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 73714bd2..7efe9b0f 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -1,4 +1,5 @@ /* + PubSubClient.cpp - A simple client for MQTT. Nick O'Leary http://knolleary.net @@ -12,16 +13,16 @@ PubSubClient::PubSubClient() { this->_client = NULL; this->stream = NULL; setCallback(NULL); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { @@ -29,16 +30,16 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { setServer(addr, port); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -46,8 +47,8 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -55,8 +56,8 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { @@ -64,16 +65,16 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { setServer(ip, port); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -81,8 +82,8 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -90,8 +91,8 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { @@ -99,16 +100,16 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { setServer(domain,port); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -116,8 +117,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); this->stream = NULL; - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -125,8 +126,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); setStream(stream); - this->buffer_size = MQTT_MAX_PACKET_SIZE; - this->buffer = (uint8_t*)malloc(MQTT_MAX_PACKET_SIZE); + this->bufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE); } PubSubClient::~PubSubClient() { @@ -134,30 +135,40 @@ PubSubClient::~PubSubClient() { } boolean PubSubClient::connect(const char *id) { - return connect(id,NULL,NULL,0,0,0,0); + return connect(id,NULL,NULL,0,0,0,0,1); } boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { - return connect(id,user,pass,0,0,0,0); + return connect(id,user,pass,0,0,0,0,1); } boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); } boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { if (!connected()) { int result = 0; - if (domain != NULL) { - result = _client->connect(this->domain, this->port); + + if(_client->connected()) { + result = 1; } else { - result = _client->connect(this->ip, this->port); + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } } + if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; #if MQTT_VERSION == MQTT_VERSION_3_1 @@ -168,14 +179,17 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;jbuffer[length++] = d[j]; } uint8_t v; if (willTopic) { - v = 0x06|(willQos<<3)|(willRetain<<5); + v = 0x04|(willQos<<3)|(willRetain<<5); } else { - v = 0x02; + v = 0x00; + } + if (cleanSession) { + v = v|0x02; } if(user != NULL) { @@ -185,25 +199,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass v = v|(0x80>>1); } } + this->buffer[length++] = v; - buffer[length++] = v; + this->buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + this->buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id,buffer,length); + CHECK_STRING_LENGTH(length,id) + length = writeString(id,this->buffer,length); if (willTopic) { - length = writeString(willTopic,buffer,length); - length = writeString(willMessage,buffer,length); + CHECK_STRING_LENGTH(length,willTopic) + length = writeString(willTopic,this->buffer,length); + CHECK_STRING_LENGTH(length,willMessage) + length = writeString(willMessage,this->buffer,length); } if(user != NULL) { - length = writeString(user,buffer,length); + CHECK_STRING_LENGTH(length,user) + length = writeString(user,this->buffer,length); if(pass != NULL) { - length = writeString(pass,buffer,length); + CHECK_STRING_LENGTH(length,pass) + length = writeString(pass,this->buffer,length); } } - write(MQTTCONNECT,buffer,length-5); + write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); @@ -216,7 +235,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } } uint8_t llen; - uint16_t len = readPacket(&llen); + uint32_t len = readPacket(&llen); if (len == 4) { if (buffer[3] == 0) { @@ -241,6 +260,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass boolean PubSubClient::readByte(uint8_t * result) { uint32_t previousMillis = millis(); while(!_client->available()) { + yield(); uint32_t currentMillis = millis(); if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ return false; @@ -261,53 +281,61 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ return false; } -uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { +uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; - if(!readByte(buffer, &len)) return 0; - bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; + if(!readByte(this->buffer, &len)) return 0; + bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; - uint16_t length = 0; + uint32_t length = 0; uint8_t digit = 0; uint16_t skip = 0; - uint8_t start = 0; + uint32_t start = 0; do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + _state = MQTT_DISCONNECTED; + _client->stop(); + return 0; + } if(!readByte(&digit)) return 0; - buffer[len++] = digit; + this->buffer[len++] = digit; length += (digit & 127) * multiplier; - multiplier *= 128; + multiplier <<=7; //multiplier *= 128 } while ((digit & 128) != 0); *lengthLength = len-1; if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(buffer, &len)) return 0; - if(!readByte(buffer, &len)) return 0; - skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; + if(!readByte(this->buffer, &len)) return 0; + if(!readByte(this->buffer, &len)) return 0; + skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; start = 2; - if (buffer[0]&MQTTQOS1) { + if (this->buffer[0]&MQTTQOS1) { // skip message id skip += 2; } } + uint32_t idx = len; - for (uint16_t i = start;istream) { - if (isPublish && len-*lengthLength-2>skip) { + if (isPublish && idx-*lengthLength-2>skip) { this->stream->write(digit); } } - if (len < this->buffer_size) { - buffer[len] = digit; + + if (len < this->bufferSize) { + this->buffer[len] = digit; + len++; } - len++; + idx++; } - if (!this->stream && len > this->buffer_size) { + if (!this->stream && idx > this->bufferSize) { len = 0; // This will cause the packet to be ignored. } - return len; } @@ -320,9 +348,9 @@ boolean PubSubClient::loop() { _client->stop(); return false; } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer,2); + this->buffer[0] = MQTTPINGREQ; + this->buffer[1] = 0; + _client->write(this->buffer,2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; @@ -335,38 +363,41 @@ boolean PubSubClient::loop() { uint8_t *payload; if (len > 0) { lastInActivity = t; - uint8_t type = buffer[0]&0xF0; + uint8_t type = this->buffer[0]&0xF0; if (type == MQTTPUBLISH) { if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ - memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) buffer+llen+2; + uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ + memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ + this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) this->buffer+llen+2; // msgId only present for QOS>0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; + if ((this->buffer[0]&0x06) == MQTTQOS1) { + msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; + payload = this->buffer+llen+3+tl+2; callback(topic,payload,len-llen-3-tl-2); - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); + this->buffer[0] = MQTTPUBACK; + this->buffer[1] = 2; + this->buffer[2] = (msgId >> 8); + this->buffer[3] = (msgId & 0xFF); + _client->write(this->buffer,4); lastOutActivity = t; } else { - payload = buffer+llen+3+tl; + payload = this->buffer+llen+3+tl; callback(topic,payload,len-llen-3-tl); } } } else if (type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); + this->buffer[0] = MQTTPINGRESP; + this->buffer[1] = 0; + _client->write(this->buffer,2); } else if (type == MQTTPINGRESP) { pingOutstanding = false; } + } else if (!connected()) { + // readPacket has closed the connection + return false; } } return true; @@ -375,11 +406,11 @@ boolean PubSubClient::loop() { } boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload,strlen(payload),false); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); } boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload,strlen(payload),retained); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); } boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { @@ -388,26 +419,34 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { - if (this->buffer_size < 5 + 2+strlen(topic) + plength) { + if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { // Too long return false; } // Leave room in the buffer for header and variable length field - uint16_t length = 5; - length = writeString(topic,buffer,length); + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + + // Add payload uint16_t i; for (i=0;ibuffer[length++] = payload[i]; } + + // Write the header uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } - return write(header,buffer,length-5); + return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } +boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); +} + boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { uint8_t llen = 0; uint8_t digit; @@ -417,32 +456,33 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig unsigned int i; uint8_t header; unsigned int len; + int expectedLength; if (!connected()) { return false; } - tlen = strlen(topic); + tlen = strnlen(topic, this->bufferSize); header = MQTTPUBLISH; if (retained) { header |= 1; } - buffer[pos++] = header; + this->buffer[pos++] = header; len = plength + 2 + tlen; do { - digit = len % 128; - len = len / 128; + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 if (len > 0) { digit |= 0x80; } - buffer[pos++] = digit; + this->buffer[pos++] = digit; llen++; } while(len>0); - pos = writeString(topic,buffer,pos); + pos = writeString(topic,this->buffer,pos); - rc += _client->write(buffer,pos); + rc += _client->write(this->buffer,pos); for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); @@ -450,19 +490,52 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig lastOutActivity = millis(); - return rc == tlen + 4 + plength; + expectedLength = 1 + llen + 2 + tlen + plength; + + return (rc == expectedLength); } -boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { +boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { + if (connected()) { + // Send the header and variable length field + uint16_t length = MQTT_MAX_HEADER_SIZE; + length = writeString(topic,this->buffer,length); + uint8_t header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + lastOutActivity = millis(); + return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); + } + return false; +} + +int PubSubClient::endPublish() { + return 1; +} + +size_t PubSubClient::write(uint8_t data) { + lastOutActivity = millis(); + return _client->write(data); +} + +size_t PubSubClient::write(const uint8_t *buffer, size_t size) { + lastOutActivity = millis(); + return _client->write(buffer,size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; - uint16_t rc; uint16_t len = length; do { - digit = len % 128; - len = len / 128; + + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 if (len > 0) { digit |= 0x80; } @@ -472,12 +545,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { buf[4-llen] = header; for (int i=0;i 0) && result) { @@ -489,9 +568,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } return result; #else - rc = _client->write(buf+(4-llen),length+1+llen); + rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); lastOutActivity = millis(); - return (rc == 1+llen+length); + return (rc == hlen+length); #endif } @@ -500,53 +579,62 @@ boolean PubSubClient::subscribe(const char* topic) { } boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - if (qos < 0 || qos > 1) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (qos > 1) { return false; } - if (this->buffer_size < 9 + strlen(topic)) { + if (this->bufferSize < 9 + topicLength) { // Too long return false; } if (connected()) { // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, buffer,length); - buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->buffer,length); + this->buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } boolean PubSubClient::unsubscribe(const char* topic) { - if (this->buffer_size < 9 + strlen(topic)) { + size_t topicLength = strnlen(topic, this->bufferSize); + if (topic == 0) { + return false; + } + if (this->bufferSize < 9 + topicLength) { // Too long return false; } if (connected()) { - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + this->buffer[length++] = (nextMsgId >> 8); + this->buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } void PubSubClient::disconnect() { - buffer[0] = MQTTDISCONNECT; - buffer[1] = 0; - _client->write(buffer,2); + this->buffer[0] = MQTTDISCONNECT; + this->buffer[1] = 0; + _client->write(this->buffer,2); _state = MQTT_DISCONNECTED; + _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); } @@ -577,6 +665,8 @@ boolean PubSubClient::connected() { _client->flush(); _client->stop(); } + } else { + return this->_state == MQTT_CONNECTED; } } return rc; @@ -620,11 +710,24 @@ int PubSubClient::state() { } boolean PubSubClient::setBufferSize(uint16_t size) { - this->buffer = (uint8_t*)realloc(this->buffer, size); - this->buffer_size = size; - return (this->buffer == NULL); + if (size == 0) { + // Cannot set it back to 0 + return false; + } + if (this->bufferSize == 0) { + this->buffer = (uint8_t*)malloc(size); + } else { + uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + if (newBuffer != NULL) { + this->buffer = newBuffer; + } else { + return false; + } + } + this->bufferSize = size; + return (this->buffer != NULL); } uint16_t PubSubClient::getBufferSize() { - return this->buffer_size; + return this->bufferSize; } diff --git a/src/PubSubClient.h b/src/PubSubClient.h index c8b0ff14..bd44f2a9 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -23,7 +23,7 @@ // MQTT_MAX_PACKET_SIZE : Maximum packet size. Override with setBufferSize(). #ifndef MQTT_MAX_PACKET_SIZE -#define MQTT_MAX_PACKET_SIZE 128 +#define MQTT_MAX_PACKET_SIZE 256 #endif // MQTT_KEEPALIVE : keepAlive interval in Seconds @@ -73,28 +73,38 @@ #define MQTTQOS1 (1 << 1) #define MQTTQOS2 (2 << 1) -#ifdef ESP8266 +// Maximum size of fixed header and variable length size header +#define MQTT_MAX_HEADER_SIZE 5 + +#if defined(ESP8266) || defined(ESP32) #include #define MQTT_CALLBACK_SIGNATURE std::function callback #else #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) #endif -class PubSubClient { +#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} + +class PubSubClient : public Print { private: Client* _client; uint8_t* buffer; - uint16_t buffer_size; + uint16_t bufferSize; uint16_t nextMsgId; unsigned long lastOutActivity; unsigned long lastInActivity; bool pingOutstanding; MQTT_CALLBACK_SIGNATURE; - uint16_t readPacket(uint8_t*); + uint32_t readPacket(uint8_t*); boolean readByte(uint8_t * result); boolean readByte(uint8_t * result, uint16_t * index); boolean write(uint8_t header, uint8_t* buf, uint16_t length); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + // Build up the header ready to send + // Returns the size of the header + // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start + // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); IPAddress ip; const char* domain; uint16_t port; @@ -125,24 +135,45 @@ class PubSubClient { PubSubClient& setClient(Client& client); PubSubClient& setStream(Stream& stream); + boolean setBufferSize(uint16_t size); + uint16_t getBufferSize(); + boolean connect(const char* id); boolean connect(const char* id, const char* user, const char* pass); boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession); void disconnect(); boolean publish(const char* topic, const char* payload); boolean publish(const char* topic, const char* payload, boolean retained); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const char* payload, boolean retained); boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + // Start to publish a message. + // This API: + // beginPublish(...) + // one or more calls to write(...) + // endPublish() + // Allows for arbitrarily large payloads to be sent without them having to be copied into + // a new buffer and held in memory at one time + // Returns 1 if the message was started successfully, 0 if there was an error + boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + // Finish off this publish message (started with beginPublish) + // Returns 1 if the packet was sent successfully, 0 if there was an error + int endPublish(); + // Write a single byte of payload (only to be used with beginPublish/endPublish) + virtual size_t write(uint8_t); + // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish) + // Returns the number of bytes written + virtual size_t write(const uint8_t *buffer, size_t size); boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic); boolean loop(); boolean connected(); int state(); - boolean setBufferSize(uint16_t size); - uint16_t getBufferSize(); + }; diff --git a/tests/src/connect_spec.cpp b/tests/src/connect_spec.cpp index 69f18646..e27a1f59 100644 --- a/tests/src/connect_spec.cpp +++ b/tests/src/connect_spec.cpp @@ -98,6 +98,33 @@ int test_connect_fails_on_bad_rc() { END_IT } +int test_connect_non_clean_session() { + IT("sends a properly formatted non-clean session connect packet and succeeds"); + ShimClient shimClient; + + shimClient.setAllowConnect(true); + byte expectServer[] = { 172, 16, 0, 2 }; + shimClient.expectConnect(expectServer,1883); + byte connect[] = {0x10,0x18,0x0,0x4,0x4d,0x51,0x54,0x54,0x4,0x0,0x0,0xf,0x0,0xc,0x63,0x6c,0x69,0x65,0x6e,0x74,0x5f,0x74,0x65,0x73,0x74,0x31}; + byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; + + shimClient.expect(connect,26); + shimClient.respond(connack,4); + + PubSubClient client(server, 1883, callback, shimClient); + int state = client.state(); + IS_TRUE(state == MQTT_DISCONNECTED); + + int rc = client.connect((char*)"client_test1",0,0,0,0,0,0,0); + IS_TRUE(rc); + IS_FALSE(shimClient.error()); + + state = client.state(); + IS_TRUE(state == MQTT_CONNECTED); + + END_IT +} + int test_connect_accepts_username_password() { IT("accepts a username and password"); ShimClient shimClient; @@ -133,6 +160,23 @@ int test_connect_accepts_username_no_password() { END_IT } +int test_connect_accepts_username_blank_password() { + IT("accepts a username and blank password"); + ShimClient shimClient; + shimClient.setAllowConnect(true); + + byte connect[] = { 0x10,0x20,0x0,0x4,0x4d,0x51,0x54,0x54,0x4,0xc2,0x0,0xf,0x0,0xc,0x63,0x6c,0x69,0x65,0x6e,0x74,0x5f,0x74,0x65,0x73,0x74,0x31,0x0,0x4,0x75,0x73,0x65,0x72,0x0,0x0}; + byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; + shimClient.expect(connect,0x26); + shimClient.respond(connack,4); + + PubSubClient client(server, 1883, callback, shimClient); + int rc = client.connect((char*)"client_test1",(char*)"user",(char*)"pass"); + IS_TRUE(rc); + IS_FALSE(shimClient.error()); + + END_IT +} int test_connect_ignores_password_no_username() { IT("ignores a password but no username"); @@ -239,10 +283,12 @@ int test_connect_disconnect_connect() { int main() { SUITE("Connect"); + test_connect_fails_no_network(); test_connect_fails_on_no_response(); test_connect_properly_formatted(); + test_connect_non_clean_session(); test_connect_accepts_username_password(); test_connect_fails_on_bad_rc(); test_connect_properly_formatted_hostname(); diff --git a/tests/src/lib/Arduino.h b/tests/src/lib/Arduino.h index c6752801..2a00f24b 100644 --- a/tests/src/lib/Arduino.h +++ b/tests/src/lib/Arduino.h @@ -5,6 +5,7 @@ #include #include #include +#include "Print.h" extern "C"{ @@ -20,4 +21,6 @@ extern "C"{ #define PROGMEM #define pgm_read_byte_near(x) *(x) +#define yield(x) {} + #endif // Arduino_h diff --git a/tests/src/lib/Buffer.cpp b/tests/src/lib/Buffer.cpp index 59a2fbbb..f07759a3 100644 --- a/tests/src/lib/Buffer.cpp +++ b/tests/src/lib/Buffer.cpp @@ -2,9 +2,13 @@ #include "Arduino.h" Buffer::Buffer() { + this->pos = 0; + this->length = 0; } Buffer::Buffer(uint8_t* buf, size_t size) { + this->pos = 0; + this->length = 0; this->add(buf,size); } bool Buffer::available() { diff --git a/tests/src/lib/Buffer.h b/tests/src/lib/Buffer.h index f448cade..c6a2cb58 100644 --- a/tests/src/lib/Buffer.h +++ b/tests/src/lib/Buffer.h @@ -5,18 +5,18 @@ class Buffer { private: - uint8_t buffer[1024]; + uint8_t buffer[2048]; uint16_t pos; uint16_t length; - + public: Buffer(); Buffer(uint8_t* buf, size_t size); - + virtual bool available(); virtual uint8_t next(); virtual void reset(); - + virtual void add(uint8_t* buf, size_t size); }; diff --git a/tests/src/lib/Print.h b/tests/src/lib/Print.h new file mode 100644 index 00000000..02ef77c2 --- /dev/null +++ b/tests/src/lib/Print.h @@ -0,0 +1,28 @@ +/* + Print.h - Base class that provides print() and println() + Copyright (c) 2008 David A. Mellis. All right reserved. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef Print_h +#define Print_h + +class Print { + public: + virtual size_t write(uint8_t) = 0; +}; + +#endif diff --git a/tests/src/publish_spec.cpp b/tests/src/publish_spec.cpp index 232df0d3..ee3d3bed 100644 --- a/tests/src/publish_spec.cpp +++ b/tests/src/publish_spec.cpp @@ -134,6 +134,7 @@ int test_publish_too_long() { shimClient.respond(connack,4); PubSubClient client(server, 1883, callback, shimClient); + client.setBufferSize(128); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc); diff --git a/tests/src/receive_spec.cpp b/tests/src/receive_spec.cpp index 29975190..93e909ae 100644 --- a/tests/src/receive_spec.cpp +++ b/tests/src/receive_spec.cpp @@ -20,6 +20,7 @@ void reset_callback() { } void callback(char* topic, byte* payload, unsigned int length) { + TRACE("Callback received topic=[" << topic << "] length=" << length << "\n") callback_called = true; strcpy(lastTopic,topic); memcpy(lastPayload,payload,length); @@ -102,10 +103,15 @@ int test_receive_max_sized_message() { shimClient.respond(connack,4); PubSubClient client(server, 1883, callback, shimClient); + int length = 80; // If this is changed to > 128 then the publish packet below + // is no longer valid as it assumes the remaining length + // is a single-byte. Don't make that mistake like I just + // did and lose a whole evening tracking down the issue. + client.setBufferSize(length); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc); - int length = MQTT_MAX_PACKET_SIZE; + byte publish[] = {0x30,length-2,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64}; byte bigPublish[length]; memset(bigPublish,'A',length); @@ -137,11 +143,13 @@ int test_receive_oversized_message() { byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; shimClient.respond(connack,4); + int length = 80; // See comment in test_receive_max_sized_message before changing this value + PubSubClient client(server, 1883, callback, shimClient); + client.setBufferSize(length-1); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc); - int length = MQTT_MAX_PACKET_SIZE+1; byte publish[] = {0x30,length-2,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64}; byte bigPublish[length]; memset(bigPublish,'A',length); @@ -160,6 +168,34 @@ int test_receive_oversized_message() { END_IT } +int test_drop_invalid_remaining_length_message() { + IT("drops invalid remaining length message"); + reset_callback(); + + ShimClient shimClient; + shimClient.setAllowConnect(true); + + byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; + shimClient.respond(connack,4); + + PubSubClient client(server, 1883, callback, shimClient); + int rc = client.connect((char*)"client_test1"); + IS_TRUE(rc); + + byte publish[] = {0x30,0x92,0x92,0x92,0x92,0x01,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64}; + shimClient.respond(publish,20); + + rc = client.loop(); + + IS_FALSE(rc); + + IS_FALSE(callback_called); + + IS_FALSE(shimClient.error()); + + END_IT +} + int test_resize_buffer() { IT("receives a message larger than the default maximum"); reset_callback(); @@ -170,24 +206,36 @@ int test_resize_buffer() { byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; shimClient.respond(connack,4); + int length = 80; // See comment in test_receive_max_sized_message before changing this value + PubSubClient client(server, 1883, callback, shimClient); + client.setBufferSize(length-1); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc); - int length = MQTT_MAX_PACKET_SIZE+1; - client.setBufferSize(length); byte publish[] = {0x30,length-2,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64}; byte bigPublish[length]; memset(bigPublish,'A',length); bigPublish[length] = 'B'; memcpy(bigPublish,publish,16); + // Send it twice + shimClient.respond(bigPublish,length); shimClient.respond(bigPublish,length); rc = client.loop(); + IS_TRUE(rc); + + // First message fails as it is too big + IS_FALSE(callback_called); + + // Resize the buffer + client.setBufferSize(length); + rc = client.loop(); IS_TRUE(rc); IS_TRUE(callback_called); + IS_TRUE(strcmp(lastTopic,"topic")==0); IS_TRUE(lastLength == length-9); IS_TRUE(memcmp(lastPayload,bigPublish+9,lastLength)==0); @@ -197,8 +245,9 @@ int test_resize_buffer() { END_IT } + int test_receive_oversized_stream_message() { - IT("drops an oversized message"); + IT("receive an oversized streamed message"); reset_callback(); Stream stream; @@ -209,11 +258,13 @@ int test_receive_oversized_stream_message() { byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; shimClient.respond(connack,4); + int length = 80; // See comment in test_receive_max_sized_message before changing this value + PubSubClient client(server, 1883, callback, shimClient, stream); + client.setBufferSize(length-1); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc); - int length = MQTT_MAX_PACKET_SIZE+1; byte publish[] = {0x30,length-2,0x0,0x5,0x74,0x6f,0x70,0x69,0x63,0x70,0x61,0x79,0x6c,0x6f,0x61,0x64}; byte bigPublish[length]; @@ -230,7 +281,8 @@ int test_receive_oversized_stream_message() { IS_TRUE(callback_called); IS_TRUE(strcmp(lastTopic,"topic")==0); - IS_TRUE(lastLength == length-9); + + IS_TRUE(lastLength == length-10); IS_FALSE(stream.error()); IS_FALSE(shimClient.error()); @@ -278,6 +330,7 @@ int main() test_receive_callback(); test_receive_stream(); test_receive_max_sized_message(); + test_drop_invalid_remaining_length_message(); test_receive_oversized_message(); test_resize_buffer(); test_receive_oversized_stream_message(); diff --git a/tests/src/subscribe_spec.cpp b/tests/src/subscribe_spec.cpp index a4198235..22dc8a44 100644 --- a/tests/src/subscribe_spec.cpp +++ b/tests/src/subscribe_spec.cpp @@ -106,6 +106,7 @@ int test_subscribe_too_long() { shimClient.respond(connack,4); PubSubClient client(server, 1883, callback, shimClient); + client.setBufferSize(128); int rc = client.connect((char*)"client_test1"); IS_TRUE(rc);