Added individual callback registration.

Added chat sample.

The 'topic' objects can subscribe.

Fixed disconnect version for v5
This commit is contained in:
fpagliughi 2019-08-02 01:47:20 -04:00
parent ba3aceeb1a
commit 01d3381421
12 changed files with 339 additions and 92 deletions

View File

@ -37,7 +37,7 @@ _Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2)
### Unreleased Features in this Branch
- Started MQTT v5 support
- Started MQTT v5 support:
- **Properties**
- New `property` class acts something like a std::variant to hold a property of any supported type.
- New `properties` class is a collection type to hold all the properties for a single transmitted packet.
@ -45,12 +45,14 @@ _Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2)
- Properties can also be obtained from server responses to requests such as from a _connect_ call. These are available in the `token` objects when they complete.
- The client object tracks the desired MQTT version that the app requested and/or is currently connected at. Internally this is now required by the `response_options` the need to distinguish between pre-v5 and post-v5 callback functions.
- MQTT v5 reason codes for requests are available via `token` objects when they complete. They are also available in `exception` objects that are thrown by tokens.
- More descriptive error messages (PR #154), integrated into the `mqtt::exception` class. MQTT v5 reason codes are also included in the exceptions when an error occurs.
- The`message` and various options classes were updated for MQTT v5 to include properties and reson codes (where appropriate).
- Applications can (finally) get server responses from the various ack packets. These are available through the tokens after they complete, as `connect_response`, `subscribe_response`, and `unsubscribe_response`.
- Support for subscibe options, like no local subscriptions, etc.
- Sample applications were added showing how to do basic Remote Procedure Calls (RPC's) with MQTT v5 using the *RESPONSE_TOPIC* and *CORRELATION_DATA* properties. These are *rpc_math_cli* and *rpc_math_srvr* in the _src/samples_ directory.
- Sample applications were added showing how to do basic Remote Procedure Calls (RPC's) with MQTT v5 using the *RESPONSE_TOPIC* and *CORRELATION_DATA* properties. These are *rpc_math_cli* and *rpc_math_srvr* in the _src/samples_ directory.
- A sample "chat" application was added, showing how to use subscribe options, such as "no local".
- More descriptive error messages (PR #154), integrated into the `mqtt::exception` class. MQTT v5 reason codes are also included in the exceptions when an error occurs.
- Applications can (finally) get server responses from the various ack packets. These are available through the tokens after they complete, as `connect_response`, `subscribe_response`, and `unsubscribe_response`.
- The `topic` objects can be used to subscribe.
- Applications can register individual callback functions instead of using a `callback` interface object. This allows easy use of lambda functions for callbacks.
- The connect options can take a LWT as a plain message, via `connect_options::set_will_message()`
## Contributing

View File

@ -155,9 +155,15 @@ void async_client::on_connected(void* context, char* cause)
if (context) {
async_client* cli = static_cast<async_client*>(context);
callback* cb = cli->userCallback_;
auto& connHandler = cli->connHandler_;
string cause_str = cause ? string(cause) : string();
if (cb)
cb->connected(cause ? string(cause) : string());
cb->connected(cause_str);
if (connHandler)
connHandler(cause_str);
}
}
@ -172,9 +178,15 @@ void async_client::on_connection_lost(void *context, char *cause)
async_client* cli = static_cast<async_client*>(context);
callback* cb = cli->userCallback_;
consumer_queue_type& que = cli->que_;
auto& connLostHandler = cli->connLostHandler_;
string cause_str = cause ? string(cause) : string();
if (cb)
cb->connection_lost(cause ? string(cause) : string());
cb->connection_lost(cause_str);
if (connLostHandler)
connLostHandler(cause_str);
if (que)
que->put(const_message_ptr{});
@ -191,13 +203,17 @@ int async_client::on_message_arrived(void* context, char* topicName, int topicLe
async_client* cli = static_cast<async_client*>(context);
callback* cb = cli->userCallback_;
consumer_queue_type& que = cli->que_;
message_handler& msgHandler = cli->msgHandler_;
if (cb || que) {
if (cb || que || msgHandler) {
size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen);
string topic(topicName, topicName+len);
auto m = message::create(std::move(topic), *msg);
if (msgHandler)
msgHandler(m);
if (cb)
cb->message_arrived(m);
@ -295,6 +311,32 @@ void async_client::remove_token(token* tok)
}
}
// --------------------------------------------------------------------------
// Callback management
void async_client::set_callback(callback& cb)
{
guard g(lock_);
userCallback_ = &cb;
int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected);
if (rc == MQTTASYNC_SUCCESS) {
rc = MQTTAsync_setCallbacks(cli_, this,
&async_client::on_connection_lost,
&async_client::on_message_arrived,
nullptr /*&async_client::on_delivery_complete*/);
}
else
MQTTAsync_setConnected(cli_, nullptr, nullptr);
if (rc != MQTTASYNC_SUCCESS) {
userCallback_ = nullptr;
throw exception(rc);
}
}
void async_client::disable_callbacks()
{
// TODO: It would be nice to disable callbacks at the C library level,
@ -304,11 +346,32 @@ void async_client::disable_callbacks()
int rc = MQTTAsync_setCallbacks(cli_, this, nullptr,
[](void*,char*,int,MQTTAsync_message*) -> int {return to_int(true);},
nullptr);
if (rc != MQTTASYNC_SUCCESS)
throw exception(rc);
}
void async_client::set_connected_handler(connection_handler cb)
{
connHandler_ = cb;
check_ret(::MQTTAsync_setConnected(cli_, this,
&async_client::on_connected));
}
void async_client::set_connection_lost_handler(connection_handler cb)
{
connLostHandler_ = cb;
check_ret(::MQTTAsync_setConnectionLostCallback(cli_, this,
&async_client::on_connection_lost));
}
void async_client::set_message_callback(message_handler cb)
{
msgHandler_ = cb;
check_ret(::MQTTAsync_setMessageArrivedCallback(cli_, this,
&async_client::on_message_arrived));
}
// --------------------------------------------------------------------------
// Connect
@ -403,24 +466,7 @@ token_ptr async_client::disconnect(disconnect_options opts)
auto tok = token::create(token::Type::DISCONNECT, *this);
add_token(tok);
opts.set_token(tok);
int rc = MQTTAsync_disconnect(cli_, &opts.opts_);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
token_ptr async_client::disconnect(int timeout)
{
auto tok = token::create(token::Type::DISCONNECT, *this);
add_token(tok);
disconnect_options opts(timeout, tok);
opts.set_token(tok, mqttVersion_);
int rc = MQTTAsync_disconnect(cli_, &opts.opts_);
@ -437,7 +483,8 @@ token_ptr async_client::disconnect(int timeout, void* userContext, iaction_liste
auto tok = token::create(token::Type::DISCONNECT, *this, userContext, cb);
add_token(tok);
disconnect_options opts(timeout, tok);
disconnect_options opts(timeout);
opts.set_token(tok, mqttVersion_);
int rc = MQTTAsync_disconnect(cli_, &opts.opts_);
@ -551,30 +598,6 @@ delivery_token_ptr async_client::publish(const_message_ptr msg,
return tok;
}
// --------------------------------------------------------------------------
void async_client::set_callback(callback& cb)
{
guard g(lock_);
userCallback_ = &cb;
int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected);
if (rc == MQTTASYNC_SUCCESS) {
rc = MQTTAsync_setCallbacks(cli_, this,
&async_client::on_connection_lost,
&async_client::on_message_arrived,
nullptr /*&async_client::on_delivery_complete*/);
}
else
MQTTAsync_setConnected(cli_, nullptr, nullptr);
if (rc != MQTTASYNC_SUCCESS) {
userCallback_ = nullptr;
throw exception(rc);
}
}
// --------------------------------------------------------------------------
// Subscribe

View File

@ -14,13 +14,6 @@ disconnect_options::disconnect_options() : opts_(DFLT_C_STRUCT)
{
}
disconnect_options::disconnect_options(int timeout, const token_ptr& tok)
: disconnect_options()
{
set_timeout(timeout);
set_token(tok);
}
disconnect_options::disconnect_options(const disconnect_options& opt)
: opts_(opt.opts_), tok_(opt.tok_)
{
@ -45,18 +38,26 @@ disconnect_options& disconnect_options::operator=(disconnect_options&& opt)
return *this;
}
void disconnect_options::set_token(const token_ptr& tok)
void disconnect_options::set_token(const token_ptr& tok, int mqttVersion)
{
tok_ = tok;
opts_.context = tok_.get();
opts_.onSuccess = nullptr;
opts_.onFailure = nullptr;
opts_.onSuccess5 = nullptr;
opts_.onFailure5 = nullptr;
if (tok) {
opts_.onSuccess = &token::on_success;
opts_.onFailure = &token::on_failure;
}
else {
opts_.onSuccess = nullptr;
opts_.onFailure = nullptr;
if (mqttVersion >= MQTTVERSION_5) {
opts_.onSuccess5 = &token::on_success5;
opts_.onFailure5 = &token::on_failure5;
}
else {
opts_.onSuccess = &token::on_success;
opts_.onFailure = &token::on_failure;
}
}
}

View File

@ -41,6 +41,7 @@
#include <list>
#include <memory>
#include <tuple>
#include <functional>
#include <stdexcept>
namespace mqtt {
@ -79,6 +80,11 @@ public:
/** Type for a thread-safe queue to consume messages synchronously */
using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
/** Handler type for registering an individual message callback */
using message_handler = std::function<void(const_message_ptr)>;
/** Handler type for when a connecion is made or lost */
using connection_handler = std::function<void(const string& cause)>;
private:
/** Lock guard type for this class */
using guard = std::unique_lock<std::mutex>;
@ -99,6 +105,12 @@ private:
std::unique_ptr<MQTTClient_persistence> persist_;
/** Callback supplied by the user (if any) */
callback* userCallback_;
/** Connection handler */
connection_handler connHandler_;
/** Connection lost handler */
connection_handler connLostHandler_;
/** Message handler (if any) */
message_handler msgHandler_;
/** Copy of connect token (for re-connects) */
token_ptr connTok_;
/** A list of tokens that are in play */
@ -128,6 +140,12 @@ private:
async_client(const async_client&) =delete;
async_client& operator=(const async_client&) =delete;
/** Checks a function return code and throws on error. */
static void check_ret(int rc) {
if (rc != MQTTASYNC_SUCCESS)
throw exception(rc);
}
public:
/**
* Create an async_client that can be used to communicate with an MQTT
@ -193,6 +211,37 @@ public:
* Destructor
*/
~async_client() override;
/**
* Sets a callback listener to use for events that happen
* asynchronously.
* @param cb callback receiver which will be invoked for certain
* asynchronous events
*/
void set_callback(callback& cb) override;
/**
* Stops callbacks.
* This is not normally called by the application. It should be used
* cautiously as it may cause the application to lose messages.
*/
void disable_callbacks() override;
/**
* Callback for when a connection is made.
* @param cb Callback functor for when the connection is made.
*/
void set_connected_handler(connection_handler cb) /*override*/;
/**
* Callback for when a connection is lost.
* @param cb Callback functor for when the connection is lost.
*/
void set_connection_lost_handler(connection_handler cb) /*override*/;
/**
* Sets the callback for when a message arrives from the broker.
* Note that the application can only have one message handler which can
* be installed individually using this method, or installled as a
* listener object.
* @param cb The callback functor to register with the library.
*/
void set_message_callback(message_handler cb) /*override*/;
/**
* Connects to an MQTT server using the default options.
* @return token used to track and wait for the connect to complete. The
@ -271,7 +320,9 @@ public:
* set.
* @throw exception for problems encountered while disconnecting
*/
token_ptr disconnect(int timeout) override;
token_ptr disconnect(int timeout) override {
return disconnect(disconnect_options(timeout));
}
/**
* Disconnects from the server.
* @param timeout the amount of time in milliseconds to allow for
@ -453,19 +504,6 @@ public:
*/
delivery_token_ptr publish(const_message_ptr msg,
void* userContext, iaction_listener& cb) override;
/**
* Sets a callback listener to use for events that happen
* asynchronously.
* @param cb callback receiver which will be invoked for certain
* asynchronous events
*/
void set_callback(callback& cb) override;
/**
* Stops callbacks.
* This is not normally called by the application. It should be used
* cautiously as it may cause the application to lose messages.
*/
void disable_callbacks() override;
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter the topic to subscribe to, which can include

View File

@ -321,6 +321,20 @@ public:
*/
void set_will(const will_options& will);
void set_will(will_options&& will);
/**
* Sets the "Last Will and Testament" (LWT) as a message
* @param msg The LWT message
*/
void set_will_message(const message& msg) {
set_will(will_options(msg));
}
/**
* Sets the "Last Will and Testament" (LWT) as a message
* @param msg Pointer to a LWT message
*/
void set_will_message(const_message_ptr msg) {
if (msg) set_will(will_options(*msg));
}
/**
* Sets the callback context to a delivery token.
* @param tok The delivery token to be used as the callback context.

View File

@ -62,19 +62,18 @@ public:
/**
* Creates disconnect options tied to the specific token.
* @param timeout The timeout (in milliseconds).
* @param tok A token to be used as the context.
*/
disconnect_options(int timeout, const token_ptr& tok);
disconnect_options(int timeout) : disconnect_options() {
set_timeout(timeout);
}
/**
* Creates disconnect options tied to the specific token.
* @param to The timeout.
* @param tok A token to be used as the context.
*/
template <class Rep, class Period>
disconnect_options(const std::chrono::duration<Rep, Period>& to,
const token_ptr& tok) : disconnect_options() {
disconnect_options(const std::chrono::duration<Rep, Period>& to)
: disconnect_options() {
set_timeout(to);
set_token(tok);
}
/**
* Copy constructor.
@ -123,8 +122,10 @@ public:
/**
* Sets the callback context to a delivery token.
* @param tok The delivery token to be used as the callback context.
* @param mqttVersion The version of MQTT we're using for the
* connection.
*/
void set_token(const token_ptr& tok);
void set_token(const token_ptr& tok, int mqttVersion);
/**
* Gets the callback context to a delivery token.
* @return The delivery token to be used as the callback context.

View File

@ -26,6 +26,7 @@
#include "MQTTAsync.h"
#include "mqtt/delivery_token.h"
#include "mqtt/subscribe_options.h"
#include "mqtt/message.h"
#include "mqtt/types.h"
#include <vector>
@ -153,6 +154,11 @@ public:
* complete.
*/
delivery_token_ptr publish(binary_ref payload, int qos, bool retained);
/**
* Subscribe to the topic.
* @return A token used to track the progress of the operation.
*/
token_ptr subscribe(const subscribe_options& opts=subscribe_options());
/**
* Returns a string representation of this topic.
* @return The name of the topic

View File

@ -40,6 +40,7 @@ add_executable(data_publish data_publish.cpp)
add_executable(rpc_math_cli rpc_math_cli.cpp)
add_executable(rpc_math_srvr rpc_math_srvr.cpp)
add_executable(pub_speed_test pub_speed_test.cpp)
add_executable(mqttpp_chat mqttpp_chat.cpp)
## link binaries
target_link_libraries(async_publish ${PAHO_CPP_LIB})
@ -50,6 +51,7 @@ target_link_libraries(sync_consume ${PAHO_CPP_LIB})
target_link_libraries(data_publish ${PAHO_CPP_LIB})
target_link_libraries(rpc_math_cli ${PAHO_CPP_LIB})
target_link_libraries(rpc_math_srvr ${PAHO_CPP_LIB})
target_link_libraries(mqttpp_chat ${PAHO_CPP_LIB})
target_link_libraries(pub_speed_test ${PAHO_CPP_LIB}
Threads::Threads)
@ -63,6 +65,7 @@ set(INSTALL_TARGETS
data_publish
rpc_math_cli
rpc_math_srvr
mqttpp_chat
pub_speed_test
)

153
src/samples/mqttpp_chat.cpp Normal file
View File

@ -0,0 +1,153 @@
// mqttpp_chat.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// The "chat" application is practically the "Hello World" application for
// messaging systems. This allows a user to type in message to send to a
// "group" while seeing all the messages that the other members of the group
// send.
//
// This application is an MQTT publisher/subscriber using the C++
// asynchronous client interface, employing callbacks to receive messages
// and status updates.
//
// The sample demonstrates:
// - Connecting to an MQTT server/broker.
// - Publishing messages.
// - Subscribing to a topic
// - Receiving messages (callbacks) through a lambda function
//
// USAGE:
// mqttpp_chat <user> <group>
/*******************************************************************************
* Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS("tcp://localhost:1883");
const int QOS = 1;
/////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
if (argc != 3) {
std::cout << "USAGE: mqttpp_chat <user> <group>" << std::endl;
return 1;
}
std::string chatUser { argv[1] },
chatGroup { argv[2] },
chatTopic { "chat/"+chatGroup };
// LWT message is broadcast to other users if out connection is lost
auto lwt = mqtt::make_message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false);
// Set up the connect options
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_mqtt_version(MQTTVERSION_5);
connOpts.set_clean_start(true);
connOpts.set_will_message(lwt);
mqtt::async_client cli(SERVER_ADDRESS, "");
// Set a callback for connection lost.
// This just exits the app.
cli.set_connection_lost_handler([](const std::string&) {
std::cout << "*** Connection Lost ***" << std::endl;
exit(2);
});
// Set the callback for incoming messages
cli.set_message_callback([](mqtt::const_message_ptr msg) {
std::cout << msg->get_payload_str() << std::endl;
});
// We publish and subscribe to one topic,
// so a 'topic' object is helpful.
mqtt::topic topic { cli, "chat/"+chatGroup, QOS };
// Start the connection.
try {
std::cout << "Connecting to the chat server at '" << SERVER_ADDRESS
<< "'..." << std::flush;
auto tok = cli.connect(connOpts);
tok->wait();
// Subscribe to the topic using "no local" so that
// we don't get own messages sent back to us
std::cout << "Ok\nJoining the group..." << std::flush;
auto subOpts = mqtt::subscribe_options(mqtt::subscribe_options::SUBSCRIBE_NO_LOCAL);
topic.subscribe(subOpts)->wait();
std::cout << "Ok" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << "\nERROR: Unable to connect. "
<< exc.what() << std::endl;
return 1;
}
// Let eveyone know that a new user joined the conversation.
topic.publish("<<" + chatUser + " joined the group>>");
// Read messages from the console and publish them.
// Quit when the use enters an empty line.
std::string usrMsg;
while (std::getline(std::cin, usrMsg) && !usrMsg.empty()) {
usrMsg = chatUser + ": " + usrMsg;
topic.publish(usrMsg);
}
// Let eveyone know that the user left the conversation.
topic.publish("<<" + chatUser + " left the group>>")->wait();
// Disconnect
try {
std::cout << "Disconnecting from the chat server..." << std::flush;
cli.disconnect()->wait();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << exc.what() << std::endl;
return 1;
}
return 0;
}

View File

@ -230,7 +230,7 @@ void token::on_failure(MQTTAsync_failureData* rsp)
//
void token::on_failure5(MQTTAsync_failureData5* rsp)
{
::Log(TRACE_MIN, -1, "[cpp] on_failure");
::Log(TRACE_MIN, -1, "[cpp] on_failure5");
unique_lock g(lock_);
iaction_listener* listener = listener_;

View File

@ -44,6 +44,11 @@ delivery_token_ptr topic::publish(binary_ref payload, int qos, bool retained)
return cli_.publish(name_, std::move(payload), qos, retained);
}
token_ptr topic::subscribe(const subscribe_options& opts)
{
return cli_.subscribe(name_, qos_, opts);
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

View File

@ -79,7 +79,8 @@ public:
const int TIMEOUT = 10;
auto tok = token::create(TOKEN_TYPE, cli);
mqtt::disconnect_options opts { TIMEOUT, tok };
mqtt::disconnect_options opts { TIMEOUT };
opts.set_token(tok, MQTTVERSION_DEFAULT);
const auto& c_struct = opts.opts_;
@ -124,11 +125,11 @@ public:
CPPUNIT_ASSERT(nullptr == c_struct.onSuccess);
CPPUNIT_ASSERT(nullptr == c_struct.onFailure);
opts.set_token(mqtt::token_ptr());
opts.set_token(mqtt::token_ptr(), MQTTVERSION_DEFAULT);
CPPUNIT_ASSERT(nullptr == c_struct.onSuccess);
CPPUNIT_ASSERT(nullptr == c_struct.onFailure);
opts.set_token(tok);
opts.set_token(tok, MQTTVERSION_DEFAULT);
CPPUNIT_ASSERT(nullptr != c_struct.onSuccess);
CPPUNIT_ASSERT(nullptr != c_struct.onFailure);