Added the topic_collection class to cache C pointers to the topics. Removed alloc/free topic pointers in the async_client. Removed separate topic_filter_collection class.

This commit is contained in:
fmp 2017-04-23 13:37:03 -04:00
parent 44a4bddf86
commit d744264876
14 changed files with 50 additions and 79 deletions

View File

@ -35,6 +35,7 @@ libpaho_mqttpp3_la_SOURCES += src/message.cpp
libpaho_mqttpp3_la_SOURCES += src/response_options.cpp
libpaho_mqttpp3_la_SOURCES += src/token.cpp
libpaho_mqttpp3_la_SOURCES += src/topic.cpp
libpaho_mqttpp3_la_SOURCES += src/topic_collection.cpp
libpaho_mqttpp3_la_SOURCES += src/connect_options.cpp
libpaho_mqttpp3_la_SOURCES += src/will_options.cpp
if PAHO_WITH_SSL
@ -94,6 +95,7 @@ include_HEADERS += src/mqtt/response_options.h
include_HEADERS += src/mqtt/thread_queue.h
include_HEADERS += src/mqtt/token.h
include_HEADERS += src/mqtt/topic.h
include_HEADERS += src/mqtt/topic_collection.h
include_HEADERS += src/mqtt/types.h
include_HEADERS += src/mqtt/will_options.h
if PAHO_WITH_SSL

View File

@ -15,6 +15,8 @@
COMPILERS="g++-4.8 g++-4.9 g++-5 g++-6 clang++-3.6 clang++-3.8"
[ "$#" -gt 0 ] && COMPILERS="$@"
[ -z "${BUILD_JOBS}" ] && BUILD_JOBS=4
for COMPILER in $COMPILERS
do
if [ -z "$(which ${COMPILER})" ]; then
@ -22,7 +24,7 @@ do
else
printf "===== Testing: %s =====\n\n" "${COMPILER}"
make distclean
if ! make CXX=${COMPILER} SSL=1 ; then
if ! make CXX=${COMPILER} SSL=1 -j${BUILD_JOBS} ; then
printf "\nCompilation failed for %s\n" "${COMPILER}"
exit 1
fi

View File

@ -44,6 +44,7 @@ set(COMMON_SRC
response_options.cpp
token.cpp
topic.cpp
topic_collection.cpp
connect_options.cpp
will_options.cpp)

View File

@ -201,24 +201,6 @@ void async_client::remove_token(token* tok)
}
}
std::vector<char*> async_client::alloc_topic_filters(
const topic_filter_collection& topicFilters)
{
std::vector<char*> filts;
for (const auto& t : topicFilters) {
char* filt = new char[t.size()+1];
std::strcpy(filt, t.c_str());
filts.push_back(filt);
}
return filts;
}
void async_client::free_topic_filters(std::vector<char*>& filts)
{
for (const auto& f : filts)
delete[] f;
}
// --------------------------------------------------------------------------
// Connect
@ -436,25 +418,25 @@ void async_client::set_callback(callback& cb)
// --------------------------------------------------------------------------
// Subscribe
token_ptr async_client::subscribe(const topic_filter_collection& topicFilters,
token_ptr async_client::subscribe(const topic_collection& topicFilters,
const qos_collection& qos)
{
if (topicFilters.size() != qos.size())
throw std::invalid_argument("Collection sizes don't match");
size_t n = topicFilters.size();
std::vector<char*> filts = alloc_topic_filters(topicFilters);
if (n != qos.size())
throw std::invalid_argument("Collection sizes don't match");
auto tok = token::create(*this, topicFilters);
add_token(tok);
response_options opts(tok);
int rc = MQTTAsync_subscribeMany(cli_, static_cast<int>(topicFilters.size()),
static_cast<char**>(&filts[0]),
const_cast<int*>(&qos[0]), &opts.opts_);
int rc = MQTTAsync_subscribeMany(cli_, int(n),
topicFilters.c_arr(),
const_cast<int*>(&qos[0]),
&opts.opts_);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
@ -463,27 +445,25 @@ token_ptr async_client::subscribe(const topic_filter_collection& topicFilters,
return tok;
}
token_ptr async_client::subscribe(const topic_filter_collection& topicFilters,
token_ptr async_client::subscribe(const topic_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& cb)
{
if (topicFilters.size() != qos.size())
size_t n = topicFilters.size();
if (n != qos.size())
throw std::invalid_argument("Collection sizes don't match");
std::vector<char*> filts = alloc_topic_filters(topicFilters);
// No exceptions till C-strings are deleted!
auto tok = token::create(*this, topicFilters, userContext, cb);
add_token(tok);
response_options opts(tok);
int rc = MQTTAsync_subscribeMany(cli_, static_cast<int>(topicFilters.size()),
static_cast<char**>(&filts[0]),
const_cast<int*>(&qos[0]), &opts.opts_);
int rc = MQTTAsync_subscribeMany(cli_, int(n),
topicFilters.c_arr(),
const_cast<int*>(&qos[0]),
&opts.opts_);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
@ -547,20 +527,18 @@ token_ptr async_client::unsubscribe(const string& topicFilter)
return tok;
}
token_ptr async_client::unsubscribe(const topic_filter_collection& topicFilters)
token_ptr async_client::unsubscribe(const topic_collection& topicFilters)
{
size_t n = topicFilters.size();
std::vector<char*> filts = alloc_topic_filters(topicFilters);
auto tok = token::create(*this, topicFilters);
add_token(tok);
response_options opts(tok);
int rc = MQTTAsync_unsubscribeMany(cli_, static_cast<int>(n),
static_cast<char**>(&filts[0]), &opts.opts_);
int rc = MQTTAsync_unsubscribeMany(cli_, int(n),
topicFilters.c_arr(), &opts.opts_);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
@ -569,21 +547,18 @@ token_ptr async_client::unsubscribe(const topic_filter_collection& topicFilters)
return tok;
}
token_ptr async_client::unsubscribe(const topic_filter_collection& topicFilters,
token_ptr async_client::unsubscribe(const topic_collection& topicFilters,
void* userContext, iaction_listener& cb)
{
size_t n = topicFilters.size();
std::vector<char*> filts = alloc_topic_filters(topicFilters);
auto tok = token::create(*this, topicFilters, userContext, cb);
add_token(tok);
response_options opts(tok);
int rc = MQTTAsync_unsubscribeMany(cli_, static_cast<int>(n),
static_cast<char**>(&filts[0]), &opts.opts_);
int rc = MQTTAsync_unsubscribeMany(cli_, int(n), topicFilters.c_arr(), &opts.opts_);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);

View File

@ -93,7 +93,7 @@ void client::subscribe(const string& topicFilter)
cli_.subscribe(topicFilter, DFLT_QOS)->wait_for_completion(timeout_);
}
void client::subscribe(const topic_filter_collection& topicFilters)
void client::subscribe(const topic_collection& topicFilters)
{
qos_collection qos;
for (size_t i=0; i<topicFilters.size(); ++i)
@ -102,7 +102,7 @@ void client::subscribe(const topic_filter_collection& topicFilters)
cli_.subscribe(topicFilters, qos)->wait_for_completion(timeout_);
}
void client::subscribe(const topic_filter_collection& topicFilters,
void client::subscribe(const topic_collection& topicFilters,
const qos_collection& qos)
{
cli_.subscribe(topicFilters, qos)->wait_for_completion(timeout_);
@ -118,7 +118,7 @@ void client::unsubscribe(const string& topicFilter)
cli_.unsubscribe(topicFilter)->wait_for_completion(timeout_);
}
void client::unsubscribe(const topic_filter_collection& topicFilters)
void client::unsubscribe(const topic_collection& topicFilters)
{
cli_.unsubscribe(topicFilters)->wait_for_completion(timeout_);
}

View File

@ -35,6 +35,7 @@ set(COMMON_HDR
thread_queue.h
token.h
topic.h
topic_collection.h
types.h
will_options.h)

View File

@ -27,6 +27,7 @@
#include "MQTTAsync.h"
#include "mqtt/types.h"
#include "mqtt/token.h"
#include "mqtt/topic_collection.h"
#include "mqtt/delivery_token.h"
#include "mqtt/iclient_persistence.h"
#include "mqtt/iaction_listener.h"
@ -93,11 +94,6 @@ private:
virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
/** Memory management for C-style filter collections */
std::vector<char*> alloc_topic_filters(
const topic_filter_collection& topicFilters);
void free_topic_filters(std::vector<char*>& filts);
/**
* Convenience function to get user callback safely.
* @return callback*
@ -388,7 +384,7 @@ public:
* @return token used to track and wait for the subscribe to complete.
* The token will be passed to callback methods if set.
*/
token_ptr subscribe(const topic_filter_collection& topicFilters,
token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos) override;
/**
* Subscribes to multiple topics, each of which may include wildcards.
@ -404,7 +400,7 @@ public:
* @return token used to track and wait for the subscribe to complete.
* The token will be passed to callback methods if set.
*/
token_ptr subscribe(const topic_filter_collection& topicFilters,
token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& cb) override;
/**
@ -450,7 +446,7 @@ public:
* @return token used to track and wait for the unsubscribe to complete.
* The token will be passed to callback methods if set.
*/
token_ptr unsubscribe(const topic_filter_collection& topicFilters) override;
token_ptr unsubscribe(const topic_collection& topicFilters) override;
/**
* Requests the server unsubscribe the client from one or more topics.
* @param topicFilters
@ -461,7 +457,7 @@ public:
* @return token used to track and wait for the unsubscribe to complete.
* The token will be passed to callback methods if set.
*/
token_ptr unsubscribe(const topic_filter_collection& topicFilters,
token_ptr unsubscribe(const topic_collection& topicFilters,
void* userContext, iaction_listener& cb) override;
/**
* Requests the server unsubscribe the client from a topics.

View File

@ -25,7 +25,6 @@
#define __mqtt_client_h
#include "mqtt/async_client.h"
#include "mqtt/types.h"
namespace mqtt {
@ -220,13 +219,13 @@ public:
* a QoS of 1.
* @param topicFilters A set of topics to subscribe
*/
virtual void subscribe(const topic_filter_collection& topicFilters);
virtual void subscribe(const topic_collection& topicFilters);
/**
* Subscribes to multiple topics, each of which may include wildcards.
* @param topicFilters A collection of topics to subscribe
* @param qos A collection of QoS for each topic
*/
virtual void subscribe(const topic_filter_collection& topicFilters,
virtual void subscribe(const topic_collection& topicFilters,
const qos_collection& qos);
/**
* Subscribe to a topic, which may include wildcards.
@ -243,7 +242,7 @@ public:
* Requests the server unsubscribe the client from one or more topics.
* @param topicFilters A collection of topics to unsubscribe.
*/
virtual void unsubscribe(const topic_filter_collection& topicFilters);
virtual void unsubscribe(const topic_collection& topicFilters);
};
/** Smart/shared pointer to an MQTT synchronous client object */

View File

@ -260,7 +260,7 @@ public:
* @return token used to track and wait for the subscribe to complete.
* The token will be passed to callback methods if set.
*/
virtual token_ptr subscribe(const topic_filter_collection& topicFilters,
virtual token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos) =0;
/**
* Subscribes to multiple topics, each of which may include wildcards.
@ -278,7 +278,7 @@ public:
* @return token used to track and wait for the subscribe to complete.
* The token will be passed to callback methods if set.
*/
virtual token_ptr subscribe(const topic_filter_collection& topicFilters,
virtual token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& callback) =0;
/**
@ -328,7 +328,7 @@ public:
* @return token used to track and wait for the unsubscribe to complete.
* The token will be passed to callback methods if set.
*/
virtual token_ptr unsubscribe(const topic_filter_collection& topicFilters) =0;
virtual token_ptr unsubscribe(const topic_collection& topicFilters) =0;
/**
* Requests the server unsubscribe the client from one or more topics.
* @param topicFilters one or more topics to unsubscribe from. Each
@ -341,7 +341,7 @@ public:
* @return token used to track and wait for the unsubscribe to complete.
* The token will be passed to callback methods if set.
*/
virtual token_ptr unsubscribe(const topic_filter_collection& topicFilters,
virtual token_ptr unsubscribe(const topic_collection& topicFilters,
void* userContext, iaction_listener& cb) =0;
/**
* Requests the server unsubscribe the client from a topics.

View File

@ -28,6 +28,7 @@
#include "mqtt/iaction_listener.h"
#include "mqtt/exception.h"
#include "mqtt/types.h"
#include "mqtt/topic_collection.h"
#include <vector>
#include <thread>
#include <mutex>

View File

@ -46,12 +46,6 @@ using string_ptr = std::shared_ptr<const string>;
/** Smart/shared pointer to a const binary blob */
using binary_ptr = std::shared_ptr<const binary>;
/** Type for a collection of topics */
using topic_collection = std::vector<string>;
/** Type for a collection of filters */
using topic_filter_collection = std::vector<string>;
/////////////////////////////////////////////////////////////////////////////
// Time functions

View File

@ -104,7 +104,7 @@ class async_client_test : public CppUnit::TestFixture
const std::string TOPIC { "TOPIC" };
const int GOOD_QOS { 0 };
const int BAD_QOS { 3 };
mqtt::topic_filter_collection TOPIC_COLL { "TOPIC0", "TOPIC1", "TOPIC2" };
mqtt::topic_collection TOPIC_COLL { "TOPIC0", "TOPIC1", "TOPIC2" };
mqtt::iasync_client::qos_collection GOOD_QOS_COLL { 0, 1, 2 };
mqtt::iasync_client::qos_collection BAD_QOS_COLL { BAD_QOS };
const std::string PAYLOAD { "PAYLOAD" };

View File

@ -93,7 +93,7 @@ class client_test : public CppUnit::TestFixture
const std::string TOPIC { "TOPIC" };
const int GOOD_QOS { 0 };
const int BAD_QOS { 3 };
mqtt::topic_filter_collection TOPIC_COLL { "TOPIC0", "TOPIC1", "TOPIC2" };
mqtt::topic_collection TOPIC_COLL { "TOPIC0", "TOPIC1", "TOPIC2" };
mqtt::client::qos_collection GOOD_QOS_COLL { 0, 1, 2 };
mqtt::client::qos_collection BAD_QOS_COLL { BAD_QOS };
const std::string PAYLOAD { "PAYLOAD" };

View File

@ -116,12 +116,12 @@ public:
void set_callback(mqtt::callback& cb) override {}
mqtt::token_ptr subscribe(const topic_filter_collection& topicFilters,
mqtt::token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos) override {
return mqtt::token_ptr{};
}
mqtt::token_ptr subscribe(const topic_filter_collection& topicFilters,
mqtt::token_ptr subscribe(const topic_collection& topicFilters,
const qos_collection& qos,
void* userContext, mqtt::iaction_listener& callback) override {
return mqtt::token_ptr{};
@ -140,11 +140,11 @@ public:
return mqtt::token_ptr{};
}
mqtt::token_ptr unsubscribe(const topic_filter_collection& topicFilters) override {
mqtt::token_ptr unsubscribe(const topic_collection& topicFilters) override {
return mqtt::token_ptr{};
}
mqtt::token_ptr unsubscribe(const topic_filter_collection& topicFilters,
mqtt::token_ptr unsubscribe(const topic_collection& topicFilters,
void* userContext, mqtt::iaction_listener& cb) override {
return mqtt::token_ptr{};
}