Implemented create_options and put them into the 'async_publish_time' sample app

This commit is contained in:
fpagliughi 2020-10-18 09:47:53 -04:00
parent 9bc5403b62
commit 5db5b038a0
11 changed files with 520 additions and 37 deletions

View File

@ -32,11 +32,19 @@ To keep up with the latest announcements for this project, or to ask questions:
### Unreleased Features in this Branch
- New `create_options` that can be used to construct a client with new features:
- Send while disconnected before the 1st successful connection
- Output buffer can delete oldest messages when full
- Can choose to clear the persistence store on startup
- Select whether to persist QoS 0 messages
- Started classes to create options using the Builder Pattern, with the `create_options_builder`.
- [#231] Added `on_disconnected` callback to handle receipt of disconnect packet from server.
- [#211, 223, #235] Removed use of Log() function from the Paho C library.
- [#227] Fixed race condition in thread-safe queue
- [#224] & [#255] Subscribing to MQTT v3 broker with array of one topic causes segfault.
Targets Paho C v1.3.6
### New Features in Paho C++ v1.1
- MQTT v5 support:

View File

@ -42,20 +42,21 @@ find_package(Threads REQUIRED)
## --- Use object library to optimize compilation ---
add_library(paho-cpp-objs OBJECT
async_client.cpp
client.cpp
disconnect_options.cpp
iclient_persistence.cpp
message.cpp
properties.cpp
response_options.cpp
ssl_options.cpp
string_collection.cpp
subscribe_options.cpp
token.cpp
topic.cpp
connect_options.cpp
will_options.cpp
async_client.cpp
client.cpp
connect_options.cpp
create_options.cpp
disconnect_options.cpp
iclient_persistence.cpp
message.cpp
properties.cpp
response_options.cpp
ssl_options.cpp
string_collection.cpp
subscribe_options.cpp
token.cpp
topic.cpp
will_options.cpp
)
## install the shared library

View File

@ -29,11 +29,6 @@
#include <cstring>
#include <cstdio>
// TODO: Delete this when #680 is merged into the Paho C lib
#if !defined(MQTTAsync_createOptions_initializer5)
#define MQTTAsync_createOptions_initializer5 { {'M', 'Q', 'C', 'O'}, 0, 0, 100, MQTTVERSION_5 }
#endif
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
@ -56,17 +51,17 @@ async_client::async_client(const string& serverURI, const string& clientId,
: serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT),
persist_(nullptr), userCallback_(nullptr)
{
MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5;
create_options opts;
if (maxBufferedMessages != 0) {
opts.sendWhileDisconnected = to_int(true);
opts.maxBufferedMessages = maxBufferedMessages;
opts.set_send_while_disconnected(true);
opts.set_max_buffered_messages(maxBufferedMessages);
}
int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(persistDir.c_str()),
&opts);
&opts.opts_);
if (rc != 0)
throw exception(rc);
@ -77,18 +72,19 @@ async_client::async_client(const string& serverURI, const string& clientId,
: serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT),
persist_(nullptr), userCallback_(nullptr)
{
MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5;
create_options opts;
if (maxBufferedMessages != 0) {
opts.sendWhileDisconnected = to_int(true);
opts.maxBufferedMessages = maxBufferedMessages;
opts.set_send_while_disconnected(true);
opts.set_max_buffered_messages(maxBufferedMessages);
}
int rc = MQTTASYNC_SUCCESS;
if (!persistence) {
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr, &opts);
MQTTCLIENT_PERSISTENCE_NONE, nullptr,
&opts.opts_);
}
else {
persist_.reset(new MQTTClient_persistence {
@ -105,7 +101,59 @@ async_client::async_client(const string& serverURI, const string& clientId,
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_USER, persist_.get(),
&opts);
&opts.opts_);
}
if (rc != 0)
throw exception(rc);
}
async_client::async_client(const string& serverURI, const string& clientId,
const create_options& opts,
const string& persistDir)
: serverURI_(serverURI), clientId_(clientId),
mqttVersion_(opts.opts_.MQTTVersion),
persist_(nullptr), userCallback_(nullptr)
{
int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(persistDir.c_str()),
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
if (rc != 0)
throw exception(rc);
}
async_client::async_client(const string& serverURI, const string& clientId,
const create_options& opts,
iclient_persistence* persistence /*=nullptr*/)
: serverURI_(serverURI), clientId_(clientId),
mqttVersion_(opts.opts_.MQTTVersion),
persist_(nullptr), userCallback_(nullptr)
{
int rc = MQTTASYNC_SUCCESS;
if (!persistence) {
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr,
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
}
else {
persist_.reset(new MQTTClient_persistence {
persistence,
&iclient_persistence::persistence_open,
&iclient_persistence::persistence_close,
&iclient_persistence::persistence_put,
&iclient_persistence::persistence_get,
&iclient_persistence::persistence_remove,
&iclient_persistence::persistence_keys,
&iclient_persistence::persistence_clear,
&iclient_persistence::persistence_containskey
});
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_USER, persist_.get(),
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
}
if (rc != 0)
throw exception(rc);

30
src/create_options.cpp Normal file
View File

@ -0,0 +1,30 @@
/*******************************************************************************
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/create_options.h"
#include <cstring>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
const MQTTAsync_createOptions create_options::DFLT_C_STRUCT =
MQTTAsync_createOptions_initializer;
/////////////////////////////////////////////////////////////////////////////
} // end namespace mqtt

View File

@ -28,6 +28,7 @@
#include "MQTTAsync.h"
#include "mqtt/types.h"
#include "mqtt/token.h"
#include "mqtt/create_options.h"
#include "mqtt/string_collection.h"
#include "mqtt/delivery_token.h"
#include "mqtt/iclient_persistence.h"
@ -214,6 +215,39 @@ public:
*/
async_client(const string& serverURI, const string& clientId,
int maxBufferedMessages, iclient_persistence* persistence=nullptr);
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This uses file-based persistence in the specified directory.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param opts The create options
* @param persistDir The directory to use for persistence data
* @throw exception if an argument is invalid
*/
async_client(const string& serverURI, const string& clientId,
const create_options& opts, const string& persistDir);
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This allows the caller to specify a user-defined persistence object,
* or use no persistence.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param maxBufferedMessages the maximum number of messages allowed to
* be buffered while not connected
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
* @throw exception if an argument is invalid
*/
async_client(const string& serverURI, const string& clientId,
const create_options& opts,
iclient_persistence* persistence=nullptr);
/**
* Destructor
*/

261
src/mqtt/create_options.h Normal file
View File

@ -0,0 +1,261 @@
/////////////////////////////////////////////////////////////////////////////
/// @file create_options.h
/// Declaration of MQTT create_options class
/// @date Oct 17, 2020
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_create_options_h
#define __mqtt_create_options_h
#include "MQTTAsync.h"
#include "mqtt/types.h"
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Options for creating a client object.
*/
class create_options
{
/** The default C struct */
static const MQTTAsync_createOptions DFLT_C_STRUCT;
/** The underlying C options */
MQTTAsync_createOptions opts_;
/** The client and tests have special access */
friend class async_client;
friend class create_options_builder;
public:
/** Smart/shared pointer to an object of this class. */
using ptr_t = std::shared_ptr<create_options>;
/** Smart/shared pointer to a const object of this class. */
using const_ptr_t = std::shared_ptr<const create_options>;
/**
* Default set of client create options.
*/
create_options() : opts_(DFLT_C_STRUCT) {}
/**
* Gets whether the client will accept message to publish while
* disconnected.
*/
bool get_send_while_disconnected() const {
return to_bool(opts_.sendWhileDisconnected);
}
/**
* Sets whether the client will accept message to publish while
* disconnected.
*
* @param on @em true to allow the application to publish messages while
* disconnected, @false returns an error on publish if
* disconnected.
* @param anyTime If @em true, allows you to publish messages before the
* first successful connection.
*/
void set_send_while_disconnected(bool on, bool anyTime=false) {
opts_.sendWhileDisconnected = to_int(on);
opts_.allowDisconnectedSendAtAnyTime = to_int(anyTime);
}
/**
* Gets the maximum number of offline buffered messages.
* @return The maximum number of offline buffered messages.
*/
int get_max_buffered_messages() const {
return opts_.maxBufferedMessages;
}
/**
* Sets the maximum number of offline buffered messages.
* @param n The maximum number of offline buffered messages.
*/
void set_max_buffered_messages(int n) {
opts_.maxBufferedMessages = n;
}
/**
* Gets the MQTT version used to create the client.
* @return The MQTT version used to create the client.
*/
int mqtt_version() const { return opts_.MQTTVersion; }
/**
* Sets the MQTT version used to create the client.
* @param ver The MQTT version used to create the client.
*/
void set_mqtt_verison(int ver) { opts_.MQTTVersion = ver; }
/**
* Whether the oldest messages are deleted when the output buffer is
* full.
*
* @return @em true if the oldest messages should be deleted when the
* output buffer is full, @em false if the new messages should
* be dropped when the buffer is full.
*/
bool get_delete_oldest_messages() const {
return to_bool(opts_.deleteOldestMessages);
}
/**
* Determines what to do when the maximum number of buffered messages is
* reached: delete the oldest messages rather than the newest
* @param on @em true When the output queue is full, delete the oldest
* message, @em false drop the newest message being added.
*/
void set_delete_oldest_messages(bool on) {
opts_.deleteOldestMessages = to_int(on);
}
/**
* Whether the messages will be restored from persistence or the store
* will be cleared.
*
* @return @em true if the messages will be restored from persistence,
* @em false if the persistence store will be cleared.
*/
bool get_restore_messages() const {
return to_bool(opts_.restoreMessages);
}
/**
* Determine whether to restore messages from persistence or clear the
* persistence store.
*
* @param on @true to restore messages from persistence, @false to clear
* the persistence store.
*/
void set_restore_messages(bool on) {
opts_.restoreMessages = to_int(on);
}
/**
* Whether to persist QoS 0 messages.
*
* @return @em true if QoS 0 messages are persisted, @em false if not.
*/
bool get_persist_qos0() const {
return to_bool(opts_.persistQoS0);
}
/**
* Determeine whether to persist QoS 0 messages.
*
* @param on @em true if QoS 0 messages are persisted, @em false if not.
*/
void set_persist_qos0(bool on) {
opts_.persistQoS0 = to_int(on);
}
};
/** Smart/shared pointer to a connection options object. */
using create_options_ptr = create_options::ptr_t;
/////////////////////////////////////////////////////////////////////////////
/**
* Builder class to generate the create options.
*/
class create_options_builder
{
/** The underlying options */
create_options opts_;
public:
/** This class */
using self = create_options_builder;
/**
* Default constructor.
*/
create_options_builder() {}
/**
*
* Sets whether the client will accept message to publish while
* disconnected.
*
* @param on @em true to allow the application to publish messages while
* disconnected, @false returns an error on publish if
* disconnected.
* @param anyTime If @em true, allows you to publish messages before the
* first successful connection.
* @return A reference to this object.
*/
auto send_while_disconnected(bool on=true, bool anyTime=false) -> self& {
opts_.opts_.sendWhileDisconnected = to_int(on);
opts_.opts_.allowDisconnectedSendAtAnyTime = to_int(anyTime);
return *this;
}
/**
* Sets the maximum number of offline buffered messages.
* @param n The maximum number of offline buffered messages.
* @return A reference to this object.
*/
auto max_buffered_messages(int n) -> self& {
opts_.opts_.maxBufferedMessages = n;
return *this;
}
/**
* Sets the MQTT version used to create the client.
* @param ver The MQTT version used to create the client.
*/
auto set_mqtt_verison(int ver) -> self& {
opts_.opts_.MQTTVersion = ver;
return *this;
}
/**
* Determines what to do when the maximum number of buffered messages is
* reached: delete the oldest messages rather than the newest.
* @param on @em true When the output queue is full, delete the oldest
* message, @em false drop the newest message being added.
* @return A reference to this object.
*/
auto delete_oldest_messages(bool on=true) -> self& {
opts_.opts_.deleteOldestMessages = to_int(on);
return *this;
}
/**
* Determines whether to restore persisted messsages or clear the
* persistence store. (Defaults true)
*
* @param on @em true to retore persisted messages, @em false to clear
* the persistence store.
* @return A reference to this object.
*/
auto restore_messages(bool on=true) -> self& {
opts_.opts_.restoreMessages = to_int(on);
return *this;
}
/**
* Whether to persist QoS 0 messages. (Defaults true)
*
* @param on @em true persist QoS 0 messages, @em false, don't.
* @return A reference to this object
*/
auto persist_qos0(bool on=true) -> self& {
opts_.opts_.persistQoS0 = to_int(on);
return *this;
}
/**
* Finish building the options and return them.
* @return The option struct as built.
*/
create_options finalize() { return opts_; }
};
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_create_options_h

View File

@ -314,9 +314,13 @@ public:
* @return The number of bytes required for the serialized
* struct.
*/
#if 0
// Note: This isn't exported by the shared library. Perhaps we can change
// that in the upstream C lib.
size_t byte_length() const {
return (size_t) ::MQTTProperties_len(const_cast<MQTTProperties*>(&props_));
}
#endif
/**
* Adds a property to the list.
* @param prop The property to add to the list.

View File

@ -18,14 +18,16 @@
// The sample demonstrates:
// - Connecting to an MQTT server/broker
// - Sampling a value
// - Publishing messages
// - Publishing messages using a `topic` object
// - Last will and testament
// - Callbacks with lambdas
// - Implementing callbacks and action listeners
// - Using `create_options`
// - Creating options with builder classes
// - Offline buffering in the client
//
/*******************************************************************************
* Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
* Copyright (c) 2019-2020 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
@ -86,9 +88,18 @@ int main(int argc, char* argv[])
uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL;
cout << "Initializing for server '" << address << "'..." << endl;
mqtt::async_client cli(address, "", MAX_BUFFERED_MESSAGES);
// Set callbacks for connected and connection lost.
// We configure to allow publishing to the client while off-line,
// and that it's OK to do so before the 1st successful connection.
auto create_opts = mqtt::create_options_builder()
.send_while_disconnected(true, true)
.max_buffered_messages(MAX_BUFFERED_MESSAGES)
.delete_oldest_messages()
.finalize();
mqtt::async_client cli(address, "", create_opts);
// Set callbacks for when connected and connection lost.
cli.set_connected_handler([&cli](const std::string&) {
std::cout << "*** Connected ("
@ -107,12 +118,17 @@ int main(int argc, char* argv[])
connopts.set_automatic_reconnect(1, 10);
try {
cout << "Connecting..." << endl;
cli.connect(connopts)->wait();
// Note that we start the connection, but don't wait for completion.
// We configured to allow publishing before a successful connection.
cout << "Starting connection..." << endl;
cli.connect(connopts);
mqtt::topic top(cli, "data/time", QOS);
cout << "Publishing data..." << endl;
while (timestamp() % DELTA_MS != 0)
;
uint64_t t = timestamp(),
tlast = t,
tstart = t;
@ -123,6 +139,7 @@ int main(int argc, char* argv[])
this_thread::sleep_for(SAMPLE_PERIOD);
t = timestamp();
cout << t << endl;
if (abs(int(t - tlast)) >= DELTA_MS)
top.publish(to_string(tlast = t));

View File

@ -31,6 +31,7 @@ find_package(Catch2 REQUIRED)
add_executable(unit_tests unit_tests.cpp
test_buffer_ref.cpp
test_connect_options.cpp
test_create_options.cpp
test_message.cpp
test_properties.cpp
test_string_collection.cpp

View File

@ -0,0 +1,79 @@
// test_create_options.cpp
//
// Unit tests for the create_options class in the Paho MQTT C++ library.
//
/*******************************************************************************
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#define UNIT_TESTS
#include <cstring>
#include "catch2/catch.hpp"
#include "mqtt/create_options.h"
#include "mock_async_client.h"
using namespace mqtt;
// ----------------------------------------------------------------------
// Test the default constructor
// ----------------------------------------------------------------------
TEST_CASE("create_options default ctor", "[options]")
{
mqtt::create_options opts;
REQUIRE(!opts.get_send_while_disconnected());
REQUIRE(!opts.get_delete_oldest_messages());
REQUIRE(opts.get_restore_messages());
REQUIRE(opts.get_persist_qos0());
}
/////////////////////////////////////////////////////////////////////////////
// create_options_builder
/////////////////////////////////////////////////////////////////////////////
// ----------------------------------------------------------------------
// Test the default constructor
// ----------------------------------------------------------------------
TEST_CASE("create_options_builder default ctor", "[options]")
{
const auto opts = create_options_builder()
.finalize();
REQUIRE(!opts.get_send_while_disconnected());
REQUIRE(!opts.get_delete_oldest_messages());
REQUIRE(opts.get_restore_messages());
REQUIRE(opts.get_persist_qos0());
}
TEST_CASE("create_options_builder sets", "[options]")
{
const auto opts = create_options_builder()
.send_while_disconnected()
.delete_oldest_messages()
.finalize();
REQUIRE(opts.get_send_while_disconnected());
REQUIRE(opts.get_delete_oldest_messages());
REQUIRE(opts.get_restore_messages());
REQUIRE(opts.get_persist_qos0());
}

View File

@ -356,7 +356,7 @@ TEST_CASE("properties constructor", "[properties]") {
properties props;
REQUIRE(props.size() == 0);
REQUIRE(props.byte_length() == 1);
//REQUIRE(props.byte_length() == 1);
}
}
@ -382,7 +382,7 @@ TEST_CASE("properties clear", "[properties]") {
props.clear();
REQUIRE(props.size() == 0);
REQUIRE(props.byte_length() == 1);
//REQUIRE(props.byte_length() == 1);
}
}