diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index bc0da66e2..28a202c15 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -379,7 +379,7 @@ namespace net_utils try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support); bool connect(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_connection_context& cn, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect); template - bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, const t_callback &cb, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect); + bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_callback &&cb, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect); boost::asio::ssl::context& get_ssl_context() noexcept { diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 81aa725d1..bfd734c4e 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -1786,7 +1786,7 @@ namespace net_utils } //--------------------------------------------------------------------------------- template template - bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) + bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback &&cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support) { TRY_ENTRY(); connection_ptr new_connection_l(new connection(io_service_, m_state, m_connection_type, ssl_support) ); @@ -1879,7 +1879,7 @@ namespace net_utils } }); //start async connect - sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_) + sock_.async_connect(remote_endpoint, [=, cb = std::forward(cb)](const boost::system::error_code& ec_) mutable { t_connection_context conn_context = AUTO_VAL_INIT(conn_context); boost::system::error_code ignored_ec; diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index bd6ffe930..ec1aff739 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -104,7 +104,7 @@ public: int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id); template - int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, callback_t &&cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); @@ -190,105 +190,90 @@ public: struct invoke_response_handler_base { - virtual bool handle(int res, const epee::span buff, connection_context& context)=0; - virtual bool is_timer_started() const=0; - virtual void cancel()=0; - virtual bool cancel_timer()=0; - virtual void reset_timer()=0; - }; - template - struct anvoke_handler: invoke_response_handler_base - { - anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command) - :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false), - m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command) - { - if(m_con.start_outer_call()) - { - MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout); - m_timer.expires_from_now(boost::posix_time::milliseconds(timeout)); - m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec) - { - if(ec == boost::asio::error::operation_aborted) - return; - MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout); - epee::span fake; - cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref()); - con.close(); - con.finish_outer_call(); - }); - m_timer_started = true; - } - } - virtual ~anvoke_handler() - {} - callback_t m_cb; - async_protocol_handler& m_con; + protected: boost::asio::deadline_timer m_timer; - bool m_timer_started; - bool m_cancel_timer_called; - bool m_timer_cancelled; - uint64_t m_timeout; - int m_command; - virtual bool handle(int res, const epee::span buff, typename async_protocol_handler::connection_context& context) + async_protocol_handler& m_con; + const uint64_t m_timeout; + const int m_command; + + virtual void do_handle(int res, const epee::span buff)=0; + + public: + invoke_response_handler_base(uint64_t timeout, async_protocol_handler& con, int command) + : m_timer(con.m_pservice_endpoint->get_io_service()), + m_con(con), + m_timeout(timeout), + m_command(command) + {} + + virtual ~invoke_response_handler_base() {} + + //! Run user handler. Only call if `cancel_timer()` returns true. + void handle(int res, const epee::span buff) { - if(!cancel_timer()) - return false; - m_cb(res, buff, context); + do_handle(res, buff); m_con.finish_outer_call(); + } + + //! Cancel timer and run user handler if not previously invoked \return True if user handler invoked + bool cancel() + { + if (!cancel_timer()) + return false; + handle(LEVIN_ERROR_CONNECTION_DESTROYED, nullptr); return true; } - virtual bool is_timer_started() const + + //! Cancel timer but do not run user handler. \return True if user handler has not been invoked + bool cancel_timer() { - return m_timer_started; + boost::system::error_code ignored{}; + return m_timer.cancel(ignored); } - virtual void cancel() + + // Attempt to adjust timeout for user handler execution. \return True if timer adjusted. + static bool reset_timer(const std::shared_ptr& self, const bool initial = false) { - if(cancel_timer()) + if (!self || (!initial && !self->cancel_timer())) + return false; + if (initial && !self->m_con.start_outer_call()) + return false; + + self->m_timer.expires_from_now(boost::posix_time::milliseconds(self->m_timeout)); + self->m_timer.async_wait([self](const boost::system::error_code& ec) { - epee::span fake; - m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref()); - m_con.finish_outer_call(); - } + if(ec == boost::asio::error::operation_aborted) + return; + MINFO(self->m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << self->m_command << " timeout: " << self->m_timeout); + self->do_handle(LEVIN_ERROR_CONNECTION_TIMEDOUT, nullptr); // delay finish_outer_call + self->m_con.close(); + self->m_con.finish_outer_call(); + }); + return true; } - virtual bool cancel_timer() + }; + template + struct anvoke_handler final : invoke_response_handler_base + { + template + anvoke_handler(F&& cb, uint64_t timeout, async_protocol_handler& con, int command) + : invoke_response_handler_base(timeout, con, command), m_cb(std::forward(cb)) + {} + virtual ~anvoke_handler() override final + {} + + private: + callback_t m_cb; + virtual void do_handle(int res, const epee::span buff) override final { - if(!m_cancel_timer_called) - { - m_cancel_timer_called = true; - boost::system::error_code ignored_ec; - m_timer_cancelled = 1 == m_timer.cancel(ignored_ec); - } - return m_timer_cancelled; - } - virtual void reset_timer() - { - boost::system::error_code ignored_ec; - if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0) - { - callback_t& cb = m_cb; - uint64_t timeout = m_timeout; - async_protocol_handler& con = m_con; - int command = m_command; - m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout)); - m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec) - { - if(ec == boost::asio::error::operation_aborted) - return; - MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout); - epee::span fake; - cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref()); - con.close(); - con.finish_outer_call(); - }); - } + m_cb(res, buff, this->m_con.get_context_ref()); } }; critical_section m_invoke_response_handlers_lock; - std::list > m_invoke_response_handlers; + std::deque> m_invoke_response_handlers; template - bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command) + bool add_invoke_response_handler(callback_t &&cb, uint64_t timeout, async_protocol_handler& con, int command) { CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock); if (m_protocol_released) @@ -296,9 +281,10 @@ public: MERROR("Adding response handler to a released object"); return false; } - boost::shared_ptr handler(boost::make_shared>(cb, timeout, con, command)); - m_invoke_response_handlers.push_back(handler); - return handler->is_timer_started(); + m_invoke_response_handlers.push_back( + std::make_shared>>(std::forward(cb), timeout, con, command) + ); + return invoke_response_handler_base::reset_timer(m_invoke_response_handlers.back(), true); } template friend struct anvoke_handler; public: @@ -372,9 +358,8 @@ public: // Never call callback inside critical section, that can cause deadlock. Callback can be called when // invoke_response_handler_base is cancelled - std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr& pinv_resp_hndlr) { + for (const std::shared_ptr& pinv_resp_hndlr : local_invoke_response_handlers) pinv_resp_hndlr->cancel(); - }); return true; } @@ -447,8 +432,7 @@ public: if (!m_invoke_response_handlers.empty()) { //async call scenario - boost::shared_ptr response_handler = m_invoke_response_handlers.front(); - response_handler->reset_timer(); + invoke_response_handler_base::reset_timer(m_invoke_response_handlers.front()); MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb << ", current total " << m_cache_in_buffer.size() << "/" << m_current_head.m_cb << " (" << (100.0f * m_cache_in_buffer.size() / (m_current_head.m_cb ? m_current_head.m_cb : 1)) << "%)"); } } @@ -509,7 +493,7 @@ public: epee::critical_region_t invoke_response_handlers_guard(m_invoke_response_handlers_lock); if(!m_invoke_response_handlers.empty()) {//async call scenario - boost::shared_ptr response_handler = m_invoke_response_handlers.front(); + const std::shared_ptr response_handler = m_invoke_response_handlers.front(); bool timer_cancelled = response_handler->cancel_timer(); // Don't pop handler, to avoid destroying it if(timer_cancelled) @@ -517,7 +501,7 @@ public: invoke_response_handlers_guard.unlock(); if(timer_cancelled) - response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context); + response_handler->handle(m_current_head.m_return_code, buff_to_invoke); } else { @@ -628,7 +612,7 @@ public: } template - bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke(int command, message_writer in_msg, callback_t &&cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -654,7 +638,7 @@ public: break; } - if(!add_invoke_response_handler(cb, timeout, *this, command)) + if(!add_invoke_response_handler(std::forward(cb), timeout, *this, command)) { err_code = LEVIN_ERROR_CONNECTION_DESTROYED; break; @@ -836,11 +820,11 @@ int async_protocol_handler_config::invoke(int command, mes } //------------------------------------------------------------------------------------------ template template -int async_protocol_handler_config::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) +int async_protocol_handler_config::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, callback_t &&cb, size_t timeout) { async_protocol_handler* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r; + return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), std::forward(cb), timeout) : r; } //------------------------------------------------------------------------------------------ template template diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 7fd786a53..b76de9775 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -84,14 +84,14 @@ namespace epee } template - bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, const callback_t &cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke_remote_command2(const epee::net_utils::connection_context_base &context, int command, const t_arg& out_struct, t_transport& transport, callback_t &&cb, size_t inv_timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; const_cast(out_struct).store(stg);//TODO: add true const support to searilzation levin::message_writer to_send{16 * 1024}; stg.store_to_binary(to_send.buffer); - int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span buff, typename t_transport::connection_context& context)->bool + int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb = std::forward(cb), command](int code, const epee::span buff, typename t_transport::connection_context& context)->bool { t_result result_struct = AUTO_VAL_INIT(result_struct); if( code <=0 ) diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index 98d8ecfff..459299f86 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -385,7 +385,7 @@ namespace nodetool void delete_upnp_port_mapping_v6(uint32_t port); void delete_upnp_port_mapping(uint32_t port); template - bool try_ping(basic_node_data& node_data, p2p_connection_context& context, const t_callback &cb); + bool try_ping(basic_node_data& node_data, p2p_connection_context& context, t_callback &&cb); bool try_get_support_flags(const p2p_connection_context& context, std::function f); bool make_expected_connections_count(network_zone& zone, PeerType peer_type, size_t expected_connections); void record_addr_failed(const epee::net_utils::network_address& addr); diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index f33ce977d..e23134cfb 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -2321,7 +2321,7 @@ namespace nodetool } //----------------------------------------------------------------------------------- template template - bool node_server::try_ping(basic_node_data& node_data, p2p_connection_context& context, const t_callback &cb) + bool node_server::try_ping(basic_node_data& node_data, p2p_connection_context& context, t_callback &&cb) { if(!node_data.my_port) return false; @@ -2364,9 +2364,9 @@ namespace nodetool address = epee::net_utils::network_address{epee::net_utils::ipv6_network_address(ipv6_addr, node_data.my_port)}; } peerid_type pr = node_data.peer_id; - bool r = zone.m_net_server.connect_async(ip, port, zone.m_config.m_net_config.ping_connection_timeout, [cb, /*context,*/ address, pr, this]( + bool r = zone.m_net_server.connect_async(ip, port, zone.m_config.m_net_config.ping_connection_timeout, [cb = std::forward(cb), /*context,*/ address, pr, this] ( const typename net_server::t_connection_context& ping_context, - const boost::system::error_code& ec)->bool + const boost::system::error_code& ec) mutable ->bool { if(ec) { @@ -2375,19 +2375,11 @@ namespace nodetool } COMMAND_PING::request req; COMMAND_PING::response rsp; - //vc2010 workaround - /*std::string ip_ = ip; - std::string port_=port; - peerid_type pr_ = pr; - auto cb_ = cb;*/ - - // GCC 5.1.0 gives error with second use of uint64_t (peerid_type) variable. - peerid_type pr_ = pr; network_zone& zone = m_network_zones.at(address.get_zone()); bool inv_call_res = epee::net_utils::async_invoke_remote_command2(ping_context, COMMAND_PING::ID, req, zone.m_net_server.get_config_object(), - [=](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context) + [this, pr, cb = std::move(cb), ping_context = std::move(ping_context), address = std::move(address)](int code, const COMMAND_PING::response& rsp, p2p_connection_context& context) { if(code <= 0) { @@ -2398,7 +2390,7 @@ namespace nodetool network_zone& zone = m_network_zones.at(address.get_zone()); if(rsp.status != PING_OK_RESPONSE_STATUS_TEXT || pr != rsp.peer_id) { - LOG_WARNING_CC(ping_context, "back ping invoke wrong response \"" << rsp.status << "\" from" << address.str() << ", hsh_peer_id=" << pr_ << ", rsp.peer_id=" << peerid_to_string(rsp.peer_id)); + LOG_WARNING_CC(ping_context, "back ping invoke wrong response \"" << rsp.status << "\" from" << address.str() << ", hsh_peer_id=" << pr << ", rsp.peer_id=" << peerid_to_string(rsp.peer_id)); zone.m_net_server.get_config_object().close(ping_context.m_connection_id); return; } @@ -2434,7 +2426,7 @@ namespace nodetool COMMAND_REQUEST_SUPPORT_FLAGS::ID, support_flags_request, m_network_zones.at(epee::net_utils::zone::public_).m_net_server.get_config_object(), - [=](int code, const typename COMMAND_REQUEST_SUPPORT_FLAGS::response& rsp, p2p_connection_context& context_) + [f = std::move(f)](int code, const typename COMMAND_REQUEST_SUPPORT_FLAGS::response& rsp, p2p_connection_context& context_) { if(code < 0) {