Start and stop mining IPC

This commit is contained in:
Oran Juice 2015-05-14 22:23:38 +05:30
parent 015847b0d0
commit 9eb15fffa5
No known key found for this signature in database
GPG Key ID: 71C5AF46CCB28124
11 changed files with 156 additions and 110 deletions

View File

@ -76,8 +76,11 @@ namespace IPC
return;
}
cryptonote::account_public_address adr;
const char *address = wap_proto_address(message);
if (!get_account_address_from_str(adr, testnet, std::string(address)))
zchunk_t *address_chunk = wap_proto_address(message);
char *address = (char*)zchunk_data(address_chunk);
std::string address_string(address, zchunk_size(address_chunk));
if (!get_account_address_from_str(adr, testnet, std::string(address_string)))
{
wap_proto_set_status(message, STATUS_WRONG_ADDRESS);
return;
@ -86,8 +89,8 @@ namespace IPC
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
uint64_t threads_count = wap_proto_thread_count(message);
if (!core->get_miner().start(adr, static_cast<size_t>(threads_count), attrs))
uint64_t thread_count = wap_proto_thread_count(message);
if (!core->get_miner().start(adr, static_cast<size_t>(thread_count), attrs))
{
wap_proto_set_status(message, STATUS_MINING_NOT_STARTED);
return;
@ -95,6 +98,16 @@ namespace IPC
wap_proto_set_status(message, STATUS_OK);
}
void stop_mining(wap_proto_t *message)
{
if (!core->get_miner().stop())
{
wap_proto_set_status(message, STATUS_MINING_NOT_STOPPED);
return;
}
wap_proto_set_status(message, STATUS_OK);
}
void retrieve_blocks(wap_proto_t *message)
{
if (!check_core_busy()) {

View File

@ -64,9 +64,11 @@ namespace IPC
const uint64_t STATUS_TX_VERIFICATION_FAILED = 7;
const uint64_t STATUS_TX_NOT_RELAYED = 8;
const uint64_t STATUS_RANDOM_OUTS_FAILED = 9;
const uint64_t STATUS_MINING_NOT_STOPPED = 10;
namespace Daemon
{
void start_mining(wap_proto_t *message);
void stop_mining(wap_proto_t *message);
void retrieve_blocks(wap_proto_t *message);
void send_raw_transaction(wap_proto_t *message);
void get_output_indexes(wap_proto_t *message);

View File

@ -100,7 +100,7 @@ WAP_EXPORT int
// Send start command to server.
// Returns >= 0 if successful, -1 if interrupted.
WAP_EXPORT int
wap_client_start (wap_client_t *self, const char *address, uint64_t thread_count);
wap_client_start (wap_client_t *self, zchunk_t **address_p, uint64_t thread_count);
// Send stop command to server.
// Returns >= 0 if successful, -1 if interrupted.

View File

@ -139,7 +139,7 @@ struct _client_args_t {
zchunk_t *tx_id;
uint64_t outs_count;
zframe_t *amounts;
char *address;
zchunk_t *address;
uint64_t thread_count;
};
@ -283,7 +283,7 @@ s_client_destroy (s_client_t **self_p)
zchunk_destroy (&self->args.tx_as_hex);
zchunk_destroy (&self->args.tx_id);
zframe_destroy (&self->args.amounts);
zstr_free (&self->args.address);
zchunk_destroy (&self->args.address);
client_terminate (&self->client);
wap_proto_destroy (&self->message);
zsock_destroy (&self->msgpipe);
@ -403,6 +403,7 @@ s_satisfy_pedantic_compilers (void)
engine_set_timeout (NULL, 0);
engine_set_wakeup_event (NULL, 0, NULL_event);
engine_handle_socket (NULL, 0, NULL);
engine_set_connected (NULL, 0);
}
@ -1472,8 +1473,8 @@ s_client_handle_cmdpipe (zloop_t *loop, zsock_t *reader, void *argument)
}
else
if (streq (method, "START")) {
zstr_free (&self->args.address);
zsock_recv (self->cmdpipe, "s8", &self->args.address, &self->args.thread_count);
zchunk_destroy (&self->args.address);
zsock_recv (self->cmdpipe, "p8", &self->args.address, &self->args.thread_count);
s_client_execute (self, start_event);
}
else
@ -1694,10 +1695,10 @@ bool
wap_client_connected (wap_client_t *self)
{
assert (self);
bool connected;
int connected;
zsock_send (self->actor, "s", "$CONNECTED");
zsock_recv (self->actor, "i", &connected);
return connected;
return (bool) connected;
}
@ -1922,11 +1923,12 @@ wap_client_random_outs (wap_client_t *self, uint64_t outs_count, zframe_t **amou
// Returns >= 0 if successful, -1 if interrupted.
int
wap_client_start (wap_client_t *self, const char *address, uint64_t thread_count)
wap_client_start (wap_client_t *self, zchunk_t **address_p, uint64_t thread_count)
{
assert (self);
zsock_send (self->actor, "ss8", "START", address, thread_count);
zsock_send (self->actor, "sp8", "START", *address_p, thread_count);
*address_p = NULL; // Take ownership of address
if (s_accept_reply (self, "START OK", "FAILURE", NULL))
return -1; // Interrupted or timed-out
return self->status;

View File

@ -76,7 +76,7 @@ with GET-OK, or ERROR.
START - Wallet asks daemon to start mining. Daemon replies with START-OK, or
ERROR.
address string
address chunk
thread_count number 8
START_OK - Daemon replies to a start mining request.
@ -295,11 +295,15 @@ zchunk_t *
void
wap_proto_set_tx_data (wap_proto_t *self, zchunk_t **chunk_p);
// Get/set the address field
const char *
// Get a copy of the address field
zchunk_t *
wap_proto_address (wap_proto_t *self);
// Get the address field and transfer ownership to caller
zchunk_t *
wap_proto_get_address (wap_proto_t *self);
// Set the address field, transferring ownership from caller
void
wap_proto_set_address (wap_proto_t *self, const char *value);
wap_proto_set_address (wap_proto_t *self, zchunk_t **chunk_p);
// Get/set the thread_count field
uint64_t

View File

@ -179,7 +179,7 @@ signal_have_blocks_ok (client_t *self)
static void
prepare_start_command (client_t *self)
{
wap_proto_set_address (self->message, self->args->address);
wap_proto_set_address (self->message, &self->args->address);
wap_proto_set_thread_count (self->message, self->args->thread_count);
}

View File

@ -34,38 +34,22 @@ struct _wap_proto_t {
int id; // wap_proto message ID
byte *needle; // Read/write pointer for serialization
byte *ceiling; // Valid upper limit for read pointer
// Wallet identity
char identity [256];
// block_ids
zlist_t *block_ids;
// start_height
uint64_t start_height;
// status
uint64_t status;
// curr_height
uint64_t curr_height;
// Frames of block data
zmsg_t *block_data;
// Transaction as hex
zchunk_t *tx_as_hex;
// Transaction ID
zchunk_t *tx_id;
// Output Indexes
zframe_t *o_indexes;
// Outs count
uint64_t outs_count;
// Amounts
zframe_t *amounts;
// Outputs
zframe_t *random_outputs;
// Transaction data
zchunk_t *tx_data;
// address
char address [256];
// thread_count
uint64_t thread_count;
// Printable explanation
char reason [256];
char identity [256]; // Wallet identity
zlist_t *block_ids; // block_ids
uint64_t start_height; // start_height
uint64_t status; // status
uint64_t curr_height; // curr_height
zmsg_t *block_data; // Frames of block data
zchunk_t *tx_as_hex; // Transaction as hex
zchunk_t *tx_id; // Transaction ID
zframe_t *o_indexes; // Output Indexes
uint64_t outs_count; // Outs count
zframe_t *amounts; // Amounts
zframe_t *random_outputs; // Outputs
zchunk_t *tx_data; // Transaction data
zchunk_t *address; // address
uint64_t thread_count; // thread_count
char reason [256]; // Printable explanation
};
// --------------------------------------------------------------------------
@ -250,6 +234,7 @@ wap_proto_destroy (wap_proto_t **self_p)
zframe_destroy (&self->amounts);
zframe_destroy (&self->random_outputs);
zchunk_destroy (&self->tx_data);
zchunk_destroy (&self->address);
// Free object itself
free (self);
@ -449,7 +434,17 @@ wap_proto_recv (wap_proto_t *self, zsock_t *input)
break;
case WAP_PROTO_START:
GET_STRING (self->address);
{
size_t chunk_size;
GET_NUMBER4 (chunk_size);
if (self->needle + chunk_size > (self->ceiling)) {
zsys_warning ("wap_proto: address is missing data");
goto malformed;
}
zchunk_destroy (&self->address);
self->address = zchunk_new (self->needle, chunk_size);
self->needle += chunk_size;
}
GET_NUMBER8 (self->thread_count);
break;
@ -565,7 +560,9 @@ wap_proto_send (wap_proto_t *self, zsock_t *output)
frame_size += zchunk_size (self->tx_data);
break;
case WAP_PROTO_START:
frame_size += 1 + strlen (self->address);
frame_size += 4; // Size is 4 octets
if (self->address)
frame_size += zchunk_size (self->address);
frame_size += 8; // thread_count
break;
case WAP_PROTO_START_OK:
@ -582,7 +579,7 @@ wap_proto_send (wap_proto_t *self, zsock_t *output)
self->needle = (byte *) zmq_msg_data (&frame);
PUT_NUMBER2 (0xAAA0 | 0);
PUT_NUMBER1 (self->id);
bool send_block_data = false;
bool have_block_data = false;
size_t nbr_frames = 1; // Total number of frames to send
switch (self->id) {
@ -611,7 +608,7 @@ wap_proto_send (wap_proto_t *self, zsock_t *output)
PUT_NUMBER8 (self->start_height);
PUT_NUMBER8 (self->curr_height);
nbr_frames += self->block_data? zmsg_size (self->block_data): 1;
send_block_data = true;
have_block_data = true;
break;
case WAP_PROTO_PUT:
@ -682,7 +679,15 @@ wap_proto_send (wap_proto_t *self, zsock_t *output)
break;
case WAP_PROTO_START:
PUT_STRING (self->address);
if (self->address) {
PUT_NUMBER4 (zchunk_size (self->address));
memcpy (self->needle,
zchunk_data (self->address),
zchunk_size (self->address));
self->needle += zchunk_size (self->address);
}
else
PUT_NUMBER4 (0); // Empty chunk
PUT_NUMBER8 (self->thread_count);
break;
@ -724,7 +729,7 @@ wap_proto_send (wap_proto_t *self, zsock_t *output)
zmq_send (zsock_resolve (output), NULL, 0, (--nbr_frames? ZMQ_SNDMORE: 0));
}
// Now send the block_data if necessary
if (send_block_data) {
if (have_block_data) {
if (self->block_data) {
zframe_t *frame = zmsg_first (self->block_data);
while (frame) {
@ -751,10 +756,7 @@ wap_proto_print (wap_proto_t *self)
zsys_debug ("WAP_PROTO_OPEN:");
zsys_debug (" protocol=wap");
zsys_debug (" version=1");
if (self->identity)
zsys_debug (" identity='%s'", self->identity);
else
zsys_debug (" identity=");
zsys_debug (" identity='%s'", self->identity);
break;
case WAP_PROTO_OPEN_OK:
@ -851,10 +853,7 @@ wap_proto_print (wap_proto_t *self)
case WAP_PROTO_START:
zsys_debug ("WAP_PROTO_START:");
if (self->address)
zsys_debug (" address='%s'", self->address);
else
zsys_debug (" address=");
zsys_debug (" address=[ ... ]");
zsys_debug (" thread_count=%ld", (long) self->thread_count);
break;
@ -890,10 +889,7 @@ wap_proto_print (wap_proto_t *self)
case WAP_PROTO_ERROR:
zsys_debug ("WAP_PROTO_ERROR:");
zsys_debug (" status=%ld", (long) self->status);
if (self->reason)
zsys_debug (" reason='%s'", self->reason);
else
zsys_debug (" reason=");
zsys_debug (" reason='%s'", self->reason);
break;
}
@ -1377,24 +1373,35 @@ wap_proto_set_tx_data (wap_proto_t *self, zchunk_t **chunk_p)
// --------------------------------------------------------------------------
// Get/set the address field
// Get the address field without transferring ownership
const char *
zchunk_t *
wap_proto_address (wap_proto_t *self)
{
assert (self);
return self->address;
}
// Get the address field and transfer ownership to caller
zchunk_t *
wap_proto_get_address (wap_proto_t *self)
{
zchunk_t *address = self->address;
self->address = NULL;
return address;
}
// Set the address field, transferring ownership from caller
void
wap_proto_set_address (wap_proto_t *self, const char *value)
wap_proto_set_address (wap_proto_t *self, zchunk_t **chunk_p)
{
assert (self);
assert (value);
if (value == self->address)
return;
strncpy (self->address, value, 255);
self->address [255] = 0;
assert (chunk_p);
zchunk_destroy (&self->address);
self->address = *chunk_p;
*chunk_p = NULL;
}
@ -1455,7 +1462,6 @@ wap_proto_test (bool verbose)
wap_proto_t *self = wap_proto_new ();
assert (self);
wap_proto_destroy (&self);
// Create pair of sockets we can send through
// We must bind before connect if we wish to remain compatible with ZeroMQ < v4
zsock_t *output = zsock_new (ZMQ_DEALER);
@ -1468,6 +1474,7 @@ wap_proto_test (bool verbose)
rc = zsock_connect (input, "inproc://selftest-wap_proto");
assert (rc == 0);
// Encode/send/decode and verify each message type
int instance;
self = wap_proto_new ();
@ -1677,7 +1684,8 @@ wap_proto_test (bool verbose)
}
wap_proto_set_id (self, WAP_PROTO_START);
wap_proto_set_address (self, "Life is short but Now lasts for ever");
zchunk_t *start_address = zchunk_new ("Captcha Diem", 12);
wap_proto_set_address (self, &start_address);
wap_proto_set_thread_count (self, 123);
// Send twice
wap_proto_send (self, output);
@ -1686,7 +1694,8 @@ wap_proto_test (bool verbose)
for (instance = 0; instance < 2; instance++) {
wap_proto_recv (self, input);
assert (wap_proto_routing_id (self));
assert (streq (wap_proto_address (self), "Life is short but Now lasts for ever"));
assert (memcmp (zchunk_data (wap_proto_address (self)), "Captcha Diem", 12) == 0);
zchunk_destroy (&start_address);
assert (wap_proto_thread_count (self) == 123);
}
wap_proto_set_id (self, WAP_PROTO_START_OK);

View File

@ -184,7 +184,6 @@ static void
start_mining_process (client_t *self)
{
IPC::Daemon::start_mining(self->message);
printf("\n\n Request: %d \n\n", (int)wap_proto_start_height(self->message));
}
@ -195,7 +194,7 @@ start_mining_process (client_t *self)
static void
stop_mining_process (client_t *self)
{
IPC::Daemon::stop_mining(self->message);
}
// ---------------------------------------------------------------------------

View File

@ -756,20 +756,24 @@ bool simple_wallet::start_mining(const std::vector<std::string>& args)
return true;
COMMAND_RPC_START_MINING::request req;
req.miner_address = m_wallet->get_account().get_public_address_str(m_wallet->testnet());
// req.miner_address = m_wallet->get_account().get_public_address_str(m_wallet->testnet());
std::string miner_address = m_wallet->get_account().get_public_address_str(m_wallet->testnet());
uint64_t threads_count;
bool ok = true;
size_t max_mining_threads_count = (std::max)(std::thread::hardware_concurrency(), static_cast<unsigned>(2));
if (0 == args.size())
{
req.threads_count = 1;
// req.threads_count = 1;
threads_count = 1;
}
else if (1 == args.size())
{
uint16_t num = 1;
ok = string_tools::get_xtype_from_string(num, args[0]);
ok = ok && (1 <= num && num <= max_mining_threads_count);
req.threads_count = num;
// req.threads_count = num;
threads_count = num;
}
else
{
@ -783,13 +787,16 @@ bool simple_wallet::start_mining(const std::vector<std::string>& args)
return true;
}
COMMAND_RPC_START_MINING::response res;
bool r = net_utils::invoke_http_json_remote_command2(m_daemon_address + "/start_mining", req, res, m_http_client);
std::string err = interpret_rpc_response(r, res.status);
if (err.empty())
// COMMAND_RPC_START_MINING::response res;
// bool r = net_utils::invoke_http_json_remote_command2(m_daemon_address + "/start_mining", req, res, m_http_client);
// std::string err = interpret_rpc_response(r, res.status);
uint64_t status = m_wallet->start_mining(miner_address, threads_count);
// res has to be true since we have checked before.
if (status == IPC::STATUS_OK)
success_msg_writer() << "Mining started in daemon";
else
fail_msg_writer() << "mining has NOT been started: " << err;
fail_msg_writer() << "mining has NOT been started: " << status;
return true;
}
//----------------------------------------------------------------------------------------------------
@ -798,14 +805,15 @@ bool simple_wallet::stop_mining(const std::vector<std::string>& args)
if (!try_connect_to_daemon())
return true;
COMMAND_RPC_STOP_MINING::request req;
COMMAND_RPC_STOP_MINING::response res;
bool r = net_utils::invoke_http_json_remote_command2(m_daemon_address + "/stop_mining", req, res, m_http_client);
std::string err = interpret_rpc_response(r, res.status);
if (err.empty())
// COMMAND_RPC_STOP_MINING::request req;
// COMMAND_RPC_STOP_MINING::response res;
// bool r = net_utils::invoke_http_json_remote_command2(m_daemon_address + "/stop_mining", req, res, m_http_client);
// std::string err = interpret_rpc_response(r, res.status);
uint64_t status = m_wallet->stop_mining();
if (status == IPC::STATUS_OK)
success_msg_writer() << "Mining stopped in daemon";
else
fail_msg_writer() << "mining has NOT been stopped: " << err;
fail_msg_writer() << "mining has NOT been stopped: " << status;
return true;
}
//----------------------------------------------------------------------------------------------------
@ -1069,9 +1077,8 @@ bool simple_wallet::show_blockchain_height(const std::vector<std::string>& args)
//----------------------------------------------------------------------------------------------------
bool simple_wallet::transfer(const std::vector<std::string> &args_)
{
// TODO: Find a way to check if daemon is connectible via 0MQ.
/*if (!try_connect_to_daemon())
return true;*/
if (!try_connect_to_daemon())
return true;
std::vector<std::string> local_args = args_;

View File

@ -762,18 +762,7 @@ bool wallet2::prepare_file_names(const std::string& file_path)
//----------------------------------------------------------------------------------------------------
bool wallet2::check_connection()
{
if(m_http_client.is_connected())
return true;
net_utils::http::url_content u;
net_utils::parse_url(m_daemon_address, u);
if(!u.port)
{
u.port = m_testnet ? config::testnet::RPC_DEFAULT_PORT : config::RPC_DEFAULT_PORT;
}
return m_http_client.connect(u.host, std::to_string(u.port), WALLET_RCP_CONNECTION_TIMEOUT);
return ipc_client && wap_client_connected(ipc_client);
}
//----------------------------------------------------------------------------------------------------
void wallet2::load(const std::string& wallet_, const std::string& password)
@ -1286,4 +1275,18 @@ void wallet2::connect_to_daemon() {
ipc_client = wap_client_new();
wap_client_connect(ipc_client, "ipc://@/monero", 200, "wallet identity");
}
uint64_t wallet2::start_mining(const std::string &address, uint64_t thread_count) {
zchunk_t *address_chunk = zchunk_new((void*)address.c_str(), address.length());
int rc = wap_client_start(ipc_client, &address_chunk, thread_count);
zchunk_destroy(&address_chunk);
THROW_WALLET_EXCEPTION_IF(rc < 0, error::no_connection_to_daemon, "start_mining");
return wap_client_status(ipc_client);
}
uint64_t wallet2::stop_mining() {
int rc = wap_client_stop(ipc_client);
THROW_WALLET_EXCEPTION_IF(rc < 0, error::no_connection_to_daemon, "stop_mining");
return wap_client_status(ipc_client);
}
}

View File

@ -84,6 +84,7 @@ namespace tools
wallet2(const wallet2&) : m_run(true), m_callback(0), m_testnet(false) {};
public:
wallet2(bool testnet = false, bool restricted = false) : m_run(true), m_callback(0), m_testnet(testnet) {
ipc_client = NULL;
connect_to_daemon();
if (!ipc_client) {
std::cout << "Couldn't connect to daemon\n\n";
@ -91,6 +92,9 @@ namespace tools
// it's not null and otherwise throw.
}
};
~wallet2() {
stop_ipc_client();
};
struct transfer_details
{
uint64_t m_block_height;
@ -261,6 +265,9 @@ namespace tools
static std::vector<std::string> addresses_from_url(const std::string& url, bool& dnssec_valid);
static std::string address_from_txt_record(const std::string& s);
uint64_t start_mining(const std::string &address, uint64_t thread_count);
uint64_t stop_mining();
private:
/*!
* \brief Stores wallet information to wallet file.