diff --git a/README.md b/README.md index 706604d..c32f55b 100644 --- a/README.md +++ b/README.md @@ -32,11 +32,19 @@ To keep up with the latest announcements for this project, or to ask questions: ### Unreleased Features in this Branch +- New `create_options` that can be used to construct a client with new features: + - Send while disconnected before the 1st successful connection + - Output buffer can delete oldest messages when full + - Can choose to clear the persistence store on startup + - Select whether to persist QoS 0 messages +- Started classes to create options using the Builder Pattern, with the `create_options_builder`. - [#231] Added `on_disconnected` callback to handle receipt of disconnect packet from server. - [#211, 223, #235] Removed use of Log() function from the Paho C library. - [#227] Fixed race condition in thread-safe queue - [#224] & [#255] Subscribing to MQTT v3 broker with array of one topic causes segfault. +Targets Paho C v1.3.6 + ### New Features in Paho C++ v1.1 - MQTT v5 support: diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4e9b7c4..3d43595 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -42,20 +42,21 @@ find_package(Threads REQUIRED) ## --- Use object library to optimize compilation --- add_library(paho-cpp-objs OBJECT - async_client.cpp - client.cpp - disconnect_options.cpp - iclient_persistence.cpp - message.cpp - properties.cpp - response_options.cpp - ssl_options.cpp - string_collection.cpp - subscribe_options.cpp - token.cpp - topic.cpp - connect_options.cpp - will_options.cpp + async_client.cpp + client.cpp + connect_options.cpp + create_options.cpp + disconnect_options.cpp + iclient_persistence.cpp + message.cpp + properties.cpp + response_options.cpp + ssl_options.cpp + string_collection.cpp + subscribe_options.cpp + token.cpp + topic.cpp + will_options.cpp ) ## install the shared library diff --git a/src/async_client.cpp b/src/async_client.cpp index 1a89662..f1e3226 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -29,11 +29,6 @@ #include #include -// TODO: Delete this when #680 is merged into the Paho C lib -#if !defined(MQTTAsync_createOptions_initializer5) - #define MQTTAsync_createOptions_initializer5 { {'M', 'Q', 'C', 'O'}, 0, 0, 100, MQTTVERSION_5 } -#endif - namespace mqtt { ///////////////////////////////////////////////////////////////////////////// @@ -56,17 +51,17 @@ async_client::async_client(const string& serverURI, const string& clientId, : serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT), persist_(nullptr), userCallback_(nullptr) { - MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5; + create_options opts; if (maxBufferedMessages != 0) { - opts.sendWhileDisconnected = to_int(true); - opts.maxBufferedMessages = maxBufferedMessages; + opts.set_send_while_disconnected(true); + opts.set_max_buffered_messages(maxBufferedMessages); } int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_DEFAULT, const_cast(persistDir.c_str()), - &opts); + &opts.opts_); if (rc != 0) throw exception(rc); @@ -77,18 +72,19 @@ async_client::async_client(const string& serverURI, const string& clientId, : serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT), persist_(nullptr), userCallback_(nullptr) { - MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5; + create_options opts; if (maxBufferedMessages != 0) { - opts.sendWhileDisconnected = to_int(true); - opts.maxBufferedMessages = maxBufferedMessages; + opts.set_send_while_disconnected(true); + opts.set_max_buffered_messages(maxBufferedMessages); } int rc = MQTTASYNC_SUCCESS; if (!persistence) { rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), - MQTTCLIENT_PERSISTENCE_NONE, nullptr, &opts); + MQTTCLIENT_PERSISTENCE_NONE, nullptr, + &opts.opts_); } else { persist_.reset(new MQTTClient_persistence { @@ -105,7 +101,59 @@ async_client::async_client(const string& serverURI, const string& clientId, rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_USER, persist_.get(), - &opts); + &opts.opts_); + } + if (rc != 0) + throw exception(rc); +} + + +async_client::async_client(const string& serverURI, const string& clientId, + const create_options& opts, + const string& persistDir) + : serverURI_(serverURI), clientId_(clientId), + mqttVersion_(opts.opts_.MQTTVersion), + persist_(nullptr), userCallback_(nullptr) +{ + int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), + MQTTCLIENT_PERSISTENCE_DEFAULT, + const_cast(persistDir.c_str()), + const_cast(&opts.opts_)); + + if (rc != 0) + throw exception(rc); +} + +async_client::async_client(const string& serverURI, const string& clientId, + const create_options& opts, + iclient_persistence* persistence /*=nullptr*/) + : serverURI_(serverURI), clientId_(clientId), + mqttVersion_(opts.opts_.MQTTVersion), + persist_(nullptr), userCallback_(nullptr) +{ + int rc = MQTTASYNC_SUCCESS; + + if (!persistence) { + rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, nullptr, + const_cast(&opts.opts_)); + } + else { + persist_.reset(new MQTTClient_persistence { + persistence, + &iclient_persistence::persistence_open, + &iclient_persistence::persistence_close, + &iclient_persistence::persistence_put, + &iclient_persistence::persistence_get, + &iclient_persistence::persistence_remove, + &iclient_persistence::persistence_keys, + &iclient_persistence::persistence_clear, + &iclient_persistence::persistence_containskey + }); + + rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), + MQTTCLIENT_PERSISTENCE_USER, persist_.get(), + const_cast(&opts.opts_)); } if (rc != 0) throw exception(rc); diff --git a/src/create_options.cpp b/src/create_options.cpp new file mode 100644 index 0000000..a365604 --- /dev/null +++ b/src/create_options.cpp @@ -0,0 +1,30 @@ +/******************************************************************************* + * Copyright (c) 2020 Frank Pagliughi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Frank Pagliughi - initial implementation and documentation + *******************************************************************************/ + +#include "mqtt/create_options.h" +#include + +namespace mqtt { + +///////////////////////////////////////////////////////////////////////////// + +const MQTTAsync_createOptions create_options::DFLT_C_STRUCT = + MQTTAsync_createOptions_initializer; + +///////////////////////////////////////////////////////////////////////////// + +} // end namespace mqtt + diff --git a/src/mqtt/async_client.h b/src/mqtt/async_client.h index 4bbd95c..87c0218 100644 --- a/src/mqtt/async_client.h +++ b/src/mqtt/async_client.h @@ -28,6 +28,7 @@ #include "MQTTAsync.h" #include "mqtt/types.h" #include "mqtt/token.h" +#include "mqtt/create_options.h" #include "mqtt/string_collection.h" #include "mqtt/delivery_token.h" #include "mqtt/iclient_persistence.h" @@ -214,6 +215,39 @@ public: */ async_client(const string& serverURI, const string& clientId, int maxBufferedMessages, iclient_persistence* persistence=nullptr); + + /** + * Create an async_client that can be used to communicate with an MQTT + * server, which allows for off-line message buffering. + * This uses file-based persistence in the specified directory. + * @param serverURI the address of the server to connect to, specified + * as a URI. + * @param clientId a client identifier that is unique on the server + * being connected to + * @param opts The create options + * @param persistDir The directory to use for persistence data + * @throw exception if an argument is invalid + */ + async_client(const string& serverURI, const string& clientId, + const create_options& opts, const string& persistDir); + /** + * Create an async_client that can be used to communicate with an MQTT + * server, which allows for off-line message buffering. + * This allows the caller to specify a user-defined persistence object, + * or use no persistence. + * @param serverURI the address of the server to connect to, specified + * as a URI. + * @param clientId a client identifier that is unique on the server + * being connected to + * @param maxBufferedMessages the maximum number of messages allowed to + * be buffered while not connected + * @param persistence The user persistence structure. If this is null, + * then no persistence is used. + * @throw exception if an argument is invalid + */ + async_client(const string& serverURI, const string& clientId, + const create_options& opts, + iclient_persistence* persistence=nullptr); /** * Destructor */ diff --git a/src/mqtt/create_options.h b/src/mqtt/create_options.h new file mode 100644 index 0000000..f6812f3 --- /dev/null +++ b/src/mqtt/create_options.h @@ -0,0 +1,261 @@ +///////////////////////////////////////////////////////////////////////////// +/// @file create_options.h +/// Declaration of MQTT create_options class +/// @date Oct 17, 2020 +/// @author Frank Pagliughi +///////////////////////////////////////////////////////////////////////////// + +/******************************************************************************* + * Copyright (c) 2020 Frank Pagliughi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Frank Pagliughi - initial implementation and documentation + *******************************************************************************/ + +#ifndef __mqtt_create_options_h +#define __mqtt_create_options_h + +#include "MQTTAsync.h" +#include "mqtt/types.h" + +namespace mqtt { + +///////////////////////////////////////////////////////////////////////////// + +/** + * Options for creating a client object. + */ +class create_options +{ + /** The default C struct */ + static const MQTTAsync_createOptions DFLT_C_STRUCT; + + /** The underlying C options */ + MQTTAsync_createOptions opts_; + + /** The client and tests have special access */ + friend class async_client; + friend class create_options_builder; + +public: + /** Smart/shared pointer to an object of this class. */ + using ptr_t = std::shared_ptr; + /** Smart/shared pointer to a const object of this class. */ + using const_ptr_t = std::shared_ptr; + + /** + * Default set of client create options. + */ + create_options() : opts_(DFLT_C_STRUCT) {} + /** + * Gets whether the client will accept message to publish while + * disconnected. + */ + bool get_send_while_disconnected() const { + return to_bool(opts_.sendWhileDisconnected); + } + /** + * Sets whether the client will accept message to publish while + * disconnected. + * + * @param on @em true to allow the application to publish messages while + * disconnected, @false returns an error on publish if + * disconnected. + * @param anyTime If @em true, allows you to publish messages before the + * first successful connection. + */ + void set_send_while_disconnected(bool on, bool anyTime=false) { + opts_.sendWhileDisconnected = to_int(on); + opts_.allowDisconnectedSendAtAnyTime = to_int(anyTime); + } + /** + * Gets the maximum number of offline buffered messages. + * @return The maximum number of offline buffered messages. + */ + int get_max_buffered_messages() const { + return opts_.maxBufferedMessages; + } + /** + * Sets the maximum number of offline buffered messages. + * @param n The maximum number of offline buffered messages. + */ + void set_max_buffered_messages(int n) { + opts_.maxBufferedMessages = n; + } + /** + * Gets the MQTT version used to create the client. + * @return The MQTT version used to create the client. + */ + int mqtt_version() const { return opts_.MQTTVersion; } + /** + * Sets the MQTT version used to create the client. + * @param ver The MQTT version used to create the client. + */ + void set_mqtt_verison(int ver) { opts_.MQTTVersion = ver; } + /** + * Whether the oldest messages are deleted when the output buffer is + * full. + * + * @return @em true if the oldest messages should be deleted when the + * output buffer is full, @em false if the new messages should + * be dropped when the buffer is full. + */ + bool get_delete_oldest_messages() const { + return to_bool(opts_.deleteOldestMessages); + } + /** + * Determines what to do when the maximum number of buffered messages is + * reached: delete the oldest messages rather than the newest + * @param on @em true When the output queue is full, delete the oldest + * message, @em false drop the newest message being added. + */ + void set_delete_oldest_messages(bool on) { + opts_.deleteOldestMessages = to_int(on); + } + /** + * Whether the messages will be restored from persistence or the store + * will be cleared. + * + * @return @em true if the messages will be restored from persistence, + * @em false if the persistence store will be cleared. + */ + bool get_restore_messages() const { + return to_bool(opts_.restoreMessages); + } + /** + * Determine whether to restore messages from persistence or clear the + * persistence store. + * + * @param on @true to restore messages from persistence, @false to clear + * the persistence store. + */ + void set_restore_messages(bool on) { + opts_.restoreMessages = to_int(on); + } + /** + * Whether to persist QoS 0 messages. + * + * @return @em true if QoS 0 messages are persisted, @em false if not. + */ + bool get_persist_qos0() const { + return to_bool(opts_.persistQoS0); + } + /** + * Determeine whether to persist QoS 0 messages. + * + * @param on @em true if QoS 0 messages are persisted, @em false if not. + */ + void set_persist_qos0(bool on) { + opts_.persistQoS0 = to_int(on); + } +}; + +/** Smart/shared pointer to a connection options object. */ +using create_options_ptr = create_options::ptr_t; + +///////////////////////////////////////////////////////////////////////////// + +/** + * Builder class to generate the create options. + */ +class create_options_builder +{ + /** The underlying options */ + create_options opts_; + +public: + /** This class */ + using self = create_options_builder; + /** + * Default constructor. + */ + create_options_builder() {} + /** + * + * Sets whether the client will accept message to publish while + * disconnected. + * + * @param on @em true to allow the application to publish messages while + * disconnected, @false returns an error on publish if + * disconnected. + * @param anyTime If @em true, allows you to publish messages before the + * first successful connection. + * @return A reference to this object. + */ + auto send_while_disconnected(bool on=true, bool anyTime=false) -> self& { + opts_.opts_.sendWhileDisconnected = to_int(on); + opts_.opts_.allowDisconnectedSendAtAnyTime = to_int(anyTime); + return *this; + } + /** + * Sets the maximum number of offline buffered messages. + * @param n The maximum number of offline buffered messages. + * @return A reference to this object. + */ + auto max_buffered_messages(int n) -> self& { + opts_.opts_.maxBufferedMessages = n; + return *this; + } + /** + * Sets the MQTT version used to create the client. + * @param ver The MQTT version used to create the client. + */ + auto set_mqtt_verison(int ver) -> self& { + opts_.opts_.MQTTVersion = ver; + return *this; + } + /** + * Determines what to do when the maximum number of buffered messages is + * reached: delete the oldest messages rather than the newest. + * @param on @em true When the output queue is full, delete the oldest + * message, @em false drop the newest message being added. + * @return A reference to this object. + */ + auto delete_oldest_messages(bool on=true) -> self& { + opts_.opts_.deleteOldestMessages = to_int(on); + return *this; + } + /** + * Determines whether to restore persisted messsages or clear the + * persistence store. (Defaults true) + * + * @param on @em true to retore persisted messages, @em false to clear + * the persistence store. + * @return A reference to this object. + */ + auto restore_messages(bool on=true) -> self& { + opts_.opts_.restoreMessages = to_int(on); + return *this; + } + /** + * Whether to persist QoS 0 messages. (Defaults true) + * + * @param on @em true persist QoS 0 messages, @em false, don't. + * @return A reference to this object + */ + auto persist_qos0(bool on=true) -> self& { + opts_.opts_.persistQoS0 = to_int(on); + return *this; + } + /** + * Finish building the options and return them. + * @return The option struct as built. + */ + create_options finalize() { return opts_; } +}; + +///////////////////////////////////////////////////////////////////////////// +// end namespace mqtt +} + +#endif // __mqtt_create_options_h + diff --git a/src/mqtt/properties.h b/src/mqtt/properties.h index bbe977c..2bef27a 100644 --- a/src/mqtt/properties.h +++ b/src/mqtt/properties.h @@ -314,9 +314,13 @@ public: * @return The number of bytes required for the serialized * struct. */ + #if 0 + // Note: This isn't exported by the shared library. Perhaps we can change + // that in the upstream C lib. size_t byte_length() const { return (size_t) ::MQTTProperties_len(const_cast(&props_)); } + #endif /** * Adds a property to the list. * @param prop The property to add to the list. diff --git a/src/samples/async_publish_time.cpp b/src/samples/async_publish_time.cpp index 8488f41..4032a1d 100644 --- a/src/samples/async_publish_time.cpp +++ b/src/samples/async_publish_time.cpp @@ -18,14 +18,16 @@ // The sample demonstrates: // - Connecting to an MQTT server/broker // - Sampling a value -// - Publishing messages +// - Publishing messages using a `topic` object // - Last will and testament // - Callbacks with lambdas -// - Implementing callbacks and action listeners +// - Using `create_options` +// - Creating options with builder classes +// - Offline buffering in the client // /******************************************************************************* - * Copyright (c) 2019 Frank Pagliughi + * Copyright (c) 2019-2020 Frank Pagliughi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -86,9 +88,18 @@ int main(int argc, char* argv[]) uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL; cout << "Initializing for server '" << address << "'..." << endl; - mqtt::async_client cli(address, "", MAX_BUFFERED_MESSAGES); - // Set callbacks for connected and connection lost. + // We configure to allow publishing to the client while off-line, + // and that it's OK to do so before the 1st successful connection. + auto create_opts = mqtt::create_options_builder() + .send_while_disconnected(true, true) + .max_buffered_messages(MAX_BUFFERED_MESSAGES) + .delete_oldest_messages() + .finalize(); + + mqtt::async_client cli(address, "", create_opts); + + // Set callbacks for when connected and connection lost. cli.set_connected_handler([&cli](const std::string&) { std::cout << "*** Connected (" @@ -107,12 +118,17 @@ int main(int argc, char* argv[]) connopts.set_automatic_reconnect(1, 10); try { - cout << "Connecting..." << endl; - cli.connect(connopts)->wait(); + // Note that we start the connection, but don't wait for completion. + // We configured to allow publishing before a successful connection. + cout << "Starting connection..." << endl; + cli.connect(connopts); mqtt::topic top(cli, "data/time", QOS); cout << "Publishing data..." << endl; + while (timestamp() % DELTA_MS != 0) + ; + uint64_t t = timestamp(), tlast = t, tstart = t; @@ -123,6 +139,7 @@ int main(int argc, char* argv[]) this_thread::sleep_for(SAMPLE_PERIOD); t = timestamp(); + cout << t << endl; if (abs(int(t - tlast)) >= DELTA_MS) top.publish(to_string(tlast = t)); diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 01fcaa6..3ee7701 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -31,6 +31,7 @@ find_package(Catch2 REQUIRED) add_executable(unit_tests unit_tests.cpp test_buffer_ref.cpp test_connect_options.cpp + test_create_options.cpp test_message.cpp test_properties.cpp test_string_collection.cpp diff --git a/test/unit/test_create_options.cpp b/test/unit/test_create_options.cpp new file mode 100644 index 0000000..61b3550 --- /dev/null +++ b/test/unit/test_create_options.cpp @@ -0,0 +1,79 @@ +// test_create_options.cpp +// +// Unit tests for the create_options class in the Paho MQTT C++ library. +// + +/******************************************************************************* + * Copyright (c) 2020 Frank Pagliughi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Frank Pagliughi - initial implementation and documentation + *******************************************************************************/ + +#define UNIT_TESTS + +#include +#include "catch2/catch.hpp" +#include "mqtt/create_options.h" +#include "mock_async_client.h" + +using namespace mqtt; + +// ---------------------------------------------------------------------- +// Test the default constructor +// ---------------------------------------------------------------------- + +TEST_CASE("create_options default ctor", "[options]") +{ + mqtt::create_options opts; + + REQUIRE(!opts.get_send_while_disconnected()); + REQUIRE(!opts.get_delete_oldest_messages()); + + REQUIRE(opts.get_restore_messages()); + REQUIRE(opts.get_persist_qos0()); +} + +///////////////////////////////////////////////////////////////////////////// +// create_options_builder +///////////////////////////////////////////////////////////////////////////// + + +// ---------------------------------------------------------------------- +// Test the default constructor +// ---------------------------------------------------------------------- + +TEST_CASE("create_options_builder default ctor", "[options]") +{ + const auto opts = create_options_builder() + .finalize(); + + REQUIRE(!opts.get_send_while_disconnected()); + REQUIRE(!opts.get_delete_oldest_messages()); + + REQUIRE(opts.get_restore_messages()); + REQUIRE(opts.get_persist_qos0()); +} + +TEST_CASE("create_options_builder sets", "[options]") +{ + const auto opts = create_options_builder() + .send_while_disconnected() + .delete_oldest_messages() + .finalize(); + + REQUIRE(opts.get_send_while_disconnected()); + REQUIRE(opts.get_delete_oldest_messages()); + + REQUIRE(opts.get_restore_messages()); + REQUIRE(opts.get_persist_qos0()); +} diff --git a/test/unit/test_properties.cpp b/test/unit/test_properties.cpp index 33f286d..d81fafa 100644 --- a/test/unit/test_properties.cpp +++ b/test/unit/test_properties.cpp @@ -356,7 +356,7 @@ TEST_CASE("properties constructor", "[properties]") { properties props; REQUIRE(props.size() == 0); - REQUIRE(props.byte_length() == 1); + //REQUIRE(props.byte_length() == 1); } } @@ -382,7 +382,7 @@ TEST_CASE("properties clear", "[properties]") { props.clear(); REQUIRE(props.size() == 0); - REQUIRE(props.byte_length() == 1); + //REQUIRE(props.byte_length() == 1); } }