Skip to content

Commit

Permalink
Implemented filtering options for eth_subscribe with logs #27842
Browse files Browse the repository at this point in the history
Signed-off-by: Vadym Struts <vstruts@brave.com>
  • Loading branch information
vadimstruts committed Feb 2, 2023
1 parent 0ad5802 commit df4e7d7
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 44 deletions.
73 changes: 73 additions & 0 deletions browser/brave_wallet/ethereum_provider_impl_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ std::vector<uint8_t> DecodeHexHash(const std::string& hash_hex) {
return hash;
}

absl::optional<base::Value> ToValue(const network::ResourceRequest& request) {
base::StringPiece request_string(request.request_body->elements()
->at(0)
.As<network::DataElementBytes>()
.AsStringPiece());
return base::JSONReader::Read(request_string,
base::JSONParserOptions::JSON_PARSE_RFC);
}

} // namespace

class TestEventsListener : public brave_wallet::mojom::EventsListener {
Expand Down Expand Up @@ -2171,6 +2180,70 @@ TEST_F(EthereumProviderImplUnitTest, EthSubscribeLogs) {
EXPECT_FALSE(provider_->eth_logs_tracker_.IsRunning());
}

TEST_F(EthereumProviderImplUnitTest, EthSubscribeLogsFiltered) {
CreateWallet();
url_loader_factory_.SetInterceptor(
base::BindLambdaForTesting([&](const network::ResourceRequest& request) {
url_loader_factory_.ClearResponses();

std::string header_value;
EXPECT_TRUE(request.headers.GetHeader("X-Eth-Method", &header_value));

if (header_value == "eth_getLogs") {
const absl::optional<base::Value> req_body_payload =
base::JSONReader::Read(
R"({"id":1,"jsonrpc":"2.0","method":"eth_getLogs","params":
[{"address":["0x1111"],"fromBlock":"0x2211","toBlock":"0xab65",
"topics":["0x2edc","0xb832","0x8dc8"]}]})",
base::JSON_PARSE_CHROMIUM_EXTENSIONS |
base::JSONParserOptions::JSON_PARSE_RFC);

const auto payload = ToValue(request);
EXPECT_EQ(*payload, req_body_payload.value());
}
url_loader_factory_.AddResponse(
request.url.spec(),
R"({"id":1,"jsonrpc":"2.0","result":[{"address":"0x91",
"blockHash":"0xe8","blockNumber":"0x10","data":"0x0067",
"logIndex":"0x0","removed":false,
"topics":["0x4b","0x06e","0x085"],
"transactionHash":"0x22f7","transactionIndex":"0x0"}]})");
}));

// Logs subscription with parameters
std::string request_payload_json =
R"({"id":1,"jsonrpc:": "2.0","method":"eth_subscribe",
"params": ["logs", {"address": "0x1111", "fromBlock": "0x2211",
"toBlock": "0xab65", "topics": ["0x2edc", "0xb832", "0x8dc8"]}]})";
absl::optional<base::Value> request_payload = base::JSONReader::Read(
request_payload_json, base::JSON_PARSE_CHROMIUM_EXTENSIONS |
base::JSONParserOptions::JSON_PARSE_RFC);
std::string error_message;
auto response = CommonRequestOrSendAsync(request_payload.value());
EXPECT_EQ(response.first, false);
EXPECT_TRUE(response.second.is_string());
std::string subscription = *response.second.GetIfString();
browser_task_environment_.FastForwardBy(
base::Seconds(kLogTrackerDefaultTimeInSeconds));
EXPECT_TRUE(observer_->MessageEventFired());
base::Value rv = observer_->GetLastMessage();
ASSERT_TRUE(rv.is_dict());

std::string* address = rv.GetDict().FindString("address");
EXPECT_EQ(*address, "0x91");

// The first unsubscribe should not stop the block tracker
request_payload_json = base::StringPrintf(R"({"id":1,"jsonrpc:": "2.0",
"method":"eth_unsubscribe",
"params": ["%s"]})",
subscription.c_str());
request_payload = base::JSONReader::Read(
request_payload_json, base::JSON_PARSE_CHROMIUM_EXTENSIONS |
base::JSONParserOptions::JSON_PARSE_RFC);
response = CommonRequestOrSendAsync(request_payload.value());
EXPECT_FALSE(provider_->eth_logs_tracker_.IsRunning());
}

TEST_F(EthereumProviderImplUnitTest, Web3ClientVersion) {
std::string expected_version = base::StringPrintf(
"BraveWallet/v%s", version_info::GetBraveChromiumVersionNumber().c_str());
Expand Down
10 changes: 7 additions & 3 deletions components/brave_wallet/browser/asset_discovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,13 @@ void AssetDiscoveryManager::OnGetEthTokenRegistry(
weak_ptr_factory_.GetWeakPtr(),
base::OwnedRef(std::move(tokens_to_search)),
triggered_by_accounts_added, chain_id);
json_rpc_service_->EthGetLogs(chain_id, from_block, to_block,
std::move(contract_addresses_to_search),
std::move(topics), std::move(callback));
base::Value::Dict filtering;
filtering.Set("fromBlock", from_block);
filtering.Set("toBlock", to_block);
filtering.Set("address", std::move(contract_addresses_to_search));
filtering.Set("topics", std::move(topics));
json_rpc_service_->EthGetLogs(chain_id, base::Value(std::move(filtering)),
std::move(callback));
}

void AssetDiscoveryManager::OnGetTokenTransferLogs(
Expand Down
26 changes: 20 additions & 6 deletions components/brave_wallet/browser/eth_logs_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "brave/components/brave_wallet/browser/eth_logs_tracker.h"

#include <string>
#include <vector>
#include <utility>

namespace brave_wallet {

Expand All @@ -31,6 +31,16 @@ bool EthLogsTracker::IsRunning() const {
return timer_.IsRunning();
}

void EthLogsTracker::AddSubscriber(const std::string& subscription_id,
const base::Value& params) {
eth_logs_subscription_info_.insert(
std::pair<std::string, base::Value>(subscription_id, params.Clone()));
}

void EthLogsTracker::RemoveSubscriber(const std::string& subscription_id) {
eth_logs_subscription_info_.erase(subscription_id);
}

void EthLogsTracker::AddObserver(EthLogsTracker::Observer* observer) {
observers_.AddObserver(observer);
}
Expand All @@ -42,18 +52,22 @@ void EthLogsTracker::RemoveObserver(EthLogsTracker::Observer* observer) {
void EthLogsTracker::GetLogs() {
const auto chain_id = json_rpc_service_->GetChainId(mojom::CoinType::ETH);

json_rpc_service_->EthGetLogs(
chain_id, {}, {}, {}, {},
base::BindOnce(&EthLogsTracker::OnGetLogs, weak_factory_.GetWeakPtr()));
for (auto const& esi : std::as_const(eth_logs_subscription_info_)) {
json_rpc_service_->EthGetLogs(
chain_id, esi.second,
base::BindOnce(&EthLogsTracker::OnGetLogs, weak_factory_.GetWeakPtr(),
esi.first));
}
}

void EthLogsTracker::OnGetLogs([[maybe_unused]] const std::vector<Log>& logs,
void EthLogsTracker::OnGetLogs(const std::string& subscription,
[[maybe_unused]] const std::vector<Log>& logs,
base::Value rawlogs,
mojom::ProviderError error,
const std::string& error_message) {
if (error == mojom::ProviderError::kSuccess && rawlogs.is_dict()) {
for (auto& observer : observers_)
observer.OnLogsReceived(rawlogs.Clone());
observer.OnLogsReceived(subscription, rawlogs.Clone());
} else {
LOG(ERROR) << "OnGetLogs failed";
}
Expand Down
13 changes: 11 additions & 2 deletions components/brave_wallet/browser/eth_logs_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef BRAVE_COMPONENTS_BRAVE_WALLET_BROWSER_ETH_LOGS_TRACKER_H_
#define BRAVE_COMPONENTS_BRAVE_WALLET_BROWSER_ETH_LOGS_TRACKER_H_

#include <map>
#include <string>
#include <vector>

Expand Down Expand Up @@ -34,27 +35,35 @@ class EthLogsTracker {

class Observer : public base::CheckedObserver {
public:
virtual void OnLogsReceived(base::Value rawlogs) = 0;
virtual void OnLogsReceived(const std::string& subscription,
base::Value rawlogs) = 0;
};

// If timer is already running, it will be replaced with new interval
void Start(base::TimeDelta interval);
void Stop();
bool IsRunning() const;

void AddSubscriber(const std::string& subscription_id,
const base::Value& params);
void RemoveSubscriber(const std::string& subscription_id);

void AddObserver(Observer* observer);
void RemoveObserver(Observer* observer);

private:
void GetLogs();
void OnGetLogs(const std::vector<Log>& logs,
void OnGetLogs(const std::string& subscription,
const std::vector<Log>& logs,
base::Value rawlogs,
mojom::ProviderError error,
const std::string& error_message);

base::RepeatingTimer timer_;
raw_ptr<JsonRpcService> json_rpc_service_ = nullptr;

std::map<std::string, base::Value> eth_logs_subscription_info_;

base::ObserverList<Observer> observers_;

base::WeakPtrFactory<EthLogsTracker> weak_factory_{this};
Expand Down
65 changes: 54 additions & 11 deletions components/brave_wallet/browser/ethereum_provider_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ void EthereumProviderImpl::RecoverAddress(const std::string& message,
false);
}

void EthereumProviderImpl::EthSubscribe(const std::string& event_type,
void EthereumProviderImpl::EthSubscribe(const base::Value& params,
RequestCallback callback,
base::Value id) {
const auto generateHexBytes = [](std::vector<std::string>& subscriptions) {
Expand All @@ -589,19 +589,61 @@ void EthereumProviderImpl::EthSubscribe(const std::string& event_type,
return std::tuple<bool, std::string>{subscriptions.size() == 1, hex_bytes};
};

if (event_type == kEthSubscribeNewHeads) {
const auto is_event_type = [&](const std::string& eq_val) {
if (!params.is_list()) {
return false;
}

const auto& list = params.GetList();
auto it = std::find_if(list.begin(), list.end(), [&](const auto& item) {
return item.is_string() && eq_val == item.GetString();
});

if (it != list.end()) {
return true;
}

return false;
};

if (is_event_type(kEthSubscribeNewHeads)) {
const auto gen_res = generateHexBytes(eth_subscriptions_);
if (std::get<0>(gen_res)) {
eth_block_tracker_.Start(
base::Seconds(kBlockTrackerDefaultTimeInSeconds));
}
std::move(callback).Run(std::move(id), base::Value(std::get<1>(gen_res)),
false, "", false);
} else if (event_type == kEthSubscribeLogs) {
} else if (is_event_type(kEthSubscribeLogs)) {
const auto gen_res = generateHexBytes(eth_log_subscriptions_);

const auto get_filter_options = [&]() {
if (!params.is_list()) {
return base::Value();
}

const auto& list = params.GetList();
auto it = std::find_if(list.begin(), list.end(), [](const auto& item) {
return item.is_dict() && (nullptr != item.GetDict().Find("address") ||
nullptr != item.GetDict().Find("topics") ||
nullptr != item.GetDict().Find("fromBlock") ||
nullptr != item.GetDict().Find("toBlock") ||
nullptr != item.GetDict().Find("blockHash"));
});
if (it == list.end()) {
return base::Value();
}

return base::Value(it->Clone());
};

if (std::get<0>(gen_res)) {
eth_logs_tracker_.Start(base::Seconds(kLogTrackerDefaultTimeInSeconds));
}

eth_logs_tracker_.AddSubscriber(std::get<1>(gen_res),
std::move(get_filter_options()));

std::move(callback).Run(std::move(id), base::Value(std::get<1>(gen_res)),
false, "", false);
} else {
Expand Down Expand Up @@ -641,9 +683,10 @@ bool EthereumProviderImpl::UnsubscribeBlockObserver(
bool EthereumProviderImpl::UnsubscribeLogObserver(
const std::string& subscription_id) {
if (base::Erase(eth_log_subscriptions_, subscription_id)) {
if (eth_log_subscriptions_.empty())
eth_logs_tracker_.RemoveSubscriber(subscription_id);
if (eth_log_subscriptions_.empty()) {
eth_logs_tracker_.Stop();

}
return true;
}
return false;
Expand Down Expand Up @@ -1236,13 +1279,13 @@ void EthereumProviderImpl::CommonRequestOrSendAsync(base::ValueView input_value,
} else if (method == kWeb3ClientVersion) {
Web3ClientVersion(std::move(callback), std::move(id));
} else if (method == kEthSubscribe) {
std::string event_type;
if (!ParseEthSubscribeParams(normalized_json_request, &event_type)) {
base::Value params;
if (!ParseEthSubscribeParams(normalized_json_request, &params)) {
SendErrorOnRequest(error, error_message, std::move(callback),
std::move(id));
return;
}
EthSubscribe(event_type, std::move(callback), std::move(id));
EthSubscribe(std::move(params), std::move(callback), std::move(id));
} else if (method == kEthUnsubscribe) {
std::string subscription_id;
if (!ParseEthUnsubscribeParams(normalized_json_request, &subscription_id)) {
Expand Down Expand Up @@ -1705,7 +1748,8 @@ void EthereumProviderImpl::OnGetBlockByNumber(

void EthereumProviderImpl::OnNewBlock(uint256_t block_num) {}

void EthereumProviderImpl::OnLogsReceived(base::Value rawlogs) {
void EthereumProviderImpl::OnLogsReceived(const std::string& subscription,
base::Value rawlogs) {
if (!rawlogs.is_dict() || !events_listener_.is_bound()) {
return;
}
Expand All @@ -1718,8 +1762,7 @@ void EthereumProviderImpl::OnLogsReceived(base::Value rawlogs) {
}

for (auto& results_item : *results) {
for (const auto& subscription_id : eth_log_subscriptions_)
events_listener_->MessageEvent(subscription_id, results_item.Clone());
events_listener_->MessageEvent(subscription, results_item.Clone());
}
}

Expand Down
7 changes: 5 additions & 2 deletions components/brave_wallet/browser/ethereum_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class EthereumProviderImpl final
RequestCallback callback,
base::Value id);

void EthSubscribe(const std::string& event_type,
void EthSubscribe(const base::Value& params,
RequestCallback callback,
base::Value id);
void EthUnsubscribe(const std::string& subscription_id,
Expand Down Expand Up @@ -173,6 +173,8 @@ class EthereumProviderImpl final
RequestEthereumPermissionsWithAccounts);
FRIEND_TEST_ALL_PREFIXES(EthereumProviderImplUnitTest, EthSubscribe);
FRIEND_TEST_ALL_PREFIXES(EthereumProviderImplUnitTest, EthSubscribeLogs);
FRIEND_TEST_ALL_PREFIXES(EthereumProviderImplUnitTest,
EthSubscribeLogsFiltered);
friend class EthereumProviderImplUnitTest;

// mojom::BraveWalletProvider:
Expand Down Expand Up @@ -378,7 +380,8 @@ class EthereumProviderImpl final
bool UnsubscribeBlockObserver(const std::string& subscription_id);

// EthLogsTracker::Observer:
void OnLogsReceived(base::Value rawlogs) override;
void OnLogsReceived(const std::string& subscription,
base::Value rawlogs) override;
bool UnsubscribeLogObserver(const std::string& subscription_id);

raw_ptr<HostContentSettingsMap> host_content_settings_map_ = nullptr;
Expand Down
Loading

0 comments on commit df4e7d7

Please sign in to comment.