feat: TAP Agent #83
Great work! I still need to digest the TAP details; would you mind sharing the flow diagram somewhere? 🙏
Should there be a ...tap_receipts.down.sql migration?
Yes, it's there, I just didn't change the version from main: https://github.com/graphops/indexer-service-rs/blob/407c17d3ecf766c8e37a2b7e4b9b1132136c4a33/migrations/20230912220523_tap_receipts.down.sql
Also, I modified the migrations in place instead of creating new ones. I thought at this stage of development we don't need to care about migrating the database. (tell me if that's the wrong thinking though)
tap_agent/src/agent.rs
Outdated
SubgraphClient::new(
    config
        .network_subgraph
        .network_subgraph_deployment
This seems complicated for making a subgraph client. Maybe we can later on make an easier creation helper?
It's the new API introduced in #81
I agree it looks complicated. Perhaps the DeploymentDetails I added into the API is making it a little verbose. Might be worth cleaning up a little at some point, but doesn't feel urgent.
tap_agent/src/config.rs
Outdated
)]
pub rav_request_timestamp_buffer_ns: u64,

// TODO: Remove this whenever the gateway registry is ready
How do we expect indexers to have this file? Do senders register or host a discoverable aggregator endpoint somewhere? Would you please provide an example file I could run the service with?
It will be in the form of a smart contract. That's the current plan/suggestion I got from @Theodus. But no work has started on this yet, I believe.
PS: the file is just to be able to get going for testing, basically.
Ah, ok, if you got this from @Theodus, then that answers my earlier question about which contract we're talking about - there's just one. 😄
I think it's fine to keep this file as a temporary approach.
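For illustration, here is a rough sketch of what such a temporary file and its loader could look like, assuming a plain YAML mapping of sender address to aggregator URL; the actual format and the load_aggregator_endpoints helper in this PR may well differ.

// Hypothetical sketch of the temporary sender -> aggregator endpoints file.
// Assumes YAML contents like:
//
//   "0x1234...abcd": "https://tap-aggregator.example.com"
//
// The real file format used by tap-agent may differ.
use std::collections::HashMap;
use std::fs;

fn load_aggregator_endpoints(path: &str) -> anyhow::Result<HashMap<String, String>> {
    let contents = fs::read_to_string(path)?;
    let endpoints: HashMap<String, String> = serde_yaml::from_str(&contents)?;
    Ok(endpoints)
}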
tap_agent/src/config.rs
Outdated
    as Receipt Aggregate Voucher (RAV) requests for an indexer."
)]
#[command(author, version, about, long_about = None, arg_required_else_help = true)]
pub struct Cli {
Some of these are duplicates of the indexer-service CLI; would it make sense to refactor the duplicates to indexer-common?
Yup! Not sure there is an elegant way to do that though.
I'd suggest leaving these duplicates in for the moment. One thing I'm doing with the indexer-service framework is to have a common config file structure (TOML) and allow indexer service implementations to add their own structure to that (within the same TOML file).
But indexer-service and indexer-agent and tap-agent are all their own processes and I think it's better to keep their interfaces independent.
That said, from now on I'll advocate using as few CLI arguments as possible and moving everything to a config file. It is more secure (env vars are really bad for passing in private keys, mnemonics, passwords) and also easier to manage in systems like terraform, kubernetes etc.
Actually in k8s, the preferred way of passing secrets is using env vars. Unless you are willing to integrate your software directly with the k8s or your cloud's secrets API.
My favorite config implementation is that of graph-node. It uses TOML, but also does something like envsubst over the file before loading it.
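A minimal sketch of that pattern (not graph-node's actual implementation, just the general shape): read the TOML file, substitute ${VAR} placeholders from the environment, then parse.

use std::env;
use std::fs;

// Load a TOML config file, expanding ${VAR_NAME} placeholders from the
// environment before parsing. Sketch only; graph-node's real loader differs.
fn load_config_with_env(path: &str) -> anyhow::Result<toml::Value> {
    let mut raw = fs::read_to_string(path)?;
    for (key, value) in env::vars() {
        raw = raw.replace(&format!("${{{}}}", key), &value);
    }
    Ok(toml::from_str(&raw)?)
}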
https://www.cloudtruth.com/blog/the-pitfalls-of-using-environment-variables-for-config-and-secrets suggests that environment variables for secrets is insecure.
https://www.cloudtruth.com/blog/the-pitfalls-of-using-environment-variables-for-config-and-secrets suggests that environment variables for secrets is insecure.
I'm not sure this really applies to containers though. This is still one of the recommended ways in k8s for ex: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
Working on it!
Made it 30% through. Will keep going but wanted to drop my comments so far. 😉
tap_agent/src/agent.rs
Outdated
// TODO: replace with a proper implementation once the gateway registry contract is ready
let sender_aggregator_endpoints = aggregator_endpoints::load_aggregator_endpoints(
    config.tap.sender_aggregator_endpoints_file.clone(),
);
Once gateways register somewhere (they will do this for subscriptions — perhaps that's the contract you're referring to here, or is there another one for TAP?), this could make for a nice Eventual in the indexer service. Add the TAP agent gRPC API, pipe the gateways eventual into an eventual that polls the "need a RAV?" status for each gateway periodically and whenever the gateways change, pipe that into code that requests the RAVs and stores them back in the DB. Done? 😁
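For what it's worth, here is the rough shape of that pipeline written with plain tokio rather than Eventuals; Gateway, fetch_gateways, needs_rav and request_and_store_rav are hypothetical placeholders, not APIs from this PR.

use std::time::Duration;

struct Gateway; // placeholder: sender address, aggregator endpoint, ...

async fn fetch_gateways() -> anyhow::Result<Vec<Gateway>> { Ok(vec![]) }
async fn needs_rav(_gateway: &Gateway) -> anyhow::Result<bool> { Ok(false) }
async fn request_and_store_rav(_gateway: &Gateway) -> anyhow::Result<()> { Ok(()) }

async fn rav_poll_loop() -> anyhow::Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        // Periodically re-fetch the registered gateways and ask each one
        // whether a RAV is needed; request and store it if so.
        ticker.tick().await;
        for gateway in fetch_gateways().await? {
            if needs_rav(&gateway).await? {
                request_and_store_rav(&gateway).await?;
            }
        }
    }
}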
Sounds good!
Saved your comment in the appropriate issue: #84 (comment)
tap_agent/src/tap/account.rs
Outdated
pub async fn start_last_rav_request(&self) {
    *(self.inner.state.lock().await) = State::LastRavPending;
    let mut rav_requester_task = self.rav_requester_task.lock().await;
    if !Self::rav_requester_task_is_running(&rav_requester_task) {
        *rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone())));
    }
}
Not a reason to rework this PR, but it would be nice to avoid the state machine and all the locking happening in Account. I'd need to think about how for a bit, though; not sure right now.
Yup, it's complex enough that we could have a risk of an unforeseen deadlock.
That was the best I could come up with to make sure that:
- Only one RAV request happens at a time
- The RAV request is async from all the rest
- I can set the "last rav" state asynchronously
- The RAV request can clear the state on its own (while living in its own async bubble)
Sorry, I guess I default back to Electrical Engineer mode in situations like this 😁
An mpsc::channel with a size of 1 could perhaps work to ensure there's only one RAV request processed and it's processed somewhere else. Buuuuuut, I'm not sure. I don't think it works, as reading the next RAV request from the receiver is enough to allow the next one in. And you want more coordination to happen there. I'll think about it some more, but this can be improved later if needed.
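For the record, one common shape for this is a capacity-1 trigger channel feeding a single worker task: requests stay strictly serial because only the worker drains the channel, and extra triggers that arrive while a request is in flight are coalesced. do_rav_request is a placeholder, and this sketch does not cover the "last RAV" state handling discussed above.

use tokio::sync::mpsc;

async fn do_rav_request() -> anyhow::Result<()> { Ok(()) } // placeholder

fn spawn_rav_worker() -> mpsc::Sender<()> {
    let (tx, mut rx) = mpsc::channel::<()>(1);
    tokio::spawn(async move {
        // Only this task performs RAV requests, so they can never overlap.
        while rx.recv().await.is_some() {
            if let Err(e) = do_rav_request().await {
                eprintln!("RAV request failed: {e}");
            }
        }
    });
    tx
}

// Callers fire-and-forget; if the channel is full, a request is already
// pending and the extra trigger is simply dropped:
// let _ = trigger.try_send(());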
tap_agent/src/tap/account.rs
Outdated
    AND sender_address = $2
)
SELECT
    COUNT(*),
How many receipts do we think this would count over at most? count(*) and the other aggregations are pretty slow too; I'd like to get an idea of how slow this query could get.
I estimate that for a regular indexer it should rarely be above ~1000s of receipts, as only the unaggregated receipts should be in there.
Also, I believe things could be optimized such that this query would happen only once at TAP Agent start.
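To illustrate that optimization: the aggregate query would run once at startup, and afterwards the totals would be maintained in memory from the new-receipt notifications. A rough sketch, with illustrative field names:

#[derive(Default)]
struct UnaggregatedFees {
    last_id: u64,
    value: u128,
}

impl UnaggregatedFees {
    // Fold a newly notified receipt into the in-memory total, skipping
    // receipts already covered by the startup query.
    fn apply_new_receipt(&mut self, receipt_id: u64, receipt_value: u128) {
        if receipt_id > self.last_id {
            self.last_id = receipt_id;
            self.value += receipt_value;
        }
    }
}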
tap_agent/src/tap/account.rs
Outdated
// `COUNT(*)` will always return a value, so we don't need to check for `None`.
match res.count.unwrap() {
I actually think something like the below might be preferable to querying count(*) only to avoid the check for None:
unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?;
unaggregated_fees.value = res.sum.unwrap_or(0).to_string().parse::<u128>()?;
The actual reason I wanted the count was to run those checks:
indexer-rs/tap_agent/src/tap/account.rs
Lines 263 to 270 in f7447f7
ensure!(
    res.max.is_some(),
    "MAX(id) is null but the receipt COUNT(*) is not zero"
);
ensure!(
    res.sum.is_some(),
    "SUM(value) is null, but the receipt COUNT(*) is not zero"
);
That's indeed a bit overzealous, and a check like this should be more than enough:
ensure!(
res.sum.is_none() == res.max.is_none(),
"Exactly one of SUM(value) and MAX(id) is null. This should not happen."
);
Fixed in 8515c03
Nice! (And good point about the checks, I missed that in my oversimplified suggestion.)
tap_agent/src/tap/account.rs
Outdated
}

async fn rav_requester_try(inner: &Arc<Inner>) -> anyhow::Result<()> {
    loop {
Related to this: do we make sure that we don't request multiple RAVs for the same receipts in parallel if a new receipt comes in before the RAV comes back and we still have unaggregated fees > threshold?
- We do make sure that we don't do RAV requests in parallel for any given (sender, allocation), by checking Self::rav_requester_task_is_running before spawning a RAV request task. Could be made more elegant, I think.
- Each RAV request must contain the last RAV (unless it is the first RAV request). We basically aggregate the receipts into the latest RAV we got, and we get a new RAV out of that. That's why it is very important that the whole RAV request dance is serial.
- The TapManager will only aggregate receipts that have a timestamp more recent than the last RAV (though note that we are OK with multiple receipts having the same timestamp, since they also have a nonce). To make sure that there is no confusion with new receipts coming in while we're requesting a RAV, the TapManager will only aggregate receipts whose timestamps are more than timestamp_buffer old: https://github.com/semiotic-ai/timeline-aggregation-protocol/blob/656fc15700f6d29299d78f9c790539cbddb68d02/tap_core/src/tap_manager/manager.rs#L179

One issue I can see here is that we are not checking (in indexer-service) that new receipts have a reasonably accurate timestamp (let's say within 5 seconds of our own clock). So as it is right now it is not really safe :/ (a minimal sketch of such a check follows below).
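A minimal sketch of that missing check: reject receipts whose timestamp is more than ~5 seconds away from our own clock, assuming nanosecond receipt timestamps and illustrative names.

use std::time::{SystemTime, UNIX_EPOCH};

const MAX_RECEIPT_CLOCK_SKEW_NS: u64 = 5_000_000_000; // 5 seconds

fn check_receipt_timestamp(receipt_timestamp_ns: u64) -> anyhow::Result<()> {
    let now_ns = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos() as u64;
    let skew_ns = now_ns.abs_diff(receipt_timestamp_ns);
    anyhow::ensure!(
        skew_ns <= MAX_RECEIPT_CLOCK_SKEW_NS,
        "receipt timestamp is {skew_ns} ns away from the local clock"
    );
    Ok(())
}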
Would it be useful to track locking this down better via an issue?
Yup, created an issue: #92
tap_agent/src/tap/account.rs
Outdated
// TODO: Request compression and response decompression. Also a fancy user agent?
let client = HttpClientBuilder::default().build(&inner.sender_aggregator_endpoint)?;
Curious: say you receive 5,000 receipts/s from a gateway, each for a value of $0.0001, and your threshold is $100. 5,000 receipts/s make for $0.5/s, so you'd hit the threshold after 200 seconds, at which point you'll have 1M receipts to aggregate.
My questions would be:
- How long will the SQL query to calculate the sum of the 1M unaggregated receipts take roughly?
- How many MBs of payload is this to send to the gateway?
- How long will it take the gateway to aggregate 1M receipts?
We set some standards here: https://github.com/semiotic-ai/timeline-aggregation-protocol/blob/main/tap_aggregator/README.md
We recommend a max of 15,000 receipts / RAV request.
So here are the missing bits currently:
- In tap_core's Manager, we should be able to specify a maximum number of receipts per RAV request
- In the TAP agent, we should trigger RAV requests not only on a value threshold, but also on the number of unaggregated receipts (< 15,000); see the small sketch below
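A small sketch of what that combined trigger could look like (names and the placement of the 15,000 limit are illustrative):

const MAX_RECEIPTS_PER_RAV_REQUEST: u64 = 15_000;

// Request a RAV when either the unaggregated value crosses the indexer's
// configured trigger or the receipt count approaches the aggregator limit.
fn should_request_rav(unaggregated_value: u128, receipt_count: u64, value_trigger: u128) -> bool {
    unaggregated_value >= value_trigger || receipt_count >= MAX_RECEIPTS_PER_RAV_REQUEST
}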
Sounds like it would be good to have an issue for this. 😄
Let's say we had 500 indexers all receiving 5,000 receipts/s from a single gateway (the numbers are higher than we'd see them right now, but one day...), this gateway would have 167 RAVs to generate for 15,000 receipts each, per second. Would it be able to keep up? 🤔
Created issue #96
Was `scalar_tap_latest_ravs` everywhere except in the down migration. Opted for the shorter of the 2 everywhere. Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
Instead of underscores. Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
In particular, simplify handling of NULL results from the DB. Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
Rebased onto latest main (at least so that the tests don't fail).
The overall structure makes sense to me. Would you be able to provide a list (maybe in docs or something) that tracks todo/unimplemented items so we can have a better view of the progress? I think we should also provide a link to the docs describing the lifetime of a receipt/RAV.
type AdapterError = AdapterError;

/// We don't need this method in TAP Agent.
async fn store_receipt(&self, _receipt: ReceivedReceipt) -> Result<u64, Self::AdapterError> {
What component calls this function and update_receipt_by_id?
It's the TapManager from tap_core: https://github.com/semiotic-ai/timeline-aggregation-protocol/blob/3d247c06380038d0a871a7427ce00e0406909160/tap_core/src/tap_manager/manager.rs#L80-L111.
But we don't need those here, since the storage of receipts is done only in service, and in a much simplified way, so it doesn't even leverage the TapManager at all.
tap_agent/src/tap/account.rs
Outdated
let updated_rows = sqlx::query!(
    r#"
        UPDATE scalar_tap_latest_ravs
        SET is_last = true
I wonder if we should call this "closing RAV" or the "final RAV" or simply "is_last_for_allocation_and_sender" to be very explicit. Something like that. "last" can have so many meanings: the last received, the last in some other ordering (I first wanted to comment that we can find the last easily and quickly via the timestamp, but then I realized last has a different meaning here), the last for the allocation/sender...
Fixed in 1178c63
tap_agent/src/tap/account.rs
Outdated
unaggregated_fees.last_id = new_receipt_notification.id;

let mut rav_requester_task = self.rav_requester_task.lock().await;
// TODO: consider making the trigger per sender, instead of per (sender, allocation).
Potentially. Now that you talk about it like it's a scheduling algorithm, I'm thinking we should perhaps merge the logic as is and play with different "algorithms" using realistic amounts of queries, receipts and thresholds. But that's more about optimizing RAV requests in the face of millons of receipts than it is about how an indexer would define a threshold. Hm...
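To make the per-sender variant concrete, the trigger would be evaluated over the sum of unaggregated fees across all of a sender's allocations rather than per (sender, allocation); a rough sketch with hypothetical types:

use std::collections::HashMap;

// Keyed by allocation ID (hypothetical representation).
type UnaggregatedFeesByAllocation = HashMap<String, u128>;

fn sender_needs_rav(fees: &UnaggregatedFeesByAllocation, trigger_value: u128) -> bool {
    // Compare the sender-wide total against a single threshold.
    let total: u128 = fees.values().sum();
    total >= trigger_value
}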
For the progress, I created a milestone here: https://github.com/graphprotocol/indexer-rs/milestone/1. I'm going to add all the todos there.
This is good to go from my point of view. Thanks for addressing the feedback so quickly!
Thanks for taking the time to go through the big PR! 🙏
Sorry, it's a big one. Perhaps we could set up a call to discuss this PR.
The TAP Agent is a new daemon that indexers will have to run alongside the indexer-service instances. It:
What it should, but does not do yet:
- TODO in the code...
I'm not particularly proud of the overall design, and it also shone some light onto design shortcomings in tap_core that we did not foresee while designing it "in a vacuum". That's why there is some weirdness in the TAP adapters implementation, such as https://github.com/graphops/indexer-service-rs/blob/20e48b1191a8d39f02e32200ef75fc065470cdd6/tap_agent/src/tap/receipt_checks_adapter.rs#L64-L76.
My goal here is to get something through the door so that we can continue improving this incrementally together. I propose that we try moving as many review issues as possible into actual issues that we tackle later.