Merge pull request #51 from wowario/upstream

epee: adaptive connection timeout system
jw 2018-06-10 10:23:05 -07:00 committed by GitHub
commit e30e0918f5
6 changed files with 76 additions and 19 deletions
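Summary of the change: idle timeouts become adaptive. The base values move to 30 minutes for local peers and 5 minutes for remote peers, a static per-host counter (host_count) tracks how many connections each host currently has open, and get_default_timeout() halves the base timeout for every additional connection from the same host, capped at eight halvings. Received data still extends the timer by TIMEOUT_EXTRA_MS_PER_BYTE (0.2 ms per byte), never past the default. Below is a minimal standalone sketch of that scaling rule; default_timeout_ms and the plain types are illustrative, not the epee implementation.

// Sketch of the adaptive timeout rule in this commit (illustration only).
#include <algorithm>
#include <cstdio>

static const unsigned long DEFAULT_TIMEOUT_MS_LOCAL  = 1800000; // 30 minutes
static const unsigned long DEFAULT_TIMEOUT_MS_REMOTE = 300000;  // 5 minutes

// Each extra concurrent connection from the same host halves the idle timeout,
// capped at eight halvings (never below base >> 8).
unsigned long default_timeout_ms(bool local, unsigned host_connection_count)
{
  const unsigned shift = std::min(std::max(host_connection_count, 1u) - 1u, 8u);
  const unsigned long base = local ? DEFAULT_TIMEOUT_MS_LOCAL : DEFAULT_TIMEOUT_MS_REMOTE;
  return base >> shift;
}

int main()
{
  std::printf("%lu\n", default_timeout_ms(false, 1)); // 300000 ms (5 minutes)
  std::printf("%lu\n", default_timeout_ms(false, 4)); // 37500 ms (base >> 3)
  return 0;
}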

View File

@@ -119,6 +119,7 @@ namespace net_utils
     //----------------- i_service_endpoint ---------------------
     virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint)
     virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data
+    virtual bool send_done();
     virtual bool close();
     virtual bool call_run_once_service_io();
     virtual bool request_callback();
@@ -137,8 +138,11 @@ namespace net_utils
     /// reset connection timeout timer and callback
     void reset_timer(boost::posix_time::milliseconds ms, bool add);
-    boost::posix_time::milliseconds get_default_time() const;
-    boost::posix_time::milliseconds get_timeout_from_bytes_read(size_t bytes) const;
+    boost::posix_time::milliseconds get_default_timeout();
+    boost::posix_time::milliseconds get_timeout_from_bytes_read(size_t bytes);
+    /// host connection count tracking
+    unsigned int host_count(const std::string &host, int delta = 0);
     /// Buffer for incoming data.
     boost::array<char, 8192> buffer_;
@@ -165,6 +169,8 @@ namespace net_utils
     boost::asio::deadline_timer m_timer;
     bool m_local;
+    bool m_ready_to_close;
+    std::string m_host;
   public:
     void setRpcStation();

View File

@@ -56,8 +56,8 @@
 #undef MONERO_DEFAULT_LOG_CATEGORY
 #define MONERO_DEFAULT_LOG_CATEGORY "net"
-#define DEFAULT_TIMEOUT_MS_LOCAL boost::posix_time::milliseconds(120000) // 2 minutes
-#define DEFAULT_TIMEOUT_MS_REMOTE boost::posix_time::milliseconds(10000) // 10 seconds
+#define DEFAULT_TIMEOUT_MS_LOCAL 1800000 // 30 minutes
+#define DEFAULT_TIMEOUT_MS_REMOTE 300000 // 5 minutes
 #define TIMEOUT_EXTRA_MS_PER_BYTE 0.2
 PRAGMA_WARNING_PUSH
@@ -86,7 +86,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
     m_throttle_speed_in("speed_in", "throttle_speed_in"),
     m_throttle_speed_out("speed_out", "throttle_speed_out"),
     m_timer(io_service),
-    m_local(false)
+    m_local(false),
+    m_ready_to_close(false)
   {
     MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type);
   }
@@ -146,7 +147,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
     context = boost::value_initialized<t_connection_context>();
     const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())};
-    m_local = epee::net_utils::is_ip_loopback(ip_);
+    m_local = epee::net_utils::is_ip_local(ip_);
     // create a random uuid
     boost::uuids::uuid random_uuid;
@@ -165,9 +166,12 @@ PRAGMA_WARNING_DISABLE_VS(4355)
       return false;
     }
+    m_host = context.m_remote_address.host_str();
+    host_count(m_host, 1);
     m_protocol_handler.after_init_connection();
-    reset_timer(get_default_time(), false);
+    reset_timer(get_default_timeout(), false);
     socket_.async_read_some(boost::asio::buffer(buffer_),
       strand_.wrap(
@@ -324,6 +328,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
       logger_handle_net_read(bytes_transferred);
       context.m_last_recv = time(NULL);
       context.m_recv_cnt += bytes_transferred;
+      m_ready_to_close = false;
       bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred);
       if(!recv_res)
       {
@@ -356,6 +361,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
        _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
        shutdown();
      }
+     else
+     {
+       _dbg3("[sock " << socket_.native_handle() << "] peer closed connection");
+       if (m_ready_to_close)
+         shutdown();
+     }
+     m_ready_to_close = true;
    }
    // If an error occurs then no new asynchronous operations are started. This
    // means that all shared_ptr references to the connection object will
@@ -531,7 +543,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
    if(m_send_que.size() > 1)
    { // active operation should be in progress, nothing to do, just wait last operation callback
      auto size_now = cb;
-     MDEBUG("do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
+     MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
      //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
      LOG_TRACE_CC(context, "[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
@@ -546,12 +558,12 @@ PRAGMA_WARNING_DISABLE_VS(4355)
    }
    auto size_now = m_send_que.front().size();
-   MDEBUG("do_send() NOW SENSD: packet="<<size_now<<" B");
+   MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B");
    if (speed_limit_is_enabled())
      do_send_handler_write( ptr , size_now ); // (((H)))
    CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size");
-   reset_timer(get_default_time(), false);
+   reset_timer(get_default_timeout(), false);
    boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) ,
      //strand_.wrap(
      boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
@@ -566,29 +578,50 @@ PRAGMA_WARNING_DISABLE_VS(4355)
    return true;
-   CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
+   CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send_chunk", false);
  } // do_send_chunk
  //---------------------------------------------------------------------------------
  template<class t_protocol_handler>
- boost::posix_time::milliseconds connection<t_protocol_handler>::get_default_time() const
+ boost::posix_time::milliseconds connection<t_protocol_handler>::get_default_timeout()
  {
+   const unsigned count = host_count(m_host);
+   const unsigned shift = std::min(std::max(count, 1u) - 1, 8u);
+   boost::posix_time::milliseconds timeout(0);
    if (m_local)
-     return DEFAULT_TIMEOUT_MS_LOCAL;
+     timeout = boost::posix_time::milliseconds(DEFAULT_TIMEOUT_MS_LOCAL >> shift);
    else
-     return DEFAULT_TIMEOUT_MS_REMOTE;
+     timeout = boost::posix_time::milliseconds(DEFAULT_TIMEOUT_MS_REMOTE >> shift);
+   return timeout;
  }
  //---------------------------------------------------------------------------------
  template<class t_protocol_handler>
- boost::posix_time::milliseconds connection<t_protocol_handler>::get_timeout_from_bytes_read(size_t bytes) const
+ boost::posix_time::milliseconds connection<t_protocol_handler>::get_timeout_from_bytes_read(size_t bytes)
  {
    boost::posix_time::milliseconds ms = (boost::posix_time::milliseconds)(unsigned)(bytes * TIMEOUT_EXTRA_MS_PER_BYTE);
    ms += m_timer.expires_from_now();
-   if (ms > get_default_time())
-     ms = get_default_time();
+   if (ms > get_default_timeout())
+     ms = get_default_timeout();
    return ms;
  }
  //---------------------------------------------------------------------------------
  template<class t_protocol_handler>
+ unsigned int connection<t_protocol_handler>::host_count(const std::string &host, int delta)
+ {
+   static boost::mutex hosts_mutex;
+   CRITICAL_REGION_LOCAL(hosts_mutex);
+   static std::map<std::string, unsigned int> hosts;
+   unsigned int &val = hosts[host];
+   CHECK_AND_ASSERT_THROW_MES(delta >= 0 || val >= (unsigned)-delta, "Count would go negative");
+   CHECK_AND_ASSERT_THROW_MES(delta <= 0 || val <= std::numeric_limits<unsigned int>::max() - (unsigned)delta, "Count would wrap");
+   val += delta;
+   if (delta > 0)
+     MTRACE("New connection from host " << host << ": " << val);
+   else if (delta < 0)
+     MTRACE("Closed connection from host " << host << ": " << val);
+   return val;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
  void connection<t_protocol_handler>::reset_timer(boost::posix_time::milliseconds ms, bool add)
  {
    if (m_connection_type != e_connection_type_RPC)
@@ -621,6 +654,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
    socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
    m_was_shutdown = true;
    m_protocol_handler.release_protocol();
+   if (!m_host.empty())
+   {
+     host_count(m_host, -1);
+     m_host = "";
+   }
    return true;
  }
  //---------------------------------------------------------------------------------
@@ -645,6 +683,15 @@ PRAGMA_WARNING_DISABLE_VS(4355)
  }
  //---------------------------------------------------------------------------------
  template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::send_done()
+ {
+   if (m_ready_to_close)
+     return close();
+   m_ready_to_close = true;
+   return true;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
  bool connection<t_protocol_handler>::cancel()
  {
    return close();
@@ -687,7 +734,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
    }else
    {
      //have more data to send
-     reset_timer(get_default_time(), false);
+     reset_timer(get_default_timeout(), false);
      auto size_now = m_send_que.front().size();
      MDEBUG("handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
      if (speed_limit_is_enabled())

View File

@@ -399,7 +399,7 @@ namespace net_utils
  template<class t_connection_context>
  bool simple_http_connection_handler<t_connection_context>::analize_cached_request_header_and_invoke_state(size_t pos)
  {
-   //LOG_PRINT_L4("HTTP HEAD:\r\n" << m_cache.substr(0, pos));
+   LOG_PRINT_L3("HTTP HEAD:\r\n" << m_cache.substr(0, pos));
    m_query_info.m_full_request_buf_size = pos;
    m_query_info.m_request_head.assign(m_cache.begin(), m_cache.begin()+pos);
@@ -582,6 +582,7 @@ namespace net_utils
    m_psnd_hndlr->do_send((void*)response_data.data(), response_data.size());
    if ((response.m_body.size() && (query_info.m_http_method != http::http_method_head)) || (query_info.m_http_method == http::http_method_options))
      m_psnd_hndlr->do_send((void*)response.m_body.data(), response.m_body.size());
+   m_psnd_hndlr->send_done();
    return res;
  }
  //-----------------------------------------------------------------------------------
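Taken together with the connection changes above, send_done() lets the HTTP handler signal that the full response has been queued: connection::send_done() closes at once if the read side has already seen the peer close (m_ready_to_close set), otherwise it just arms the flag; handle_read() mirrors this on EOF and clears the flag whenever fresh data arrives. A rough sketch of that two-sided handshake follows, with illustrative names (connection_sketch and its methods are not epee types).

// Sketch of the close handshake implied by send_done()/m_ready_to_close.
#include <cstdio>

struct connection_sketch
{
  bool ready_to_close = false;

  void close() { std::puts("closing connection"); }

  // ~ send_done(): the HTTP handler has queued the full response.
  void on_response_done()
  {
    if (ready_to_close) { close(); return; }
    ready_to_close = true;   // wait until the peer stops sending too
  }

  // ~ handle_read() seeing EOF: the peer has closed its side.
  void on_peer_closed()
  {
    if (ready_to_close) { close(); return; }
    ready_to_close = true;   // wait until the pending response has gone out
  }

  // ~ handle_read() with data: the connection is active again.
  void on_data_received() { ready_to_close = false; }
};

int main()
{
  connection_sketch c;
  c.on_response_done();  // first signal only arms the flag
  c.on_peer_closed();    // second signal actually closes
  return 0;
}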

View File

@@ -281,6 +281,7 @@ namespace net_utils
  {
    virtual bool do_send(const void* ptr, size_t cb)=0;
    virtual bool close()=0;
+   virtual bool send_done()=0;
    virtual bool call_run_once_service_io()=0;
    virtual bool request_callback()=0;
    virtual boost::asio::io_service& get_io_service()=0;

View File

@@ -158,6 +158,7 @@ namespace
    }
    virtual bool close() { return true; }
+   virtual bool send_done() { return true; }
    virtual bool call_run_once_service_io() { return true; }
    virtual bool request_callback() { return true; }
    virtual boost::asio::io_service& get_io_service() { return m_io_service; }

View File

@@ -150,6 +150,7 @@ namespace
    }
    virtual bool close() { /*std::cout << "test_connection::close()" << std::endl; */return true; }
+   virtual bool send_done() { /*std::cout << "test_connection::send_done()" << std::endl; */return true; }
    virtual bool call_run_once_service_io() { std::cout << "test_connection::call_run_once_service_io()" << std::endl; return true; }
    virtual bool request_callback() { std::cout << "test_connection::request_callback()" << std::endl; return true; }
    virtual boost::asio::io_service& get_io_service() { std::cout << "test_connection::get_io_service()" << std::endl; return m_io_service; }