// async_publish.cpp // // This is a Paho MQTT C++ client, sample application. // // It's an example of how to send messages as an MQTT publisher using the // C++ asynchronous client interface. // // The sample demonstrates: // - Connecting to an MQTT server/broker // - Publishing messages // - Default file persistence // - Last will and testament // - Using asynchronous tokens // - Implementing callbacks and action listeners // /******************************************************************************* * Copyright (c) 2013-2023 Frank Pagliughi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v20.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Frank Pagliughi - initial implementation and documentation *******************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mqtt/async_client.h" using namespace std; const string DFLT_SERVER_ADDRESS { "tcp://172.17.0.3:1883" }; const string CLIENT_ID { "paho_cpp_async_publish" }; const string PERSIST_DIR { "./persist" }; const string TOPIC { "TOPICID" }; const char* PAYLOAD1 = "Hello World!"; const char* PAYLOAD2 = "Hi there!"; const char* PAYLOAD3 = "Is anyone listening?"; const char* PAYLOAD4 = "Someone is always listening."; const char* LWT_PAYLOAD = "Last will and testament."; const int QOS = 1; const auto TIMEOUT = std::chrono::seconds(10); std::string tokenName(const mqtt::token::Type& type) { if (mqtt::token::CONNECT == type) { return "CONNECT"; } if (mqtt::token::SUBSCRIBE == type) { return "SUBSCRIBE"; } if (mqtt::token::PUBLISH == type) { return "PUBLISH"; } if (mqtt::token::DISCONNECT == type) { return "DISCONNECT"; } return "UNKNOWN"; } ///////////////////////////////////////////////////////////////////////////// /** * A callback class for use with the main MQTT client. */ class callback : public virtual mqtt::callback { public: void connection_lost(const string& cause) override { cout << "\nConnection lost" << endl; if (!cause.empty()) cout << "\tcause: " << cause << endl; } void delivery_complete(mqtt::delivery_token_ptr tok) override { cout << "\tDelivery complete for token: " << (tok ? tok->get_message_id() : -1) << endl; } }; ///////////////////////////////////////////////////////////////////////////// /** * A base action listener. */ class action_listener : public virtual mqtt::iaction_listener { protected: void on_failure(const mqtt::token& tok) override { cout << "\tListener failure for token: " << tok.get_message_id() << endl; const auto type = tok.get_type(); const std::string name = tokenName(tok.get_type()); cout << "MQTTActionListner" + name + " call failed. MQTT token id " + std::to_string(tok.get_message_id()) + ", Code = " + std::to_string(tok.get_return_code()) << endl; } void on_success(const mqtt::token& tok) override { cout << "\tListener success for token: " << tok.get_message_id() << endl; } }; ///////////////////////////////////////////////////////////////////////////// /** * A derived action listener for publish events. */ class delivery_action_listener : public action_listener { atomic done_; void on_failure(const mqtt::token& tok) override { const auto type = tok.get_type(); const std::string name = tokenName(tok.get_type()); cout << "MQTTActionListner" + name + " call failed. MQTT token id " + std::to_string(tok.get_message_id()) + ", Code = " + std::to_string(tok.get_return_code()) + ", Error message = " + tok.get_error_message() << endl; action_listener::on_failure(tok); done_ = true; } void on_success(const mqtt::token& tok) override { action_listener::on_success(tok); done_ = true; } public: delivery_action_listener() : done_(false) {} bool is_done() const { return done_; } }; ///////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID; cout << "Initializing for server '" << address << "'..." << endl; mqtt::async_client client(address, clientID, PERSIST_DIR); callback cb; client.set_callback(cb); mqtt::ssl_options sslOpt; sslOpt.set_trust_store("/mnt/cathi/certs/MB-CA01.crt"); sslOpt.ca_path("/mnt/cathi/certs/MB-CA01.crt"); sslOpt.set_key_store("/mnt/cathi/certs/client_cert.pem"); sslOpt.set_private_key("/mnt/cathi/certs/client_key.pem"); sslOpt.set_verify(true); sslOpt.set_enable_server_cert_auth(true); sslOpt.set_error_handler([](const std::string &sslErrorMsg) { std::cout << "SSL Error: " + sslErrorMsg << std::endl; }); auto connOpts = mqtt::connect_options_builder() .clean_session() .keep_alive_interval(std::chrono::seconds(20)) .mqtt_version(MQTTVERSION_3_1_1) .user_name("mqttclient") .password("mqttclientpassword") .will(mqtt::message(TOPIC, LWT_PAYLOAD, QOS)) .automatic_reconnect(true) .ssl(sslOpt) .finalize(); cout << " ...OK" << endl; try { cout << "\nConnecting..." << endl; mqtt::token_ptr conntok = client.connect(connOpts); cout << "Waiting for the connection..." << endl; conntok->wait(); cout << " ...OK" << endl; mqtt::delivery_token_ptr pubtok; action_listener listener; std::string input; std::cout << " Send Messages: " << std::endl; std::cout << " Message : " << std::flush; std::getline(std::cin, input); int toContinue = 1; while (toContinue) { toContinue = 0; int32_t delay; std::cout << " Delay bw msg publish reqs : " << std::flush; std::cin >> delay; int32_t npublish; std::cout << " Number of msg publish reqs : " << std::flush; std::cin >> npublish; int32_t count = 0; for (int32_t i = 0; i < npublish; i++) { try { pubtok = client.publish(TOPIC, input.data(), input.size(), QOS, false, nullptr, listener); usleep(static_cast(delay)); } catch (const mqtt::exception &ex) { count++; std::cout << "Publishing message failed: " << ex.what() << std::endl; usleep(static_cast(delay)); } catch (...) { count++; std::cout << "ERROR: STILL CONTINUE" << std::endl; usleep(static_cast(delay)); } std::cout << "ERRORCOUNT: " << count << std::endl; } std::cout << "Shall we continue?" << std::endl; std::cin >> toContinue; } // Double check that there are no pending tokens auto toks = client.get_pending_delivery_tokens(); if (!toks.empty()) cout << "Error: There are pending delivery tokens!" << endl; // Disconnect cout << "\nDisconnecting..." << endl; client.disconnect()->wait(); cout << " ...OK" << endl; } catch (const mqtt::exception& exc) { cerr << exc.what() << endl; return 1; } return 0; }