From 01d3381421d140083b7d6c8f1682c30e1bdcd52c Mon Sep 17 00:00:00 2001 From: fpagliughi Date: Fri, 2 Aug 2019 01:47:20 -0400 Subject: [PATCH] Added individual callback registration. Added chat sample. The 'topic' objects can subscribe. Fixed disconnect version for v5 --- README.md | 14 ++- src/async_client.cpp | 117 +++++++++++-------- src/disconnect_options.cpp | 29 ++--- src/mqtt/async_client.h | 66 ++++++++--- src/mqtt/connect_options.h | 14 +++ src/mqtt/disconnect_options.h | 15 +-- src/mqtt/topic.h | 6 + src/samples/CMakeLists.txt | 3 + src/samples/mqttpp_chat.cpp | 153 +++++++++++++++++++++++++ src/token.cpp | 2 +- src/topic.cpp | 5 + test/cppunit/disconnect_options_test.h | 7 +- 12 files changed, 339 insertions(+), 92 deletions(-) create mode 100644 src/samples/mqttpp_chat.cpp diff --git a/README.md b/README.md index 7aff6d7..26c507c 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ _Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2) ### Unreleased Features in this Branch -- Started MQTT v5 support +- Started MQTT v5 support: - **Properties** - New `property` class acts something like a std::variant to hold a property of any supported type. - New `properties` class is a collection type to hold all the properties for a single transmitted packet. @@ -45,12 +45,14 @@ _Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2) - Properties can also be obtained from server responses to requests such as from a _connect_ call. These are available in the `token` objects when they complete. - The client object tracks the desired MQTT version that the app requested and/or is currently connected at. Internally this is now required by the `response_options` the need to distinguish between pre-v5 and post-v5 callback functions. - MQTT v5 reason codes for requests are available via `token` objects when they complete. They are also available in `exception` objects that are thrown by tokens. - - More descriptive error messages (PR #154), integrated into the `mqtt::exception` class. MQTT v5 reason codes are also included in the exceptions when an error occurs. - - The`message` and various options classes were updated for MQTT v5 to include properties and reson codes (where appropriate). - - Applications can (finally) get server responses from the various ack packets. These are available through the tokens after they complete, as `connect_response`, `subscribe_response`, and `unsubscribe_response`. - Support for subscibe options, like no local subscriptions, etc. - - Sample applications were added showing how to do basic Remote Procedure Calls (RPC's) with MQTT v5 using the *RESPONSE_TOPIC* and *CORRELATION_DATA* properties. These are *rpc_math_cli* and *rpc_math_srvr* in the _src/samples_ directory. - + - Sample applications were added showing how to do basic Remote Procedure Calls (RPC's) with MQTT v5 using the *RESPONSE_TOPIC* and *CORRELATION_DATA* properties. These are *rpc_math_cli* and *rpc_math_srvr* in the _src/samples_ directory. + - A sample "chat" application was added, showing how to use subscribe options, such as "no local". +- More descriptive error messages (PR #154), integrated into the `mqtt::exception` class. MQTT v5 reason codes are also included in the exceptions when an error occurs. +- Applications can (finally) get server responses from the various ack packets. These are available through the tokens after they complete, as `connect_response`, `subscribe_response`, and `unsubscribe_response`. +- The `topic` objects can be used to subscribe. +- Applications can register individual callback functions instead of using a `callback` interface object. This allows easy use of lambda functions for callbacks. +- The connect options can take a LWT as a plain message, via `connect_options::set_will_message()` ## Contributing diff --git a/src/async_client.cpp b/src/async_client.cpp index 0afcae9..c536b77 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -155,9 +155,15 @@ void async_client::on_connected(void* context, char* cause) if (context) { async_client* cli = static_cast(context); callback* cb = cli->userCallback_; + auto& connHandler = cli->connHandler_; + + string cause_str = cause ? string(cause) : string(); if (cb) - cb->connected(cause ? string(cause) : string()); + cb->connected(cause_str); + + if (connHandler) + connHandler(cause_str); } } @@ -172,9 +178,15 @@ void async_client::on_connection_lost(void *context, char *cause) async_client* cli = static_cast(context); callback* cb = cli->userCallback_; consumer_queue_type& que = cli->que_; + auto& connLostHandler = cli->connLostHandler_; + + string cause_str = cause ? string(cause) : string(); if (cb) - cb->connection_lost(cause ? string(cause) : string()); + cb->connection_lost(cause_str); + + if (connLostHandler) + connLostHandler(cause_str); if (que) que->put(const_message_ptr{}); @@ -191,13 +203,17 @@ int async_client::on_message_arrived(void* context, char* topicName, int topicLe async_client* cli = static_cast(context); callback* cb = cli->userCallback_; consumer_queue_type& que = cli->que_; + message_handler& msgHandler = cli->msgHandler_; - if (cb || que) { + if (cb || que || msgHandler) { size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen); string topic(topicName, topicName+len); auto m = message::create(std::move(topic), *msg); + if (msgHandler) + msgHandler(m); + if (cb) cb->message_arrived(m); @@ -295,6 +311,32 @@ void async_client::remove_token(token* tok) } } + +// -------------------------------------------------------------------------- +// Callback management + +void async_client::set_callback(callback& cb) +{ + guard g(lock_); + userCallback_ = &cb; + + int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected); + + if (rc == MQTTASYNC_SUCCESS) { + rc = MQTTAsync_setCallbacks(cli_, this, + &async_client::on_connection_lost, + &async_client::on_message_arrived, + nullptr /*&async_client::on_delivery_complete*/); + } + else + MQTTAsync_setConnected(cli_, nullptr, nullptr); + + if (rc != MQTTASYNC_SUCCESS) { + userCallback_ = nullptr; + throw exception(rc); + } +} + void async_client::disable_callbacks() { // TODO: It would be nice to disable callbacks at the C library level, @@ -304,11 +346,32 @@ void async_client::disable_callbacks() int rc = MQTTAsync_setCallbacks(cli_, this, nullptr, [](void*,char*,int,MQTTAsync_message*) -> int {return to_int(true);}, nullptr); - + if (rc != MQTTASYNC_SUCCESS) throw exception(rc); } +void async_client::set_connected_handler(connection_handler cb) +{ + connHandler_ = cb; + check_ret(::MQTTAsync_setConnected(cli_, this, + &async_client::on_connected)); +} + +void async_client::set_connection_lost_handler(connection_handler cb) +{ + connLostHandler_ = cb; + check_ret(::MQTTAsync_setConnectionLostCallback(cli_, this, + &async_client::on_connection_lost)); +} + +void async_client::set_message_callback(message_handler cb) +{ + msgHandler_ = cb; + check_ret(::MQTTAsync_setMessageArrivedCallback(cli_, this, + &async_client::on_message_arrived)); +} + // -------------------------------------------------------------------------- // Connect @@ -403,24 +466,7 @@ token_ptr async_client::disconnect(disconnect_options opts) auto tok = token::create(token::Type::DISCONNECT, *this); add_token(tok); - opts.set_token(tok); - - int rc = MQTTAsync_disconnect(cli_, &opts.opts_); - - if (rc != MQTTASYNC_SUCCESS) { - remove_token(tok); - throw exception(rc); - } - - return tok; -} - -token_ptr async_client::disconnect(int timeout) -{ - auto tok = token::create(token::Type::DISCONNECT, *this); - add_token(tok); - - disconnect_options opts(timeout, tok); + opts.set_token(tok, mqttVersion_); int rc = MQTTAsync_disconnect(cli_, &opts.opts_); @@ -437,7 +483,8 @@ token_ptr async_client::disconnect(int timeout, void* userContext, iaction_liste auto tok = token::create(token::Type::DISCONNECT, *this, userContext, cb); add_token(tok); - disconnect_options opts(timeout, tok); + disconnect_options opts(timeout); + opts.set_token(tok, mqttVersion_); int rc = MQTTAsync_disconnect(cli_, &opts.opts_); @@ -551,30 +598,6 @@ delivery_token_ptr async_client::publish(const_message_ptr msg, return tok; } -// -------------------------------------------------------------------------- - -void async_client::set_callback(callback& cb) -{ - guard g(lock_); - userCallback_ = &cb; - - int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected); - - if (rc == MQTTASYNC_SUCCESS) { - rc = MQTTAsync_setCallbacks(cli_, this, - &async_client::on_connection_lost, - &async_client::on_message_arrived, - nullptr /*&async_client::on_delivery_complete*/); - } - else - MQTTAsync_setConnected(cli_, nullptr, nullptr); - - if (rc != MQTTASYNC_SUCCESS) { - userCallback_ = nullptr; - throw exception(rc); - } -} - // -------------------------------------------------------------------------- // Subscribe diff --git a/src/disconnect_options.cpp b/src/disconnect_options.cpp index abd31b7..8f52ee5 100644 --- a/src/disconnect_options.cpp +++ b/src/disconnect_options.cpp @@ -14,13 +14,6 @@ disconnect_options::disconnect_options() : opts_(DFLT_C_STRUCT) { } -disconnect_options::disconnect_options(int timeout, const token_ptr& tok) - : disconnect_options() -{ - set_timeout(timeout); - set_token(tok); -} - disconnect_options::disconnect_options(const disconnect_options& opt) : opts_(opt.opts_), tok_(opt.tok_) { @@ -45,18 +38,26 @@ disconnect_options& disconnect_options::operator=(disconnect_options&& opt) return *this; } -void disconnect_options::set_token(const token_ptr& tok) +void disconnect_options::set_token(const token_ptr& tok, int mqttVersion) { tok_ = tok; opts_.context = tok_.get(); + opts_.onSuccess = nullptr; + opts_.onFailure = nullptr; + + opts_.onSuccess5 = nullptr; + opts_.onFailure5 = nullptr; + if (tok) { - opts_.onSuccess = &token::on_success; - opts_.onFailure = &token::on_failure; - } - else { - opts_.onSuccess = nullptr; - opts_.onFailure = nullptr; + if (mqttVersion >= MQTTVERSION_5) { + opts_.onSuccess5 = &token::on_success5; + opts_.onFailure5 = &token::on_failure5; + } + else { + opts_.onSuccess = &token::on_success; + opts_.onFailure = &token::on_failure; + } } } diff --git a/src/mqtt/async_client.h b/src/mqtt/async_client.h index 350746c..4323071 100644 --- a/src/mqtt/async_client.h +++ b/src/mqtt/async_client.h @@ -41,6 +41,7 @@ #include #include #include +#include #include namespace mqtt { @@ -79,6 +80,11 @@ public: /** Type for a thread-safe queue to consume messages synchronously */ using consumer_queue_type = std::unique_ptr>; + /** Handler type for registering an individual message callback */ + using message_handler = std::function; + /** Handler type for when a connecion is made or lost */ + using connection_handler = std::function; + private: /** Lock guard type for this class */ using guard = std::unique_lock; @@ -99,6 +105,12 @@ private: std::unique_ptr persist_; /** Callback supplied by the user (if any) */ callback* userCallback_; + /** Connection handler */ + connection_handler connHandler_; + /** Connection lost handler */ + connection_handler connLostHandler_; + /** Message handler (if any) */ + message_handler msgHandler_; /** Copy of connect token (for re-connects) */ token_ptr connTok_; /** A list of tokens that are in play */ @@ -128,6 +140,12 @@ private: async_client(const async_client&) =delete; async_client& operator=(const async_client&) =delete; + /** Checks a function return code and throws on error. */ + static void check_ret(int rc) { + if (rc != MQTTASYNC_SUCCESS) + throw exception(rc); + } + public: /** * Create an async_client that can be used to communicate with an MQTT @@ -193,6 +211,37 @@ public: * Destructor */ ~async_client() override; + /** + * Sets a callback listener to use for events that happen + * asynchronously. + * @param cb callback receiver which will be invoked for certain + * asynchronous events + */ + void set_callback(callback& cb) override; + /** + * Stops callbacks. + * This is not normally called by the application. It should be used + * cautiously as it may cause the application to lose messages. + */ + void disable_callbacks() override; + /** + * Callback for when a connection is made. + * @param cb Callback functor for when the connection is made. + */ + void set_connected_handler(connection_handler cb) /*override*/; + /** + * Callback for when a connection is lost. + * @param cb Callback functor for when the connection is lost. + */ + void set_connection_lost_handler(connection_handler cb) /*override*/; + /** + * Sets the callback for when a message arrives from the broker. + * Note that the application can only have one message handler which can + * be installed individually using this method, or installled as a + * listener object. + * @param cb The callback functor to register with the library. + */ + void set_message_callback(message_handler cb) /*override*/; /** * Connects to an MQTT server using the default options. * @return token used to track and wait for the connect to complete. The @@ -271,7 +320,9 @@ public: * set. * @throw exception for problems encountered while disconnecting */ - token_ptr disconnect(int timeout) override; + token_ptr disconnect(int timeout) override { + return disconnect(disconnect_options(timeout)); + } /** * Disconnects from the server. * @param timeout the amount of time in milliseconds to allow for @@ -453,19 +504,6 @@ public: */ delivery_token_ptr publish(const_message_ptr msg, void* userContext, iaction_listener& cb) override; - /** - * Sets a callback listener to use for events that happen - * asynchronously. - * @param cb callback receiver which will be invoked for certain - * asynchronous events - */ - void set_callback(callback& cb) override; - /** - * Stops callbacks. - * This is not normally called by the application. It should be used - * cautiously as it may cause the application to lose messages. - */ - void disable_callbacks() override; /** * Subscribe to a topic, which may include wildcards. * @param topicFilter the topic to subscribe to, which can include diff --git a/src/mqtt/connect_options.h b/src/mqtt/connect_options.h index 54c8d73..891bd01 100644 --- a/src/mqtt/connect_options.h +++ b/src/mqtt/connect_options.h @@ -321,6 +321,20 @@ public: */ void set_will(const will_options& will); void set_will(will_options&& will); + /** + * Sets the "Last Will and Testament" (LWT) as a message + * @param msg The LWT message + */ + void set_will_message(const message& msg) { + set_will(will_options(msg)); + } + /** + * Sets the "Last Will and Testament" (LWT) as a message + * @param msg Pointer to a LWT message + */ + void set_will_message(const_message_ptr msg) { + if (msg) set_will(will_options(*msg)); + } /** * Sets the callback context to a delivery token. * @param tok The delivery token to be used as the callback context. diff --git a/src/mqtt/disconnect_options.h b/src/mqtt/disconnect_options.h index ebc9f2a..d2ebdbf 100644 --- a/src/mqtt/disconnect_options.h +++ b/src/mqtt/disconnect_options.h @@ -62,19 +62,18 @@ public: /** * Creates disconnect options tied to the specific token. * @param timeout The timeout (in milliseconds). - * @param tok A token to be used as the context. */ - disconnect_options(int timeout, const token_ptr& tok); + disconnect_options(int timeout) : disconnect_options() { + set_timeout(timeout); + } /** * Creates disconnect options tied to the specific token. * @param to The timeout. - * @param tok A token to be used as the context. */ template - disconnect_options(const std::chrono::duration& to, - const token_ptr& tok) : disconnect_options() { + disconnect_options(const std::chrono::duration& to) + : disconnect_options() { set_timeout(to); - set_token(tok); } /** * Copy constructor. @@ -123,8 +122,10 @@ public: /** * Sets the callback context to a delivery token. * @param tok The delivery token to be used as the callback context. + * @param mqttVersion The version of MQTT we're using for the + * connection. */ - void set_token(const token_ptr& tok); + void set_token(const token_ptr& tok, int mqttVersion); /** * Gets the callback context to a delivery token. * @return The delivery token to be used as the callback context. diff --git a/src/mqtt/topic.h b/src/mqtt/topic.h index f68005a..618eb85 100644 --- a/src/mqtt/topic.h +++ b/src/mqtt/topic.h @@ -26,6 +26,7 @@ #include "MQTTAsync.h" #include "mqtt/delivery_token.h" +#include "mqtt/subscribe_options.h" #include "mqtt/message.h" #include "mqtt/types.h" #include @@ -153,6 +154,11 @@ public: * complete. */ delivery_token_ptr publish(binary_ref payload, int qos, bool retained); + /** + * Subscribe to the topic. + * @return A token used to track the progress of the operation. + */ + token_ptr subscribe(const subscribe_options& opts=subscribe_options()); /** * Returns a string representation of this topic. * @return The name of the topic diff --git a/src/samples/CMakeLists.txt b/src/samples/CMakeLists.txt index 489f74f..acd74eb 100644 --- a/src/samples/CMakeLists.txt +++ b/src/samples/CMakeLists.txt @@ -40,6 +40,7 @@ add_executable(data_publish data_publish.cpp) add_executable(rpc_math_cli rpc_math_cli.cpp) add_executable(rpc_math_srvr rpc_math_srvr.cpp) add_executable(pub_speed_test pub_speed_test.cpp) +add_executable(mqttpp_chat mqttpp_chat.cpp) ## link binaries target_link_libraries(async_publish ${PAHO_CPP_LIB}) @@ -50,6 +51,7 @@ target_link_libraries(sync_consume ${PAHO_CPP_LIB}) target_link_libraries(data_publish ${PAHO_CPP_LIB}) target_link_libraries(rpc_math_cli ${PAHO_CPP_LIB}) target_link_libraries(rpc_math_srvr ${PAHO_CPP_LIB}) +target_link_libraries(mqttpp_chat ${PAHO_CPP_LIB}) target_link_libraries(pub_speed_test ${PAHO_CPP_LIB} Threads::Threads) @@ -63,6 +65,7 @@ set(INSTALL_TARGETS data_publish rpc_math_cli rpc_math_srvr + mqttpp_chat pub_speed_test ) diff --git a/src/samples/mqttpp_chat.cpp b/src/samples/mqttpp_chat.cpp new file mode 100644 index 0000000..f94c91b --- /dev/null +++ b/src/samples/mqttpp_chat.cpp @@ -0,0 +1,153 @@ +// mqttpp_chat.cpp +// +// This is a Paho MQTT C++ client, sample application. +// +// The "chat" application is practically the "Hello World" application for +// messaging systems. This allows a user to type in message to send to a +// "group" while seeing all the messages that the other members of the group +// send. +// +// This application is an MQTT publisher/subscriber using the C++ +// asynchronous client interface, employing callbacks to receive messages +// and status updates. +// +// The sample demonstrates: +// - Connecting to an MQTT server/broker. +// - Publishing messages. +// - Subscribing to a topic +// - Receiving messages (callbacks) through a lambda function +// +// USAGE: +// mqttpp_chat + +/******************************************************************************* + * Copyright (c) 2019 Frank Pagliughi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Frank Pagliughi - initial implementation and documentation + *******************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include "mqtt/async_client.h" +#include "mqtt/topic.h" + +const std::string SERVER_ADDRESS("tcp://localhost:1883"); + +const int QOS = 1; + +///////////////////////////////////////////////////////////////////////////// + +int main(int argc, char* argv[]) +{ + if (argc != 3) { + std::cout << "USAGE: mqttpp_chat " << std::endl; + return 1; + } + + std::string chatUser { argv[1] }, + chatGroup { argv[2] }, + chatTopic { "chat/"+chatGroup }; + + // LWT message is broadcast to other users if out connection is lost + + auto lwt = mqtt::make_message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false); + + // Set up the connect options + + mqtt::connect_options connOpts; + connOpts.set_keep_alive_interval(20); + connOpts.set_mqtt_version(MQTTVERSION_5); + connOpts.set_clean_start(true); + connOpts.set_will_message(lwt); + + mqtt::async_client cli(SERVER_ADDRESS, ""); + + // Set a callback for connection lost. + // This just exits the app. + + cli.set_connection_lost_handler([](const std::string&) { + std::cout << "*** Connection Lost ***" << std::endl; + exit(2); + }); + + // Set the callback for incoming messages + + cli.set_message_callback([](mqtt::const_message_ptr msg) { + std::cout << msg->get_payload_str() << std::endl; + }); + + // We publish and subscribe to one topic, + // so a 'topic' object is helpful. + + mqtt::topic topic { cli, "chat/"+chatGroup, QOS }; + + // Start the connection. + + try { + std::cout << "Connecting to the chat server at '" << SERVER_ADDRESS + << "'..." << std::flush; + auto tok = cli.connect(connOpts); + tok->wait(); + + // Subscribe to the topic using "no local" so that + // we don't get own messages sent back to us + + std::cout << "Ok\nJoining the group..." << std::flush; + auto subOpts = mqtt::subscribe_options(mqtt::subscribe_options::SUBSCRIBE_NO_LOCAL); + topic.subscribe(subOpts)->wait(); + std::cout << "Ok" << std::endl; + } + catch (const mqtt::exception& exc) { + std::cerr << "\nERROR: Unable to connect. " + << exc.what() << std::endl; + return 1; + } + + // Let eveyone know that a new user joined the conversation. + + topic.publish("<<" + chatUser + " joined the group>>"); + + // Read messages from the console and publish them. + // Quit when the use enters an empty line. + + std::string usrMsg; + + while (std::getline(std::cin, usrMsg) && !usrMsg.empty()) { + usrMsg = chatUser + ": " + usrMsg; + topic.publish(usrMsg); + } + + // Let eveyone know that the user left the conversation. + + topic.publish("<<" + chatUser + " left the group>>")->wait(); + + // Disconnect + + try { + std::cout << "Disconnecting from the chat server..." << std::flush; + cli.disconnect()->wait(); + std::cout << "OK" << std::endl; + } + catch (const mqtt::exception& exc) { + std::cerr << exc.what() << std::endl; + return 1; + } + + return 0; +} + diff --git a/src/token.cpp b/src/token.cpp index f9abbe8..ffbf816 100644 --- a/src/token.cpp +++ b/src/token.cpp @@ -230,7 +230,7 @@ void token::on_failure(MQTTAsync_failureData* rsp) // void token::on_failure5(MQTTAsync_failureData5* rsp) { - ::Log(TRACE_MIN, -1, "[cpp] on_failure"); + ::Log(TRACE_MIN, -1, "[cpp] on_failure5"); unique_lock g(lock_); iaction_listener* listener = listener_; diff --git a/src/topic.cpp b/src/topic.cpp index 3bf5f1b..8e486a9 100644 --- a/src/topic.cpp +++ b/src/topic.cpp @@ -44,6 +44,11 @@ delivery_token_ptr topic::publish(binary_ref payload, int qos, bool retained) return cli_.publish(name_, std::move(payload), qos, retained); } +token_ptr topic::subscribe(const subscribe_options& opts) +{ + return cli_.subscribe(name_, qos_, opts); +} + ///////////////////////////////////////////////////////////////////////////// // end namespace mqtt } diff --git a/test/cppunit/disconnect_options_test.h b/test/cppunit/disconnect_options_test.h index 0386c91..4984e80 100644 --- a/test/cppunit/disconnect_options_test.h +++ b/test/cppunit/disconnect_options_test.h @@ -79,7 +79,8 @@ public: const int TIMEOUT = 10; auto tok = token::create(TOKEN_TYPE, cli); - mqtt::disconnect_options opts { TIMEOUT, tok }; + mqtt::disconnect_options opts { TIMEOUT }; + opts.set_token(tok, MQTTVERSION_DEFAULT); const auto& c_struct = opts.opts_; @@ -124,11 +125,11 @@ public: CPPUNIT_ASSERT(nullptr == c_struct.onSuccess); CPPUNIT_ASSERT(nullptr == c_struct.onFailure); - opts.set_token(mqtt::token_ptr()); + opts.set_token(mqtt::token_ptr(), MQTTVERSION_DEFAULT); CPPUNIT_ASSERT(nullptr == c_struct.onSuccess); CPPUNIT_ASSERT(nullptr == c_struct.onFailure); - opts.set_token(tok); + opts.set_token(tok, MQTTVERSION_DEFAULT); CPPUNIT_ASSERT(nullptr != c_struct.onSuccess); CPPUNIT_ASSERT(nullptr != c_struct.onFailure);