Change p2p connection map from raw pointers to weak_ptrs

This commit is contained in:
Lee Clagett 2021-01-18 00:08:19 +00:00 committed by Lee Clagett
parent 00fd416a99
commit 1e21e4e324
10 changed files with 358 additions and 441 deletions
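
The core of the change: the levin config object no longer stores raw async_protocol_handler pointers guarded by manual add_ref()/release() counting, but std::weak_ptr<service_endpoint> entries that are lock()ed on every lookup. A minimal sketch of that pattern follows; the names (endpoint, connection_map) are illustrative, not the exact epee types, and the real code additionally uses a boost::unordered_map guarded by m_connects_lock, which the sketch drops for brevity.

// Sketch of the ownership pattern this commit moves to: the map holds only
// non-owning weak_ptrs keyed by connection id, and every accessor lock()s
// before use, so an already-destroyed connection behaves like an unknown id.
#include <boost/uuid/uuid.hpp>
#include <map>
#include <memory>

struct endpoint { /* context and protocol handler live here */ };

class connection_map
{
  std::map<boost::uuids::uuid, std::weak_ptr<endpoint>> m_connects;

public:
  void add(const boost::uuids::uuid& id, const std::shared_ptr<endpoint>& conn)
  {
    m_connects[id] = conn; // store a non-owning reference only
  }

  // Returns nullptr both when the id is unknown and when the connection
  // object has already been destroyed (the weak_ptr expired).
  std::shared_ptr<endpoint> find_and_lock(const boost::uuids::uuid& id) const
  {
    const auto it = m_connects.find(id);
    return it == m_connects.end() ? nullptr : it->second.lock();
  }
};
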

View File

@ -84,10 +84,10 @@ namespace net_utils
/// Represents a single connection from a client.
template<class t_protocol_handler>
class connection
: public boost::enable_shared_from_this<connection<t_protocol_handler> >,
: public std::enable_shared_from_this<connection<t_protocol_handler>>,
private boost::noncopyable,
public i_service_endpoint,
public connection_basic
public connection_basic, // shared_state shared_ptr must be destroyed after service_endpoint
public service_endpoint<t_protocol_handler>
{
public:
typedef typename t_protocol_handler::connection_context t_connection_context;
@ -248,14 +248,13 @@ namespace net_utils
io_context_t &m_io_context;
t_connection_type m_connection_type;
t_connection_context m_conn_context{};
strand_t m_strand;
timers_t m_timers;
connection_ptr self{};
bool m_local{};
std::string m_host{};
state_t m_state{};
t_protocol_handler m_handler;
public:
struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type
{
@ -288,7 +287,7 @@ namespace net_utils
// `real_remote` is the actual endpoint (if connection is to proxy, etc.)
bool start(bool is_income, bool is_multithreaded, network_address real_remote);
void get_context(t_connection_context& context_){context_ = m_conn_context;}
void get_context(t_connection_context& context_){context_ = get_context();}
void call_back_starter();
@ -307,9 +306,12 @@ namespace net_utils
virtual bool call_run_once_service_io();
virtual bool request_callback();
virtual boost::asio::io_service& get_io_service();
virtual bool add_ref();
virtual bool release();
//------------------------------------------------------
const t_connection_context& get_context() const noexcept { return this->context; }
t_connection_context& get_context() noexcept { return this->context; }
const t_protocol_handler& get_protocol_handler() const noexcept { return this->m_protocol_handler; }
t_protocol_handler& get_protocol_handler() noexcept { return this->m_protocol_handler; }
public:
void setRpcStation();
};
@ -330,7 +332,7 @@ namespace net_utils
};
public:
typedef boost::shared_ptr<connection<t_protocol_handler> > connection_ptr;
typedef std::shared_ptr<connection<t_protocol_handler>> connection_ptr;
typedef typename t_protocol_handler::connection_context t_connection_context;
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.

View File

@ -248,7 +248,7 @@ namespace net_utils
m_state.socket.handle_read = true;
connection_basic::strand_.post(
[this, self, bytes_transferred]{
bool success = m_handler.handle_recv(
bool success = get_protocol_handler().handle_recv(
reinterpret_cast<char *>(m_state.data.read.buffer.data()),
bytes_transferred
);
@ -377,9 +377,9 @@ namespace net_utils
{
m_state.stat.in.throttle.handle_trafic_exact(bytes_transferred);
const auto speed = m_state.stat.in.throttle.get_current_speed();
m_conn_context.m_current_speed_down = speed;
m_conn_context.m_max_speed_down = std::max(
m_conn_context.m_max_speed_down,
get_context().m_current_speed_down = speed;
get_context().m_max_speed_down = std::max(
get_context().m_max_speed_down,
speed
);
{
@ -390,8 +390,8 @@ namespace net_utils
).handle_trafic_exact(bytes_transferred);
}
connection_basic::logger_handle_net_read(bytes_transferred);
m_conn_context.m_last_recv = time(NULL);
m_conn_context.m_recv_cnt += bytes_transferred;
get_context().m_last_recv = time(NULL);
get_context().m_recv_cnt += bytes_transferred;
start_timer(get_timeout_from_bytes_read(bytes_transferred), true);
}
@ -403,7 +403,7 @@ namespace net_utils
m_state.socket.handle_read = true;
connection_basic::strand_.post(
[this, self, bytes_transferred]{
bool success = m_handler.handle_recv(
bool success = get_protocol_handler().handle_recv(
reinterpret_cast<char *>(m_state.data.read.buffer.data()),
bytes_transferred
);
@ -508,9 +508,9 @@ namespace net_utils
{
m_state.stat.out.throttle.handle_trafic_exact(bytes_transferred);
const auto speed = m_state.stat.out.throttle.get_current_speed();
m_conn_context.m_current_speed_up = speed;
m_conn_context.m_max_speed_down = std::max(
m_conn_context.m_max_speed_down,
get_context().m_current_speed_up = speed;
get_context().m_max_speed_down = std::max(
get_context().m_max_speed_down,
speed
);
{
@ -521,8 +521,8 @@ namespace net_utils
).handle_trafic_exact(bytes_transferred);
}
connection_basic::logger_handle_net_write(bytes_transferred);
m_conn_context.m_last_send = time(NULL);
m_conn_context.m_send_cnt += bytes_transferred;
get_context().m_last_send = time(NULL);
get_context().m_send_cnt += bytes_transferred;
start_timer(get_default_timeout(), true);
}
@ -632,7 +632,7 @@ namespace net_utils
return;
m_state.protocol.wait_release = true;
m_state.lock.unlock();
m_handler.release_protocol();
get_protocol_handler().release_protocol();
m_state.lock.lock();
m_state.protocol.wait_release = false;
m_state.protocol.released = true;
@ -891,7 +891,7 @@ namespace net_utils
if (ec.value())
return false;
connection_basic::m_is_multithreaded = is_multithreaded;
m_conn_context.set_details(
get_context().set_details(
boost::uuids::random_generator()(),
*real_remote,
is_income,
@ -915,7 +915,7 @@ namespace net_utils
);
m_state.protocol.wait_init = true;
guard.unlock();
m_handler.after_init_connection();
static_cast<shared_state&>(connection_basic::get_state()).after_init_connection(connection<T>::shared_from_this());
guard.lock();
m_state.protocol.wait_init = false;
m_state.protocol.initialized = true;
@ -954,7 +954,7 @@ namespace net_utils
ssl_support_t ssl_support
):
connection_basic(std::move(socket), shared_state, ssl_support),
m_handler(this, *shared_state, m_conn_context),
service_endpoint<T>(check_and_get(shared_state)),
m_connection_type(connection_type),
m_io_context{GET_IO_SERVICE(connection_basic::socket_)},
m_strand{m_io_context},
@ -1014,7 +1014,7 @@ namespace net_utils
" connection type " << std::to_string(m_connection_type) <<
" " << connection_basic::socket().local_endpoint().address().to_string() <<
":" << connection_basic::socket().local_endpoint().port() <<
" <--> " << m_conn_context.m_remote_address.str() <<
" <--> " << get_context().m_remote_address.str() <<
" (via " << address << ":" << port << ")"
);
}
@ -1076,7 +1076,7 @@ namespace net_utils
auto self = connection<T>::shared_from_this();
++m_state.protocol.wait_callback;
connection_basic::strand_.post([this, self]{
m_handler.handle_qued_callback();
get_protocol_handler().handle_qued_callback();
std::lock_guard<std::mutex> guard(m_state.lock);
--m_state.protocol.wait_callback;
if (m_state.status == status_t::INTERRUPTED)
@ -1093,31 +1093,6 @@ namespace net_utils
return m_io_context;
}
template<typename T>
bool connection<T>::add_ref()
{
try {
auto self = connection<T>::shared_from_this();
std::lock_guard<std::mutex> guard(m_state.lock);
this->self = std::move(self);
++m_state.protocol.reference_counter;
return true;
}
catch (boost::bad_weak_ptr &exception) {
return false;
}
}
template<typename T>
bool connection<T>::release()
{
connection_ptr self;
std::lock_guard<std::mutex> guard(m_state.lock);
if (!(--m_state.protocol.reference_counter))
self = std::move(this->self);
return true;
}
template<typename T>
void connection<T>::setRpcStation()
{

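For contrast with the add_ref()/release() machinery deleted above: lifetime during asynchronous work is now carried by the shared_ptr itself. A handler that must outlive its enqueueing captures the shared_ptr (obtained from shared_from_this() or a weak_ptr lock), so the connection cannot be destroyed while the callback is pending and is released automatically when the callback object goes away. A hedged, self-contained sketch of that idiom, with illustrative names rather than the epee API:

#include <functional>
#include <memory>
#include <vector>

// demo_connection stands in for connection<T>; the vector of callbacks
// stands in for the asio strand the real code posts to.
struct demo_connection : std::enable_shared_from_this<demo_connection>
{
  std::vector<std::function<void()>> queue;

  void post_callback()
  {
    auto self = shared_from_this(); // pin lifetime for the pending callback
    queue.push_back([self] {
      // *self is guaranteed alive here; the reference is dropped when the
      // callback object is destroyed, with no manual release() call.
    });
  }
};

// Usage (shared_from_this requires shared_ptr ownership):
//   auto c = std::make_shared<demo_connection>();
//   c->post_callback();
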
View File

@ -57,6 +57,12 @@ namespace net_utils
boost::optional<login> m_user;
size_t m_max_content_length{std::numeric_limits<size_t>::max()};
critical_section m_lock;
template<typename T>
static constexpr bool after_init_connection(const std::shared_ptr<T>&) noexcept
{
return true;
}
};
/************************************************************************/
@ -86,10 +92,6 @@ namespace net_utils
{
return true;
}
bool after_init_connection()
{
return true;
}
virtual bool handle_recv(const void* ptr, size_t cb);
virtual bool handle_request(const http::http_request_info& query_info, http_response_info& response);

View File

@ -79,15 +79,16 @@ class async_protocol_handler;
template<class t_connection_context>
class async_protocol_handler_config
{
typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
typedef net_utils::service_endpoint<async_protocol_handler<t_connection_context>> levin_endpoint;
typedef boost::unordered_map<boost::uuids::uuid, std::weak_ptr<levin_endpoint>> connections_map;
critical_section m_connects_lock;
connections_map m_connects;
std::atomic<std::size_t> m_incoming_count;
std::atomic<std::size_t> m_outgoing_count;
void add_connection(async_protocol_handler<t_connection_context>* pc);
void del_connection(async_protocol_handler<t_connection_context>* pc);
async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const;
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph);
std::shared_ptr<levin_endpoint> find_and_lock_connection(const boost::uuids::uuid& connection_id);
friend class async_protocol_handler<t_connection_context>;
@ -117,8 +118,13 @@ public:
size_t get_out_connections_count();
size_t get_in_connections_count();
void set_handler(levin_commands_handler<t_connection_context>* handler, void (*destroy)(levin_commands_handler<t_connection_context>*) = NULL);
bool after_init_connection(const std::shared_ptr<levin_endpoint>& pconn);
async_protocol_handler_config():m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL), m_initial_max_packet_size(LEVIN_INITIAL_MAX_PACKET_SIZE), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE), m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
async_protocol_handler_config()
: m_incoming_count(0), m_outgoing_count(0),
m_pcommands_handler(NULL), m_pcommands_handler_destroy(NULL),
m_initial_max_packet_size(LEVIN_INITIAL_MAX_PACKET_SIZE), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE),
m_invoke_timeout(LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{}
~async_protocol_handler_config() { set_handler(NULL, NULL); }
void del_out_connections(size_t count);
@ -181,8 +187,8 @@ public:
struct invoke_response_handler_base
{
virtual ~invoke_response_handler_base() {}
virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
virtual bool is_timer_started() const=0;
virtual void cancel()=0;
virtual bool cancel_timer()=0;
virtual void reset_timer()=0;
@ -190,33 +196,26 @@ public:
template <class callback_t>
struct anvoke_handler: invoke_response_handler_base
{
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
:m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
anvoke_handler(const callback_t& cb, uint64_t timeout, std::shared_ptr<net_utils::service_endpoint<async_protocol_handler>> con, int command)
:m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con->get_io_service()),
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
{
if(m_con.start_outer_call())
MDEBUG(con->context << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
m_timer.async_wait([con = std::move(con), command, cb, timeout](const boost::system::error_code& ec)
{
MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
});
m_timer_started = true;
}
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con->context << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, nullptr, con->context);
con->close();
});
}
virtual ~anvoke_handler()
{}
callback_t m_cb;
async_protocol_handler& m_con;
std::weak_ptr<net_utils::service_endpoint<async_protocol_handler>> m_con;
boost::asio::deadline_timer m_timer;
bool m_timer_started;
bool m_cancel_timer_called;
bool m_timer_cancelled;
uint64_t m_timeout;
@ -226,20 +225,14 @@ public:
if(!cancel_timer())
return false;
m_cb(res, buff, context);
m_con.finish_outer_call();
return true;
}
virtual bool is_timer_started() const
{
return m_timer_started;
}
virtual void cancel()
{
if(cancel_timer())
std::shared_ptr<net_utils::service_endpoint<async_protocol_handler>> con;
if(cancel_timer() && (con = m_con.lock()))
{
epee::span<const uint8_t> fake;
m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
m_con.finish_outer_call();
m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, nullptr, con->context);
}
}
virtual bool cancel_timer()
@ -255,22 +248,20 @@ public:
virtual void reset_timer()
{
boost::system::error_code ignored_ec;
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
std::shared_ptr<net_utils::service_endpoint<async_protocol_handler>> con;
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0 && (con = m_con.lock()))
{
callback_t& cb = m_cb;
uint64_t timeout = m_timeout;
async_protocol_handler& con = m_con;
int command = m_command;
m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
m_timer.async_wait([con = std::move(con), cb, command, timeout](const boost::system::error_code& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
MINFO(con->context << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, nullptr, con->context);
con->close();
});
}
}
@ -279,7 +270,7 @@ public:
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
template<class callback_t>
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, std::shared_ptr<net_utils::service_endpoint<async_protocol_handler>> con, int command)
{
CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock);
if (m_protocol_released)
@ -287,15 +278,17 @@ public:
MERROR("Adding response handler to a released object");
return false;
}
boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
m_invoke_response_handlers.push_back(handler);
return handler->is_timer_started();
boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, std::move(con), command));
m_invoke_response_handlers.push_back(std::move(handler));
return true;
}
template<class callback_t> friend struct anvoke_handler;
public:
async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr,
config_type& config,
t_connection_context& conn_context):
m_wait_count(0),
m_current_head(bucket_head2()),
m_pservice_endpoint(psnd_hndlr),
m_config(config),
@ -306,7 +299,6 @@ public:
{
m_close_called = 0;
m_protocol_released = false;
m_wait_count = 0;
m_oponent_protocol_ver = 0;
m_connection_initialized = false;
}
@ -320,37 +312,12 @@ public:
m_config.del_connection(this);
}
for (size_t i = 0; i < 60 * 1000 / 100 && 0 != m_wait_count; ++i)
{
misc_utils::sleep_no_w(100);
}
CHECK_AND_ASSERT_MES_NO_RET(0 == m_wait_count, "Failed to wait for operation completion. m_wait_count = " << m_wait_count.load());
MTRACE(m_connection_context << "~async_protocol_handler()");
}
catch (...) { /* ignore */ }
}
bool start_outer_call()
{
MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
if(!m_pservice_endpoint->add_ref())
{
MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
return false;
}
++m_wait_count;
return true;
}
bool finish_outer_call()
{
MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
--m_wait_count;
m_pservice_endpoint->release();
return true;
}
bool release_protocol()
{
decltype(m_invoke_response_handlers) local_invoke_response_handlers;
@ -383,9 +350,6 @@ public:
void request_callback()
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
m_pservice_endpoint->request_callback();
}
@ -593,21 +557,10 @@ public:
return true;
}
bool after_init_connection()
{
if (!m_connection_initialized)
{
m_connection_initialized = true;
m_config.add_connection(this);
}
return true;
}
template<class callback_t>
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke(std::shared_ptr<net_utils::service_endpoint<async_protocol_handler>> self, int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
assert(self && this == std::addressof(self->m_protocol_handler));
if(timeout == LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
timeout = m_config.m_invoke_timeout;
@ -627,7 +580,7 @@ public:
break;
}
if(!add_invoke_response_handler(cb, timeout, *this, command))
if(!add_invoke_response_handler(cb, timeout, std::move(self), command))
{
err_code = LEVIN_ERROR_CONNECTION_DESTROYED;
break;
@ -654,10 +607,6 @@ public:
\return 1 on success */
int send(byte_slice message)
{
const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this)
);
if (!send_message(std::move(message)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
@ -675,27 +624,28 @@ template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
{
CRITICAL_REGION_BEGIN(m_connects_lock);
m_connects.erase(pconn->get_connection_id());
if (!m_connects.erase(pconn->get_connection_id()))
return;
if (pconn->get_context_ref().m_is_income)
--m_incoming_count;
else
--m_outgoing_count;
CRITICAL_REGION_END();
m_pcommands_handler->on_connection_close(pconn->m_connection_context);
if (m_pcommands_handler)
m_pcommands_handler->on_connection_close(pconn->get_context_ref());
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::delete_connections(size_t count, bool incoming)
{
std::vector<typename connections_map::mapped_type> connections;
auto scope_exit_handler = misc_utils::create_scope_leave_handler([&connections]{
for (auto &aph: connections)
aph->finish_outer_call();
});
std::vector<std::shared_ptr<levin_endpoint>> connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
for (auto& c: m_connects)
{
if (c.second->m_connection_context.m_is_income == incoming)
if (c.second->start_outer_call())
connections.push_back(c.second);
auto locked = c.second.lock();
if (locked && locked->context.m_is_income == incoming)
connections.push_back(std::move(locked));
}
// close random connections from the provided set
@ -703,7 +653,7 @@ void async_protocol_handler_config<t_connection_context>::delete_connections(siz
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
for (size_t i = 0; i < connections.size() && i < count; ++i)
m_connects.erase(connections[i]->get_connection_id());
m_connects.erase(connections[i]->context.m_connection_id);
CRITICAL_REGION_END();
@ -724,60 +674,55 @@ void async_protocol_handler_config<t_connection_context>::del_in_connections(siz
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn)
bool async_protocol_handler_config<t_connection_context>::after_init_connection(const std::shared_ptr<levin_endpoint>& pconn)
{
if (!pconn || pconn->m_protocol_handler.m_connection_initialized)
return false;
CRITICAL_REGION_BEGIN(m_connects_lock);
m_connects[pconn->get_connection_id()] = pconn;
if (!m_connects.emplace(pconn->context.m_connection_id, pconn).second)
return false;
pconn->m_protocol_handler.m_connection_initialized = true;
if (pconn->context.m_is_income)
++m_incoming_count;
else
++m_outgoing_count;
CRITICAL_REGION_END();
m_pcommands_handler->on_connection_new(pconn->m_connection_context);
m_pcommands_handler->on_connection_new(pconn->context);
return true;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(boost::uuids::uuid connection_id) const
{
auto it = m_connects.find(connection_id);
return it == m_connects.end() ? 0 : it->second;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
std::shared_ptr<net_utils::service_endpoint<async_protocol_handler<t_connection_context>>> async_protocol_handler_config<t_connection_context>::find_and_lock_connection(const boost::uuids::uuid& connection_id)
{
CRITICAL_REGION_LOCAL(m_connects_lock);
aph = find_connection(connection_id);
if(0 == aph)
return LEVIN_ERROR_CONNECTION_NOT_FOUND;
if(!aph->start_outer_call())
return LEVIN_ERROR_CONNECTION_DESTROYED;
return LEVIN_OK;
const auto aph = m_connects.find(connection_id);
return aph == m_connects.end() ? nullptr : aph->second.lock();
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
std::shared_ptr<levin_endpoint> con = find_and_lock_connection(connection_id);
if (!con)
return LEVIN_ERROR_CONNECTION_NOT_FOUND;
levin_endpoint& ref = *con;
return ref.m_protocol_handler.async_invoke(std::move(con), command, std::move(in_msg), cb, timeout);
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
bool async_protocol_handler_config<t_connection_context>::foreach_connection(const callback_t &cb)
{
std::vector<typename connections_map::mapped_type> conn;
auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{
for (auto &aph: conn)
aph->finish_outer_call();
});
std::vector<std::shared_ptr<levin_endpoint>> conn;
CRITICAL_REGION_BEGIN(m_connects_lock);
conn.reserve(m_connects.size());
for (auto &e: m_connects)
if (e.second->start_outer_call())
conn.push_back(e.second);
CRITICAL_REGION_END()
conn.push_back(e.second.lock());
CRITICAL_REGION_END();
for (auto &aph: conn)
if (!cb(aph->get_context_ref()))
for (auto &c: conn)
if (c && !cb(c->context))
return false;
return true;
@ -786,14 +731,8 @@ bool async_protocol_handler_config<t_connection_context>::foreach_connection(con
template<class t_connection_context> template<class callback_t>
bool async_protocol_handler_config<t_connection_context>::for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
{
async_protocol_handler<t_connection_context>* aph = nullptr;
if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
return false;
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
if(!cb(aph->get_context_ref()))
return false;
return true;
const std::shared_ptr<levin_endpoint> aph = find_and_lock_connection(connection_id);
return aph && cb(aph->context);
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
@ -806,23 +745,13 @@ size_t async_protocol_handler_config<t_connection_context>::get_connections_coun
template<class t_connection_context>
size_t async_protocol_handler_config<t_connection_context>::get_out_connections_count()
{
CRITICAL_REGION_LOCAL(m_connects_lock);
size_t count = 0;
for (const auto &c: m_connects)
if (!c.second->m_connection_context.m_is_income)
++count;
return count;
return m_outgoing_count;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
size_t async_protocol_handler_config<t_connection_context>::get_in_connections_count()
{
CRITICAL_REGION_LOCAL(m_connects_lock);
size_t count = 0;
for (const auto &c: m_connects)
if (c.second->m_connection_context.m_is_income)
++count;
return count;
return m_incoming_count;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
@ -837,20 +766,15 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->send(std::move(message)) : 0;
const std::shared_ptr<levin_endpoint> aph = find_and_lock_connection(connection_id);
return aph ? aph->m_protocol_handler.send(std::move(message)) : 0;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph = nullptr;
if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
return false;
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
if (!aph->close())
const std::shared_ptr<levin_endpoint> aph = find_and_lock_connection(connection_id);
if (!aph || !aph->m_protocol_handler.close())
return false;
CRITICAL_REGION_LOCAL(m_connects_lock);
m_connects.erase(connection_id);
@ -861,8 +785,8 @@ template<class t_connection_context>
bool async_protocol_handler_config<t_connection_context>::update_connection_context(const t_connection_context& contxt)
{
CRITICAL_REGION_LOCAL(m_connects_lock);
async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id);
if(0 == aph)
const std::shared_ptr<levin_endpoint> aph = find_and_lock_connection(contxt.m_connection_id);
if(nullptr == aph)
return false;
aph->update_connection_context(contxt);
return true;
@ -871,11 +795,10 @@ bool async_protocol_handler_config<t_connection_context>::update_connection_cont
template<class t_connection_context>
bool async_protocol_handler_config<t_connection_context>::request_callback(boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
if(LEVIN_OK == r)
const std::shared_ptr<levin_endpoint> con = find_and_lock_connection(connection_id);
if(con)
{
aph->request_callback();
con->request_callback();
return true;
}
else

View File

@ -32,6 +32,7 @@
#include <boost/uuid/uuid.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/address_v6.hpp>
#include <memory>
#include <typeinfo>
#include <type_traits>
#include "byte_slice.h"
@ -444,13 +445,30 @@ namespace net_utils
virtual bool call_run_once_service_io()=0;
virtual bool request_callback()=0;
virtual boost::asio::io_service& get_io_service()=0;
//protect from deletion connection object(with protocol instance) during external call "invoke"
virtual bool add_ref()=0;
virtual bool release()=0;
protected:
virtual ~i_service_endpoint() noexcept(false) {}
};
template<typename t_protocol_handler>
struct service_endpoint : i_service_endpoint
{
typedef typename t_protocol_handler::connection_context t_connection_context;
service_endpoint(typename t_protocol_handler::config_type& config)
: i_service_endpoint(), context(), m_protocol_handler(this, config, context)
{}
t_connection_context context;
// TODO what do they mean about wait on destructor?? --rfree :
//this should be the last one, because it could be wait on destructor, while other activities possible on other threads
t_protocol_handler m_protocol_handler;
protected:
virtual ~service_endpoint() noexcept(false)
{}
};
//some helpers

View File

@ -26,6 +26,8 @@
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <boost/uuid/random_generator.hpp>
#include "include_base_utils.h"
#include "file_io_utils.h"
#include "net/net_utils_base.h"
@ -135,19 +137,22 @@ namespace
std::string m_last_in_buf;
};
class test_connection : public epee::net_utils::i_service_endpoint
class test_connection : public epee::net_utils::service_endpoint<test_levin_protocol_handler>, public std::enable_shared_from_this<test_connection>
{
public:
test_connection(boost::asio::io_service& io_service, test_levin_protocol_handler_config& protocol_config)
: m_io_service(io_service)
, m_protocol_handler(this, protocol_config, m_context)
: epee::net_utils::service_endpoint<test_levin_protocol_handler>(protocol_config)
, std::enable_shared_from_this<test_connection>()
, m_io_service(io_service)
, m_send_return(true)
{
}
void start()
{
m_protocol_handler.after_init_connection();
using base_type = epee::net_utils::connection_context_base;
static_cast<base_type&>(context) = base_type{boost::uuids::random_generator{}(), {}, true, false};
m_protocol_handler.m_config.after_init_connection(shared_from_this());
}
// Implement epee::net_utils::i_service_endpoint interface
@ -175,10 +180,6 @@ namespace
bool send_return() const { return m_send_return; }
void send_return(bool v) { m_send_return = v; }
public:
test_levin_connection_context m_context;
test_levin_protocol_handler m_protocol_handler;
private:
boost::asio::io_service& m_io_service;
@ -197,7 +198,7 @@ namespace
const static uint64_t invoke_timeout = 5 * 1000;
const static size_t max_packet_size = 10 * 1024 * 1024;
typedef std::unique_ptr<test_connection> test_connection_ptr;
typedef std::shared_ptr<test_connection> test_connection_ptr;
async_protocol_handler_test():
m_pcommands_handler(new test_levin_commands_handler()),
@ -304,7 +305,7 @@ BEGIN_SIMPLE_FUZZER()
test_levin_protocol_handler_config m_handler_config;
test_levin_commands_handler *m_pcommands_handler = new test_levin_commands_handler();
m_handler_config.set_handler(m_pcommands_handler, [](epee::levin::levin_commands_handler<test_levin_connection_context> *handler) { delete handler; });
std::unique_ptr<test_connection> conn(new test_connection(io_service, m_handler_config));
std::shared_ptr<test_connection> conn(new test_connection(io_service, m_handler_config));
conn->start();
//m_commands_handler.invoke_out_buf(expected_out_data);
//m_commands_handler.return_code(expected_return_code);

View File

@ -52,6 +52,11 @@ namespace
struct test_protocol_handler_config
{
template<typename T>
static constexpr bool after_init_connection(const std::shared_ptr<T>&) noexcept
{
return true;
}
};
struct test_protocol_handler
@ -63,10 +68,6 @@ namespace
{
}
void after_init_connection()
{
}
void handle_qued_callback()
{
}
@ -167,7 +168,7 @@ TEST(test_epee_connection, test_lifetime)
using handler_t = epee::levin::async_protocol_handler<context_t>;
using connection_t = epee::net_utils::connection<handler_t>;
using connection_ptr = boost::shared_ptr<connection_t>;
using connection_ptr = std::shared_ptr<connection_t>;
using shared_state_t = typename connection_t::shared_state;
using shared_state_ptr = std::shared_ptr<shared_state_t>;
using shared_states_t = std::vector<shared_state_ptr>;
@ -181,7 +182,7 @@ TEST(test_epee_connection, test_lifetime)
using server_t = epee::net_utils::boosted_tcp_server<handler_t>;
using lock_t = std::mutex;
using lock_guard_t = std::lock_guard<lock_t>;
using connection_weak_ptr = boost::weak_ptr<connection_t>;
using connection_weak_ptr = std::weak_ptr<connection_t>;
struct shared_conn_t {
lock_t lock;
connection_weak_ptr conn;
@ -225,7 +226,7 @@ TEST(test_epee_connection, test_lifetime)
auto create_connection = [&io_context, &endpoint, &shared_state] {
connection_ptr conn(new connection_t(io_context, shared_state, {}, {}));
conn->socket().connect(endpoint);
conn->start({}, {});
EXPECT_TRUE(conn->start({}, {}));
context_t context;
conn->get_context(context);
auto tag = context.m_connection_id;
@ -585,26 +586,38 @@ TEST(test_epee_connection, ssl_handshake)
workers.back().join();
}
TEST(boosted_tcp_server, strand_deadlock)
namespace
{
using context_t = epee::net_utils::connection_context_base;
using lock_t = std::mutex;
using unique_lock_t = std::unique_lock<lock_t>;
struct config_t {
using condition_t = std::condition_variable_any;
using lock_guard_t = std::lock_guard<lock_t>;
using lock_guard_t = std::lock_guard<std::mutex>;
void notify_success()
{
lock_guard_t guard(lock);
success = true;
condition.notify_all();
}
lock_t lock;
template<typename T>
static bool after_init_connection(const std::shared_ptr<T>& conn)
{
if (!conn)
return false;
conn->m_protocol_handler.after_init_connection();
return true;
}
std::mutex lock;
condition_t condition;
bool success;
};
}
TEST(boosted_tcp_server, strand_deadlock)
{
using context_t = epee::net_utils::connection_context_base;
using lock_t = std::mutex;
using unique_lock_t = std::unique_lock<lock_t>;
struct handler_t {
using config_type = config_t;

View File

@ -30,6 +30,7 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/uuid/random_generator.hpp>
#include "gtest/gtest.h"
@ -126,19 +127,22 @@ namespace
std::string m_last_in_buf;
};
class test_connection : public epee::net_utils::i_service_endpoint
class test_connection : public epee::net_utils::service_endpoint<test_levin_protocol_handler>, public std::enable_shared_from_this<test_connection>
{
public:
test_connection(boost::asio::io_service& io_service, test_levin_protocol_handler_config& protocol_config)
: m_io_service(io_service)
, m_protocol_handler(this, protocol_config, m_context)
: epee::net_utils::service_endpoint<test_levin_protocol_handler>(protocol_config)
, std::enable_shared_from_this<test_connection>()
, m_io_service(io_service)
, m_send_return(true)
{
}
void start()
{
ASSERT_TRUE(m_protocol_handler.after_init_connection());
using base_type = epee::net_utils::connection_context_base;
static_cast<base_type&>(context) = base_type{boost::uuids::random_generator{}(), {}, true, false};
ASSERT_TRUE(m_protocol_handler.m_config.after_init_connection(shared_from_this()));
}
// Implement epee::net_utils::i_service_endpoint interface
@ -156,8 +160,6 @@ namespace
virtual bool call_run_once_service_io() { std::cout << "test_connection::call_run_once_service_io()" << std::endl; return true; }
virtual bool request_callback() { std::cout << "test_connection::request_callback()" << std::endl; return true; }
virtual boost::asio::io_service& get_io_service() { std::cout << "test_connection::get_io_service()" << std::endl; return m_io_service; }
virtual bool add_ref() { std::cout << "test_connection::add_ref()" << std::endl; return true; }
virtual bool release() { std::cout << "test_connection::release()" << std::endl; return true; }
size_t send_counter() const { return m_send_counter.get(); }
@ -167,12 +169,8 @@ namespace
bool send_return() const { return m_send_return; }
void send_return(bool v) { m_send_return = v; }
public:
test_levin_protocol_handler m_protocol_handler;
private:
boost::asio::io_service& m_io_service;
test_levin_connection_context m_context;
unit_test::call_counter m_send_counter;
boost::mutex m_mutex;
@ -188,7 +186,7 @@ namespace
const static uint64_t invoke_timeout = 5 * 1000;
const static size_t max_packet_size = 10 * 1024 * 1024;
typedef std::unique_ptr<test_connection> test_connection_ptr;
typedef std::shared_ptr<test_connection> test_connection_ptr;
async_protocol_handler_test():
m_pcommands_handler(new test_levin_commands_handler()),

View File

@ -27,6 +27,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <algorithm>
#include <boost/iterator/indirect_iterator.hpp>
#include <boost/uuid/nil_generator.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
@ -52,70 +53,6 @@
namespace
{
class test_endpoint final : public epee::net_utils::i_service_endpoint
{
boost::asio::io_service& io_service_;
std::size_t ref_count_;
virtual bool do_send(epee::byte_slice message) override final
{
send_queue_.push_back(std::move(message));
return true;
}
virtual bool close() override final
{
return true;
}
virtual bool send_done() override final
{
throw std::logic_error{"send_done not implemented"};
}
virtual bool call_run_once_service_io() override final
{
return io_service_.run_one();
}
virtual bool request_callback() override final
{
throw std::logic_error{"request_callback not implemented"};
}
virtual boost::asio::io_service& get_io_service() override final
{
return io_service_;
}
virtual bool add_ref() override final
{
++ref_count_;
return true;
}
virtual bool release() override final
{
--ref_count_;
return true;
}
public:
test_endpoint(boost::asio::io_service& io_service)
: epee::net_utils::i_service_endpoint(),
io_service_(io_service),
ref_count_(0),
send_queue_()
{}
virtual ~test_endpoint() noexcept(false) override final
{
EXPECT_EQ(0u, ref_count_);
}
std::deque<epee::byte_slice> send_queue_;
};
class test_core_events final : public cryptonote::i_core_events
{
std::map<cryptonote::relay_method, std::vector<cryptonote::blobdata>> relayed_;
@ -164,43 +101,78 @@ namespace
}
};
class test_connection
class test_connection final : public epee::net_utils::service_endpoint<epee::levin::async_protocol_handler<cryptonote::levin::detail::p2p_context>>
{
test_endpoint endpoint_;
cryptonote::levin::detail::p2p_context context_;
epee::levin::async_protocol_handler<cryptonote::levin::detail::p2p_context> handler_;
boost::asio::io_service& io_service_;
std::deque<epee::byte_slice> send_queue_;
virtual bool do_send(epee::byte_slice message) override final
{
send_queue_.push_back(std::move(message));
return true;
}
virtual bool close() override final
{
return true;
}
virtual bool send_done() override final
{
throw std::logic_error{"send_done not implemented"};
}
virtual bool call_run_once_service_io() override final
{
return io_service_.run_one();
}
virtual bool request_callback() override final
{
throw std::logic_error{"request_callback not implemented"};
}
virtual boost::asio::io_service& get_io_service() override final
{
return io_service_;
}
public:
test_connection(boost::asio::io_service& io_service, cryptonote::levin::connections& connections, boost::uuids::random_generator& random_generator, const bool is_incoming)
: endpoint_(io_service),
context_(),
handler_(std::addressof(endpoint_), connections, context_)
: epee::net_utils::service_endpoint<epee::levin::async_protocol_handler<cryptonote::levin::detail::p2p_context>>(connections),
io_service_(io_service),
send_queue_()
{
using base_type = epee::net_utils::connection_context_base;
static_cast<base_type&>(context_) = base_type{random_generator(), {}, is_incoming, false};
context_.m_state = cryptonote::cryptonote_connection_context::state_normal;
handler_.after_init_connection();
static_cast<base_type&>(context) = base_type{random_generator(), {}, is_incoming, false};
context.m_state = cryptonote::cryptonote_connection_context::state_normal;
}
virtual ~test_connection() noexcept override final
try {}
catch (...)
{}
//\return Number of messages processed
std::size_t process_send_queue(const bool valid = true)
{
std::size_t count = 0;
for ( ; !endpoint_.send_queue_.empty(); ++count, endpoint_.send_queue_.pop_front())
for ( ; !send_queue_.empty(); ++count, send_queue_.pop_front())
{
EXPECT_EQ(valid, handler_.handle_recv(endpoint_.send_queue_.front().data(), endpoint_.send_queue_.front().size()));
// invalid messages shouldn't be possible in this test
EXPECT_EQ(valid, m_protocol_handler.handle_recv(send_queue_.front().data(), send_queue_.front().size()));
}
return count;
}
const boost::uuids::uuid& get_id() const noexcept
{
return context_.m_connection_id;
return context.m_connection_id;
}
bool is_incoming() const noexcept
{
return context_.m_is_income;
return context.m_is_income;
}
};
@ -346,8 +318,9 @@ namespace
void add_connection(const bool is_incoming)
{
contexts_.emplace_back(io_service_, *connections_, random_generator_, is_incoming);
EXPECT_TRUE(connection_ids_.emplace(contexts_.back().get_id()).second);
contexts_.emplace_back(std::make_shared<test_connection>(io_service_, *connections_, random_generator_, is_incoming));
connections_->after_init_connection(contexts_.back());
EXPECT_TRUE(connection_ids_.emplace(contexts_.back()->get_id()).second);
EXPECT_EQ(connection_ids_.size(), connections_->get_connections_count());
}
@ -366,7 +339,7 @@ namespace
boost::uuids::random_generator random_generator_;
boost::asio::io_service io_service_;
test_receiver receiver_;
std::deque<test_connection> contexts_;
std::deque<std::shared_ptr<test_connection>> contexts_;
test_core_events events_;
};
}
@ -621,7 +594,7 @@ TEST_F(levin_notify, fluff_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::fluff));
io_service_.reset();
@ -630,7 +603,7 @@ TEST_F(levin_notify, fluff_without_padding)
ASSERT_LT(0u, io_service_.poll());
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
EXPECT_EQ(1u, context->process_send_queue());
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff));
@ -674,7 +647,7 @@ TEST_F(levin_notify, stem_without_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -690,12 +663,12 @@ TEST_F(levin_notify, stem_without_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
}
send_count += sent;
}
@ -744,7 +717,7 @@ TEST_F(levin_notify, stem_no_outs_without_padding)
ASSERT_EQ(10u, contexts_.size());
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -760,7 +733,7 @@ TEST_F(levin_notify, stem_no_outs_without_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
send_count += context->process_send_queue();
}
@ -811,9 +784,15 @@ TEST_F(levin_notify, local_without_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
// run their "their" txes first
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(their_txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
ASSERT_LT(0u, io_service_.poll());
@ -828,12 +807,12 @@ TEST_F(levin_notify, local_without_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
}
send_count += sent;
}
@ -862,12 +841,12 @@ TEST_F(levin_notify, local_without_padding)
send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
}
send_count += sent;
}
@ -913,7 +892,7 @@ TEST_F(levin_notify, forward_without_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::forward));
io_service_.reset();
@ -929,12 +908,12 @@ TEST_F(levin_notify, forward_without_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
}
send_count += sent;
}
@ -980,7 +959,7 @@ TEST_F(levin_notify, block_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::block));
io_service_.reset();
@ -1010,7 +989,7 @@ TEST_F(levin_notify, none_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::none));
io_service_.reset();
@ -1040,7 +1019,7 @@ TEST_F(levin_notify, fluff_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::fluff));
io_service_.reset();
@ -1051,7 +1030,7 @@ TEST_F(levin_notify, fluff_with_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff));
std::sort(txs.begin(), txs.end());
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
EXPECT_EQ(1u, context->process_send_queue());
ASSERT_EQ(9u, receiver_.notified_size());
@ -1090,7 +1069,7 @@ TEST_F(levin_notify, stem_with_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -1106,12 +1085,12 @@ TEST_F(levin_notify, stem_with_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
}
send_count += sent;
@ -1158,7 +1137,7 @@ TEST_F(levin_notify, stem_no_outs_with_padding)
ASSERT_EQ(10u, contexts_.size());
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -1174,7 +1153,7 @@ TEST_F(levin_notify, stem_no_outs_with_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
send_count += context->process_send_queue();
}
@ -1219,9 +1198,15 @@ TEST_F(levin_notify, local_with_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
// run their "their" txes first
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(their_txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
ASSERT_LT(0u, io_service_.poll());
@ -1236,12 +1221,12 @@ TEST_F(levin_notify, local_with_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
}
send_count += sent;
@ -1268,12 +1253,12 @@ TEST_F(levin_notify, local_with_padding)
send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
}
send_count += sent;
}
@ -1316,7 +1301,7 @@ TEST_F(levin_notify, forward_with_padding)
bool has_fluffed = false;
while (!has_stemmed || !has_fluffed)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::forward));
io_service_.reset();
@ -1332,12 +1317,12 @@ TEST_F(levin_notify, forward_with_padding)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent && is_stem)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
}
send_count += sent;
@ -1381,7 +1366,7 @@ TEST_F(levin_notify, block_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::block));
io_service_.reset();
@ -1411,7 +1396,7 @@ TEST_F(levin_notify, none_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::none));
io_service_.reset();
@ -1441,7 +1426,7 @@ TEST_F(levin_notify, private_fluff_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::fluff));
io_service_.reset();
@ -1453,9 +1438,9 @@ TEST_F(levin_notify, private_fluff_without_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1493,7 +1478,7 @@ TEST_F(levin_notify, private_stem_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -1505,9 +1490,9 @@ TEST_F(levin_notify, private_stem_without_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::stem));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1545,7 +1530,7 @@ TEST_F(levin_notify, private_local_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::local));
io_service_.reset();
@ -1557,9 +1542,9 @@ TEST_F(levin_notify, private_local_without_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::local));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1597,7 +1582,7 @@ TEST_F(levin_notify, private_forward_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::forward));
io_service_.reset();
@ -1609,9 +1594,9 @@ TEST_F(levin_notify, private_forward_without_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::forward));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1649,7 +1634,7 @@ TEST_F(levin_notify, private_block_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::block));
io_service_.reset();
@ -1680,7 +1665,7 @@ TEST_F(levin_notify, private_none_without_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::none));
io_service_.reset();
@ -1710,7 +1695,7 @@ TEST_F(levin_notify, private_fluff_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::fluff));
io_service_.reset();
@ -1722,9 +1707,9 @@ TEST_F(levin_notify, private_fluff_with_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::fluff));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1761,7 +1746,7 @@ TEST_F(levin_notify, private_stem_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -1773,9 +1758,9 @@ TEST_F(levin_notify, private_stem_with_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::stem));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1812,7 +1797,7 @@ TEST_F(levin_notify, private_local_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::local));
io_service_.reset();
@ -1824,9 +1809,9 @@ TEST_F(levin_notify, private_local_with_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::local));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1863,7 +1848,7 @@ TEST_F(levin_notify, private_forward_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::forward));
io_service_.reset();
@ -1875,9 +1860,9 @@ TEST_F(levin_notify, private_forward_with_padding)
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::forward));
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const bool is_incoming = ((context - contexts_.begin()) % 2 == 0);
const bool is_incoming = ((context.base() - contexts_.begin()) % 2 == 0);
EXPECT_EQ(is_incoming ? 0u : 1u, context->process_send_queue());
}
@ -1914,7 +1899,7 @@ TEST_F(levin_notify, private_block_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::block));
io_service_.reset();
@ -1944,7 +1929,7 @@ TEST_F(levin_notify, private_none_with_padding)
ASSERT_EQ(10u, contexts_.size());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_FALSE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::none));
io_service_.reset();
@ -1977,7 +1962,7 @@ TEST_F(levin_notify, stem_mappings)
ASSERT_EQ(test_connections_count, contexts_.size());
for (;;)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -1991,7 +1976,7 @@ TEST_F(levin_notify, stem_mappings)
ASSERT_LT(0u, io_service_.poll());
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
EXPECT_EQ(1u, context->process_send_queue());
ASSERT_EQ(test_connections_count - 1, receiver_.notified_size());
@ -2013,15 +1998,15 @@ TEST_F(levin_notify, stem_mappings)
std::map<boost::uuids::uuid, boost::uuids::uuid> mappings;
{
std::size_t send_count = 0;
for (auto context = contexts_.begin(); context != contexts_.end(); ++context)
for (auto context = boost::make_indirect_iterator(contexts_.begin()); context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
used.insert(context->get_id());
mappings[contexts_.front().get_id()] = context->get_id();
mappings[contexts_.front()->get_id()] = context->get_id();
}
send_count += sent;
}
@ -2040,23 +2025,23 @@ TEST_F(levin_notify, stem_mappings)
for (unsigned i = 0; i < contexts_.size() * 2; i += 2)
{
auto& incoming = contexts_[i % contexts_.size()];
EXPECT_TRUE(notifier.send_txs(txs, incoming.get_id(), cryptonote::relay_method::stem));
EXPECT_TRUE(notifier.send_txs(txs, incoming->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
ASSERT_LT(0u, io_service_.poll());
EXPECT_EQ(txs, events_.take_relayed(cryptonote::relay_method::stem));
std::size_t send_count = 0;
for (auto context = contexts_.begin(); context != contexts_.end(); ++context)
for (auto context = boost::make_indirect_iterator(contexts_.begin()); context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
used.insert(context->get_id());
auto inserted = mappings.emplace(incoming.get_id(), context->get_id()).first;
auto inserted = mappings.emplace(incoming->get_id(), context->get_id()).first;
EXPECT_EQ(inserted->second, context->get_id()) << "incoming index " << i;
}
send_count += sent;
@ -2101,7 +2086,7 @@ TEST_F(levin_notify, fluff_multiple)
ASSERT_EQ(test_connections_count, contexts_.size());
for (;;)
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
@ -2113,12 +2098,12 @@ TEST_F(levin_notify, fluff_multiple)
std::size_t send_count = 0;
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
{
const std::size_t sent = context->process_send_queue();
if (sent)
{
EXPECT_EQ(1u, (context - contexts_.begin()) % 2);
EXPECT_EQ(1u, (context.base() - contexts_.begin()) % 2);
EXPECT_FALSE(context->is_incoming());
}
send_count += sent;
@ -2143,9 +2128,9 @@ TEST_F(levin_notify, fluff_multiple)
io_service_.reset();
ASSERT_LT(0u, io_service_.poll());
{
auto context = contexts_.begin();
auto context = boost::make_indirect_iterator(contexts_.begin());
EXPECT_EQ(0u, context->process_send_queue());
for (++context; context != contexts_.end(); ++context)
for (++context; context.base() != contexts_.end(); ++context)
EXPECT_EQ(1u, context->process_send_queue());
ASSERT_EQ(contexts_.size() - 1, receiver_.notified_size());
@ -2161,7 +2146,7 @@ TEST_F(levin_notify, fluff_multiple)
for (unsigned i = 0; i < contexts_.size() * 2; i += 2)
{
auto& incoming = contexts_[i % contexts_.size()];
EXPECT_TRUE(notifier.send_txs(txs, incoming.get_id(), cryptonote::relay_method::stem));
EXPECT_TRUE(notifier.send_txs(txs, incoming->get_id(), cryptonote::relay_method::stem));
io_service_.reset();
ASSERT_LT(0u, io_service_.poll());
@ -2174,9 +2159,9 @@ TEST_F(levin_notify, fluff_multiple)
for (auto& context : contexts_)
{
if (std::addressof(incoming) == std::addressof(context))
EXPECT_EQ(0u, context.process_send_queue());
EXPECT_EQ(0u, context->process_send_queue());
else
EXPECT_EQ(1u, context.process_send_queue());
EXPECT_EQ(1u, context->process_send_queue());
}
ASSERT_EQ(contexts_.size() - 1, receiver_.notified_size());
@ -2220,7 +2205,7 @@ TEST_F(levin_notify, noise)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
EXPECT_EQ(2u, sent);
EXPECT_EQ(0u, receiver_.notified_size());
@ -2235,7 +2220,7 @@ TEST_F(levin_notify, noise)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
ASSERT_EQ(2u, sent);
while (sent--)
@ -2257,7 +2242,7 @@ TEST_F(levin_notify, noise)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
EXPECT_EQ(2u, sent);
EXPECT_EQ(0u, receiver_.notified_size());
@ -2269,7 +2254,7 @@ TEST_F(levin_notify, noise)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
ASSERT_EQ(2u, sent);
while (sent--)
@ -2312,7 +2297,7 @@ TEST_F(levin_notify, noise_stem)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
EXPECT_EQ(2u, sent);
EXPECT_EQ(0u, receiver_.notified_size());
@ -2328,7 +2313,7 @@ TEST_F(levin_notify, noise_stem)
{
std::size_t sent = 0;
for (auto& context : contexts_)
sent += context.process_send_queue();
sent += context->process_send_queue();
ASSERT_EQ(2u, sent);
while (sent--)
@ -2355,13 +2340,13 @@ TEST_F(levin_notify, command_max_bytes)
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().send(bytes.clone(), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(true));
EXPECT_EQ(1, get_connections().send(bytes.clone(), contexts_.front()->get_id()));
EXPECT_EQ(1u, contexts_.front()->process_send_queue(true));
EXPECT_EQ(1u, receiver_.notified_size());
const received_message msg = receiver_.get_raw_notification();
EXPECT_EQ(ping_command, msg.command);
EXPECT_EQ(contexts_.front().get_id(), msg.connection);
EXPECT_EQ(contexts_.front()->get_id(), msg.connection);
EXPECT_EQ(payload, msg.payload);
{
@ -2371,7 +2356,7 @@ TEST_F(levin_notify, command_max_bytes)
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().send(std::move(bytes), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(false));
EXPECT_EQ(1, get_connections().send(std::move(bytes), contexts_.front()->get_id()));
EXPECT_EQ(1u, contexts_.front()->process_send_queue(false));
EXPECT_EQ(0u, receiver_.notified_size());
}

View File

@ -337,7 +337,7 @@ TEST(cryptonote_protocol_handler, race_condition)
using context_t = contexts::p2p;
using handler_t = epee::levin::async_protocol_handler<context_t>;
using connection_t = epee::net_utils::connection<handler_t>;
using connection_ptr = boost::shared_ptr<connection_t>;
using connection_ptr = std::shared_ptr<connection_t>;
using connections_t = std::vector<connection_ptr>;
using shared_state_t = typename connection_t::shared_state;
using shared_state_ptr = std::shared_ptr<shared_state_t>;
@ -1045,7 +1045,7 @@ TEST(node_server, race_condition)
};
using handler_t = epee::levin::async_protocol_handler<context_t>;
using connection_t = epee::net_utils::connection<handler_t>;
using connection_ptr = boost::shared_ptr<connection_t>;
using connection_ptr = std::shared_ptr<connection_t>;
using shared_state_t = typename connection_t::shared_state;
using shared_state_ptr = std::shared_ptr<shared_state_t>;
using io_context_t = boost::asio::io_service;
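
The connection_ptr alias moves from boost::shared_ptr to std::shared_ptr, matching the std::shared_ptr-based shared_state_ptr alias beside it. As a general C++ rule, shared_from_this() only yields a valid pointer when the object is already owned by a std::shared_ptr, so the owning pointer family must be consistent everywhere the connection is held. A minimal sketch of that rule; the `conn` type below is a hypothetical stand-in, not the real connection class.

#include <cassert>
#include <memory>

// Hypothetical stand-in type; the real connection class is far larger.
struct conn : std::enable_shared_from_this<conn>
{
    std::shared_ptr<conn> self() { return shared_from_this(); }
};

int main()
{
    auto owner = std::make_shared<conn>(); // owned by a std::shared_ptr -> OK
    auto again = owner->self();            // shares the same control block
    assert(again == owner);
    // A conn owned only by boost::shared_ptr (or by no shared_ptr at all)
    // would make shared_from_this() throw std::bad_weak_ptr instead
    // (undefined behaviour before C++17).
    return 0;
}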