Skip to content

Commit

Permalink
Async wallet scanner (monero-project#23)
Browse files Browse the repository at this point in the history
Implement async wallet scanner.

Adds a new functional test for direct wallet2 -> live RPC daemon
interactions. This sets up a framework to test pointing the
Seraphis wallet lib to a live daemon.

Tests sending and scanning:
- a normal transfer
- a sweep single (0 change)
- to a single subaddress
- to 3 subaddresses (i.e. scanning using additional pub keys)

* scan machine: option to force reorg avoidance increment first pass

- when pointing to a daemon that does not support returning empty
blocks when the client requests too high of a height, we have to
be careful in our scanner to always request blocks below the chain
tip, in every request.
- by forcing the reorg avoidance increment on first pass, we make
sure clients will always include the reorg avoidance increment
when requesting blocks from the daemon, so the client can expect
the request for blocks should *always* return an ok height.

* core tests: check conversion tool on all legacy enote version types

Stil TODO:
- check complete scanning on all enote types
- hit every branch condition for all enote versions

* conn pool mock: epee http client connection pool

- Enables concurrent network requests using the epee http client.
- Still TODO for production:

1) close_connections
2) require the pool respect max_connections

* enote finding context: IN LegacyUnscannedChunk, OUT ChunkData

- finds owned enotes by legacy view scanning a chunk of blocks

* async: function to remove minimum element from token queue

- Useful when we want to remove elements of the token queue in an
order that is different than insertion order.

* async scanner: scan via RPC, fetching & scanning parallel chunks

*How it works*

Assume the user's wallet must start scanning blocks from height
5000.

1. The scanner begins by launching 10 RPC requests in parallel to
fetch chunks of blocks as follows:

```
request 0: { start_height: 5000, max_block_count: 20 }
request 1: { start_height: 5020, max_block_count: 20 }
...
request 9: { start_height: 5180, max_block_count: 20 }
```

2. As soon as any single request completes, the wallet immediately
parses that chunk.
  - This is all in parallel. For example, as soon as request 7
  responds, the wallet immediately begins parsing that chunk in
  parallel to any other chunks it's already parsing.
3. If a chunk does not include a total of max_block_count blocks,
and the chunk is not the tip of the chain, this means there was a
"gap" in the chunk request. The scanner launches another parallel
RPC request to fill in the gap.
  - This gap can occur because the server will **not** return a
  chunk of blocks greater than 100mb (or 20k txs) via the
  /getblocks.bin` RPC endpoint
  ([`FIND_BLOCKCHAIN_SUPPLEMENT_MAX_SIZE`](https://github.com/monero-project/monero/blob/053ba2cf07649cea8134f8a188685ab7a5365e5c/src/cryptonote_core/blockchain.cpp#L65))
  - The gap is from `(req.start_height + res.blocks.size())` to
  `(req.start_height + req.max_block_count)`.
4. As soon as the scanner finishes parsing the chunk, it
immediately submits another parallel RPC request.
5. In parallel, the scanner identifies a user's received (and
spent) enotes in each chunk.
  - For example, as soon as request 7 responds and the wallet
  parses it, the wallet scans that chunk in parallel to any other
  chunks it's already scanning.
6. Once a single chunk is fully scanned locally, the scanner
launches a parallel task to fetch and scan the next chunk.
7. Once the scanner reaches the tip of the chain (the terminal
chunk), the scanner terminates.

*Some technical highlights*

- The wallet scanner is backwards compatible with existing daemons
(though it would need to point to an updated daemon to realize the
perf speed-up).
- On error cases such as the daemon going offline, the same wallet
errors that wallet2 uses (that the wallet API expects) are
propagated up to the higher-level Seraphis lib.
- The implementation uses an http client connection pool (reusing
the epee http client) to support parallel network requests
([related](seraphis-migration/wallet3#58)).
- A developer using the scanner can "bring their own blocks/network
implementation" to the scanner by providing a callback function of
the following type as a param to the async scanner constructor:
`std::function<bool(const cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::request, cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::response)>`

---------

Co-authored-by: jeffro256 <jeffro256@tutanota.com>
  • Loading branch information
2 people authored and UkoeHB committed Aug 2, 2024
1 parent 8d862f1 commit 414cdf2
Show file tree
Hide file tree
Showing 35 changed files with 3,359 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/async/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ monero_add_library(async
target_link_libraries(async
PUBLIC
common
epee
PRIVATE
${EXTRA_LIBRARIES})

Expand Down
96 changes: 96 additions & 0 deletions src/async/mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2024, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/// mutex

#pragma once

//local headers
#include "misc_language.h"
#include "misc_log_ex.h"

//third-party headers

//standard headers
#include <atomic>
#include <mutex>
#include <string>
#include <thread>

//forward declarations


// Lock the mutex and then unlock it at the end of the local scope
// - if it fails to unlock (either it's already unlocked or owned by a different thread), the exception is ignored
#define SCOPE_LOCK_MUTEX(mutex) \
mutex.lock(); \
auto scope_exit_handler_##mutex = epee::misc_utils::create_scope_leave_handler([this](){ \
CHECK_AND_ASSERT_THROW_MES(mutex.unlock(), "failed to unlock " + std::string(#mutex)); \
})

namespace async
{

/// mutex
class Mutex final
{
public:
/// disable copy/move
Mutex& operator=(Mutex&&) = delete;

//member functions
/// Lock the mutex and claim ownership
void lock()
{
m_mutex.lock();
m_mutex_owner.store(std::this_thread::get_id(), std::memory_order_relaxed);
}

/// Release ownership and unlock the mutex. If this thread does not own the lock already, returns false.
bool unlock()
{
if (!thread_owns_lock())
return false;
m_mutex_owner.store(std::thread::id{}, std::memory_order_relaxed);
m_mutex.unlock();
return true;
}

/// Check if the given thread owns the mutex
bool thread_owns_lock(const std::thread::id thread_id = std::this_thread::get_id()) const
{
return thread_id == m_mutex_owner.load(std::memory_order_relaxed);
}

private:
//member variables
std::mutex m_mutex;
std::atomic<std::thread::id> m_mutex_owner{std::thread::id{}};
};

} //namespace asyc
32 changes: 31 additions & 1 deletion src/async/token_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ enum class TokenQueueResult : unsigned char
SUCCESS,
QUEUE_EMPTY,
TRY_LOCK_FAIL,
SHUTTING_DOWN
SHUTTING_DOWN,
QUEUE_NOT_EMPTY
};

/// async token queue
Expand Down Expand Up @@ -116,7 +117,27 @@ class TokenQueue final
m_queue.pop_front();
return TokenQueueResult::SUCCESS;
}
/// try to remove the minimum element
TokenQueueResult try_remove_min(TokenT &token_out)
{
// try to lock the queue, then check if there are any elements
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
if (m_queue.size() == 0)
return TokenQueueResult::QUEUE_EMPTY;

// find the min element
auto min_elem = m_queue.begin();
for (auto it = m_queue.begin(); it != m_queue.end(); ++it)
{
if (*it < *min_elem)
min_elem = it;
}
token_out = std::move(*min_elem);
m_queue.erase(min_elem);
return TokenQueueResult::SUCCESS;
}
/// shut down the queue
void shut_down()
{
Expand All @@ -126,6 +147,15 @@ class TokenQueue final
}
m_condvar.notify_all();
}
/// reset the queue (queue must already be empty)
TokenQueueResult reset()
{
std::lock_guard<std::mutex> lock{m_mutex};
if (!m_queue.empty())
return TokenQueueResult::QUEUE_NOT_EMPTY;
m_is_shutting_down = false;
return TokenQueueResult::SUCCESS;
}

private:
//member variables
Expand Down
110 changes: 110 additions & 0 deletions src/common/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ namespace tools
);
}

t_rpc_client(
const std::string &daemon_addr
, boost::optional<epee::net_utils::http::login> user
, epee::net_utils::ssl_options_t ssl_options
)
: m_http_client{}
{
m_http_client.set_server(
daemon_addr, std::move(user), std::move(ssl_options)
);
}

template <typename T_req, typename T_res>
bool basic_json_rpc_request(
T_req & req
Expand Down Expand Up @@ -114,6 +126,33 @@ namespace tools
}
}

template <typename T_req, typename T_res>
bool basic_rpc_request(
T_req & req
, T_res & res
, std::string const & relative_url
)
{
t_http_connection connection(&m_http_client);

bool ok = connection.is_open();
if (!ok)
{
fail_msg_writer() << "Couldn't connect to daemon: " << m_http_client.get_host() << ":" << m_http_client.get_port();
return false;
}
ok = epee::net_utils::invoke_http_json(relative_url, req, res, m_http_client, t_http_connection::TIMEOUT());
if (!ok)
{
fail_msg_writer() << "basic_rpc_request: Daemon request failed";
return false;
}
else
{
return true;
}
}

template <typename T_req, typename T_res>
bool rpc_request(
T_req & req
Expand Down Expand Up @@ -148,4 +187,75 @@ namespace tools
return connection.is_open();
}
};

class t_daemon_rpc_client final
{
private:
t_rpc_client m_rpc_client;
public:
t_daemon_rpc_client(
const std::string &daemon_addr
, const boost::optional<epee::net_utils::http::login> &daemon_login
, const epee::net_utils::ssl_options_t ssl_support
)
: m_rpc_client{daemon_addr, daemon_login, ssl_support}
{
}

cryptonote::COMMAND_RPC_GET_HEIGHT::response get_height()
{
cryptonote::COMMAND_RPC_GET_HEIGHT::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GET_HEIGHT::response res = AUTO_VAL_INIT(res);
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/get_height"), "failed to get height");
return res;
}

cryptonote::COMMAND_RPC_POP_BLOCKS::response pop_blocks(uint64_t nblocks = 1)
{
cryptonote::COMMAND_RPC_POP_BLOCKS::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_POP_BLOCKS::response res = AUTO_VAL_INIT(res);
req.nblocks = nblocks;
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/pop_blocks"), "failed to pop blocks");
return res;
}

cryptonote::COMMAND_RPC_GET_TRANSACTIONS::response get_transactions(const std::vector<std::string> &txs_hashes)
{
cryptonote::COMMAND_RPC_GET_TRANSACTIONS::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GET_TRANSACTIONS::response res = AUTO_VAL_INIT(res);
req.txs_hashes = txs_hashes;
req.decode_as_json = false;
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_rpc_request(req, res, "/get_transactions"), "failed to get transactions");
return res;
}

cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::response flush_txpool()
{
cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_FLUSH_TRANSACTION_POOL::response res = AUTO_VAL_INIT(res);
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "flush_txpool"), "failed to flush txpool");
return res;
}

cryptonote::COMMAND_RPC_GENERATEBLOCKS::response generateblocks(const std::string &address, uint64_t amount_of_blocks)
{
cryptonote::COMMAND_RPC_GENERATEBLOCKS::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GENERATEBLOCKS::response res = AUTO_VAL_INIT(res);
req.amount_of_blocks = amount_of_blocks;
req.wallet_address = address;
req.prev_block = "";
req.starting_nonce = 0;
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "generateblocks"), "failed to generate blocks");
return res;
}

cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::response get_last_block_header()
{
cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::request req = AUTO_VAL_INIT(req);
cryptonote::COMMAND_RPC_GET_LAST_BLOCK_HEADER::response res = AUTO_VAL_INIT(res);
req.client = "";
CHECK_AND_ASSERT_THROW_MES(m_rpc_client.basic_json_rpc_request(req, res, "get_last_block_header"), "failed to get last block header");
return res;
}
};
}
10 changes: 6 additions & 4 deletions src/cryptonote_core/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2774,8 +2774,9 @@ bool Blockchain::find_blockchain_supplement(const uint64_t req_start_block, cons
if(req_start_block > 0)
{
// if requested height is higher than our chain, return false -- we can't help
top_hash = m_db->top_block_hash(&total_height);
++total_height;
uint64_t top_height;
top_hash = m_db->top_block_hash(&top_height);
total_height = top_height + 1;
if (req_start_block >= total_height)
{
return false;
Expand All @@ -2791,8 +2792,9 @@ bool Blockchain::find_blockchain_supplement(const uint64_t req_start_block, cons
}

db_rtxn_guard rtxn_guard(m_db);
top_hash = m_db->top_block_hash(&total_height);
++total_height;
uint64_t top_height;
top_hash = m_db->top_block_hash(&top_height);
total_height = top_height + 1;
blocks.reserve(std::min(std::min(max_block_count, (size_t)10000), (size_t)(total_height - start_height)));
CHECK_AND_ASSERT_MES(m_db->get_blocks_from(start_height, 3, max_block_count, max_tx_count, FIND_BLOCKCHAIN_SUPPLEMENT_MAX_SIZE, blocks, pruned, true, get_miner_tx_hash),
false, "Error getting blocks");
Expand Down
1 change: 1 addition & 0 deletions src/ringct/rctTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ namespace rct {
return bytes[i];
}
bool operator==(const key &k) const { return !crypto_verify_32(bytes, k.bytes); }
bool operator!=(const key &k) const { return crypto_verify_32(bytes, k.bytes); }
unsigned char bytes[32];
};
typedef std::vector<key> keyV; //vector of keys
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/core_rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,12 @@ namespace cryptonote
}
}

size_t max_blocks = COMMAND_RPC_GET_BLOCKS_FAST_MAX_BLOCK_COUNT;
size_t max_blocks = req.max_block_count > 0
? std::min(req.max_block_count, (uint64_t)COMMAND_RPC_GET_BLOCKS_FAST_MAX_BLOCK_COUNT)
: COMMAND_RPC_GET_BLOCKS_FAST_MAX_BLOCK_COUNT;
if (m_rpc_payment)
{
max_blocks = res.credits / COST_PER_BLOCK;
if (max_blocks > COMMAND_RPC_GET_BLOCKS_FAST_MAX_BLOCK_COUNT)
max_blocks = COMMAND_RPC_GET_BLOCKS_FAST_MAX_BLOCK_COUNT;
max_blocks = std::min((size_t)(res.credits / COST_PER_BLOCK), max_blocks);
if (max_blocks == 0)
{
res.status = CORE_RPC_STATUS_PAYMENT_REQUIRED;
Expand Down
5 changes: 4 additions & 1 deletion src/rpc/core_rpc_server_commands_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ namespace cryptonote
#define CORE_RPC_STATUS_NOT_MINING "NOT MINING"
#define CORE_RPC_STATUS_PAYMENT_REQUIRED "PAYMENT REQUIRED"

inline const std::string get_rpc_status(const bool trusted_daemon, const std::string &s)
static inline const std::string get_rpc_status(const bool trusted_daemon, const std::string &s)
{
if (trusted_daemon)
return s;
Expand Down Expand Up @@ -191,6 +191,8 @@ inline const std::string get_rpc_status(const bool trusted_daemon, const std::st
bool no_miner_tx;
bool high_height_ok;
uint64_t pool_info_since;
uint64_t max_block_count;

BEGIN_KV_SERIALIZE_MAP()
KV_SERIALIZE_PARENT(rpc_access_request_base)
KV_SERIALIZE_OPT(requested_info, (uint8_t)0)
Expand All @@ -200,6 +202,7 @@ inline const std::string get_rpc_status(const bool trusted_daemon, const std::st
KV_SERIALIZE_OPT(no_miner_tx, false)
KV_SERIALIZE_OPT(high_height_ok, false) // default false maintains backwards compatibility for clients that relied on failure on high height
KV_SERIALIZE_OPT(pool_info_since, (uint64_t)0)
KV_SERIALIZE_OPT(max_block_count, (uint64_t)0)
END_KV_SERIALIZE_MAP()
};
typedef epee::misc_utils::struct_init<request_t> request;
Expand Down
1 change: 1 addition & 0 deletions src/rpc/daemon_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ BEGIN_RPC_MESSAGE_CLASS(GetBlocksFast);
RPC_MESSAGE_MEMBER(std::list<crypto::hash>, block_ids);
RPC_MESSAGE_MEMBER(uint64_t, start_height);
RPC_MESSAGE_MEMBER(bool, prune);
RPC_MESSAGE_MEMBER(uint64_t, max_block_count);
END_RPC_MESSAGE_REQUEST;
BEGIN_RPC_MESSAGE_RESPONSE;
RPC_MESSAGE_MEMBER(std::vector<cryptonote::rpc::block_with_transactions>, blocks);
Expand Down
1 change: 1 addition & 0 deletions src/seraphis_impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

set(seraphis_impl_sources
checkpoint_cache.cpp
enote_finding_context_legacy.cpp
enote_store.cpp
enote_store_payment_validator.cpp
enote_store_utils.cpp
Expand Down
Loading

0 comments on commit 414cdf2

Please sign in to comment.