wallet2: parse blocks in the RPC thread, not the processing thread
Processing typically is the bottleneck
This commit is contained in:
parent
31a895e876
commit
ba8331ce41
|
@ -1720,39 +1720,23 @@ void wallet2::pull_hashes(uint64_t start_height, uint64_t &blocks_start_height,
|
||||||
hashes = std::move(res.m_block_ids);
|
hashes = std::move(res.m_block_ids);
|
||||||
}
|
}
|
||||||
//----------------------------------------------------------------------------------------------------
|
//----------------------------------------------------------------------------------------------------
|
||||||
void wallet2::process_blocks(uint64_t start_height, const std::vector<cryptonote::block_complete_entry> &blocks, const std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices, uint64_t& blocks_added)
|
void wallet2::process_parsed_blocks(uint64_t start_height, const std::vector<cryptonote::block_complete_entry> &blocks, const std::vector<parsed_block> &parsed_blocks, uint64_t& blocks_added)
|
||||||
{
|
{
|
||||||
size_t current_index = start_height;
|
size_t current_index = start_height;
|
||||||
blocks_added = 0;
|
blocks_added = 0;
|
||||||
size_t tx_o_indices_idx = 0;
|
size_t tx_o_indices_idx = 0;
|
||||||
|
|
||||||
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch");
|
THROW_WALLET_EXCEPTION_IF(blocks.size() != parsed_blocks.size(), error::wallet_internal_error, "size mismatch");
|
||||||
THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(current_index), error::wallet_internal_error, "Index out of bounds of hashchain");
|
THROW_WALLET_EXCEPTION_IF(!m_blockchain.is_in_bounds(current_index), error::wallet_internal_error, "Index out of bounds of hashchain");
|
||||||
|
|
||||||
std::vector<crypto::hash> round_block_hashes(blocks.size());
|
|
||||||
std::vector<cryptonote::block> round_blocks(blocks.size());
|
|
||||||
std::deque<bool> error(blocks.size());
|
|
||||||
|
|
||||||
tools::threadpool& tpool = tools::threadpool::getInstance();
|
|
||||||
tools::threadpool::waiter waiter;
|
|
||||||
for (size_t i = 0; i < blocks.size(); ++i)
|
for (size_t i = 0; i < blocks.size(); ++i)
|
||||||
{
|
{
|
||||||
tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block),
|
const crypto::hash &bl_id = parsed_blocks[i].hash;
|
||||||
std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i])));
|
const cryptonote::block &bl = parsed_blocks[i].block;
|
||||||
}
|
|
||||||
waiter.wait();
|
|
||||||
for (size_t i = 0; i < blocks.size(); ++i)
|
|
||||||
{
|
|
||||||
THROW_WALLET_EXCEPTION_IF(error[i], error::block_parse_error, blocks[i].block);
|
|
||||||
}
|
|
||||||
for (size_t i = 0; i < blocks.size(); ++i)
|
|
||||||
{
|
|
||||||
const crypto::hash &bl_id = round_block_hashes[i];
|
|
||||||
cryptonote::block &bl = round_blocks[i];
|
|
||||||
|
|
||||||
if(current_index >= m_blockchain.size())
|
if(current_index >= m_blockchain.size())
|
||||||
{
|
{
|
||||||
process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, o_indices[i]);
|
process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, parsed_blocks[i].o_indices);
|
||||||
++blocks_added;
|
++blocks_added;
|
||||||
}
|
}
|
||||||
else if(bl_id != m_blockchain[current_index])
|
else if(bl_id != m_blockchain[current_index])
|
||||||
|
@ -1764,7 +1748,7 @@ void wallet2::process_blocks(uint64_t start_height, const std::vector<cryptonote
|
||||||
string_tools::pod_to_hex(m_blockchain[current_index]));
|
string_tools::pod_to_hex(m_blockchain[current_index]));
|
||||||
|
|
||||||
detach_blockchain(current_index);
|
detach_blockchain(current_index);
|
||||||
process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, o_indices[i]);
|
process_new_blockchain_entry(bl, blocks[i], bl_id, current_index, parsed_blocks[i].o_indices);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1786,7 +1770,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched)
|
||||||
refresh(start_height, blocks_fetched, received_money);
|
refresh(start_height, blocks_fetched, received_money);
|
||||||
}
|
}
|
||||||
//----------------------------------------------------------------------------------------------------
|
//----------------------------------------------------------------------------------------------------
|
||||||
void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::vector<cryptonote::block_complete_entry> &prev_blocks, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices, bool &error)
|
void wallet2::pull_and_parse_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::vector<cryptonote::block_complete_entry> &prev_blocks, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<parsed_block> &parsed_blocks, bool &error)
|
||||||
{
|
{
|
||||||
error = false;
|
error = false;
|
||||||
|
|
||||||
|
@ -1806,7 +1790,28 @@ void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_hei
|
||||||
}
|
}
|
||||||
|
|
||||||
// pull the new blocks
|
// pull the new blocks
|
||||||
|
std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices;
|
||||||
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices);
|
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices);
|
||||||
|
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "Mismatched sizes of blocks and o_indices");
|
||||||
|
|
||||||
|
tools::threadpool& tpool = tools::threadpool::getInstance();
|
||||||
|
tools::threadpool::waiter waiter;
|
||||||
|
parsed_blocks.resize(blocks.size());
|
||||||
|
for (size_t i = 0; i < blocks.size(); ++i)
|
||||||
|
{
|
||||||
|
tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(blocks[i].block),
|
||||||
|
std::ref(parsed_blocks[i].block), std::ref(parsed_blocks[i].hash), std::ref(parsed_blocks[i].error)));
|
||||||
|
}
|
||||||
|
waiter.wait();
|
||||||
|
for (size_t i = 0; i < blocks.size(); ++i)
|
||||||
|
{
|
||||||
|
if (parsed_blocks[i].error)
|
||||||
|
{
|
||||||
|
error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
parsed_blocks[i].o_indices = std::move(o_indices[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch(...)
|
catch(...)
|
||||||
{
|
{
|
||||||
|
@ -2197,7 +2202,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
|
||||||
tools::threadpool::waiter waiter;
|
tools::threadpool::waiter waiter;
|
||||||
uint64_t blocks_start_height;
|
uint64_t blocks_start_height;
|
||||||
std::vector<cryptonote::block_complete_entry> blocks;
|
std::vector<cryptonote::block_complete_entry> blocks;
|
||||||
std::vector<COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices;
|
std::vector<parsed_block> parsed_blocks;
|
||||||
bool refreshed = false;
|
bool refreshed = false;
|
||||||
|
|
||||||
// pull the first set of blocks
|
// pull the first set of blocks
|
||||||
|
@ -2218,11 +2223,11 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
|
||||||
// If stop() is called during fast refresh we don't need to continue
|
// If stop() is called during fast refresh we don't need to continue
|
||||||
if(!m_run.load(std::memory_order_relaxed))
|
if(!m_run.load(std::memory_order_relaxed))
|
||||||
return;
|
return;
|
||||||
pull_blocks(start_height, blocks_start_height, short_chain_history, blocks, o_indices);
|
|
||||||
// always reset start_height to 0 to force short_chain_ history to be used on
|
// always reset start_height to 0 to force short_chain_ history to be used on
|
||||||
// subsequent pulls in this refresh.
|
// subsequent pulls in this refresh.
|
||||||
start_height = 0;
|
start_height = 0;
|
||||||
|
|
||||||
|
bool first = true;
|
||||||
while(m_run.load(std::memory_order_relaxed))
|
while(m_run.load(std::memory_order_relaxed))
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -2230,19 +2235,22 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
|
||||||
// pull the next set of blocks while we're processing the current one
|
// pull the next set of blocks while we're processing the current one
|
||||||
uint64_t next_blocks_start_height;
|
uint64_t next_blocks_start_height;
|
||||||
std::vector<cryptonote::block_complete_entry> next_blocks;
|
std::vector<cryptonote::block_complete_entry> next_blocks;
|
||||||
std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> next_o_indices;
|
std::vector<parsed_block> next_parsed_blocks;
|
||||||
bool error = false;
|
bool error = false;
|
||||||
if (blocks.empty())
|
if (!first && blocks.empty())
|
||||||
{
|
{
|
||||||
refreshed = false;
|
refreshed = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
tpool.submit(&waiter, [&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);});
|
tpool.submit(&waiter, [&]{pull_and_parse_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_parsed_blocks, error);});
|
||||||
|
|
||||||
process_blocks(blocks_start_height, blocks, o_indices, added_blocks);
|
if (!first)
|
||||||
blocks_fetched += added_blocks;
|
{
|
||||||
|
process_parsed_blocks(blocks_start_height, blocks, parsed_blocks, added_blocks);
|
||||||
|
blocks_fetched += added_blocks;
|
||||||
|
}
|
||||||
waiter.wait();
|
waiter.wait();
|
||||||
if(blocks_start_height == next_blocks_start_height)
|
if(!first && blocks_start_height == next_blocks_start_height)
|
||||||
{
|
{
|
||||||
m_node_rpc_proxy.set_height(m_blockchain.size());
|
m_node_rpc_proxy.set_height(m_blockchain.size());
|
||||||
refreshed = true;
|
refreshed = true;
|
||||||
|
@ -2252,7 +2260,8 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
|
||||||
// switch to the new blocks from the daemon
|
// switch to the new blocks from the daemon
|
||||||
blocks_start_height = next_blocks_start_height;
|
blocks_start_height = next_blocks_start_height;
|
||||||
blocks = std::move(next_blocks);
|
blocks = std::move(next_blocks);
|
||||||
o_indices = next_o_indices;
|
parsed_blocks = std::move(next_parsed_blocks);
|
||||||
|
first = false;
|
||||||
|
|
||||||
// handle error from async fetching thread
|
// handle error from async fetching thread
|
||||||
if (error)
|
if (error)
|
||||||
|
|
|
@ -452,6 +452,14 @@ namespace tools
|
||||||
|
|
||||||
typedef std::tuple<uint64_t, crypto::public_key, rct::key> get_outs_entry;
|
typedef std::tuple<uint64_t, crypto::public_key, rct::key> get_outs_entry;
|
||||||
|
|
||||||
|
struct parsed_block
|
||||||
|
{
|
||||||
|
crypto::hash hash;
|
||||||
|
cryptonote::block block;
|
||||||
|
cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices o_indices;
|
||||||
|
bool error;
|
||||||
|
};
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Generates a wallet or restores one.
|
* \brief Generates a wallet or restores one.
|
||||||
* \param wallet_ Name of wallet file
|
* \param wallet_ Name of wallet file
|
||||||
|
@ -1126,8 +1134,8 @@ namespace tools
|
||||||
void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices);
|
void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices);
|
||||||
void pull_hashes(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::vector<crypto::hash> &hashes);
|
void pull_hashes(uint64_t start_height, uint64_t& blocks_start_height, const std::list<crypto::hash> &short_chain_history, std::vector<crypto::hash> &hashes);
|
||||||
void fast_refresh(uint64_t stop_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history);
|
void fast_refresh(uint64_t stop_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history);
|
||||||
void pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::vector<cryptonote::block_complete_entry> &prev_blocks, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices, bool &error);
|
void pull_and_parse_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list<crypto::hash> &short_chain_history, const std::vector<cryptonote::block_complete_entry> &prev_blocks, std::vector<cryptonote::block_complete_entry> &blocks, std::vector<parsed_block> &parsed_blocks, bool &error);
|
||||||
void process_blocks(uint64_t start_height, const std::vector<cryptonote::block_complete_entry> &blocks, const std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> &o_indices, uint64_t& blocks_added);
|
void process_parsed_blocks(uint64_t start_height, const std::vector<cryptonote::block_complete_entry> &blocks, const std::vector<parsed_block> &parsed_blocks, uint64_t& blocks_added);
|
||||||
uint64_t select_transfers(uint64_t needed_money, std::vector<size_t> unused_transfers_indices, std::vector<size_t>& selected_transfers, bool trusted_daemon) const;
|
uint64_t select_transfers(uint64_t needed_money, std::vector<size_t> unused_transfers_indices, std::vector<size_t>& selected_transfers, bool trusted_daemon) const;
|
||||||
bool prepare_file_names(const std::string& file_path);
|
bool prepare_file_names(const std::string& file_path);
|
||||||
void process_unconfirmed(const crypto::hash &txid, const cryptonote::transaction& tx, uint64_t height);
|
void process_unconfirmed(const crypto::hash &txid, const cryptonote::transaction& tx, uint64_t height);
|
||||||
|
|
Loading…
Reference in New Issue