Skip to content

Commit

Permalink
fix!: Only load schemas for allocated subgraphs
Browse files Browse the repository at this point in the history
BREAKING CHANGE: autoagora-processor needs to communicate with the
indexer-agent API (--indexer-agent-mgmt-endpoint) instead of
graph-node's PG DB (--graph-postgres-... args are gone).

Fixes #33

Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
  • Loading branch information
aasseman committed Oct 30, 2023
1 parent 7ebc4b1 commit 2a24173
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 115 deletions.
82 changes: 43 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,61 +35,65 @@ Because it is a stateless service, and only consumes from a RabbitMQ queue and a
Autoagora Processor can safely be dynamically scaled horizontally. Note however that it has been observed processing
upwards of 50 queries per second on a single CPU core.

Note that it also needs to connect to the `graph-node`'s database to determine the currently indexed subgraphs, and then
pull their schemas using the `graph-node` query endpoint.
Note that it also needs to connect to the `indexer-agent`'s management API to determine the indexer's current
allocations, and then pull their schemas using the `graph-node` query endpoint.

Configuration:

```txt
usage: autoagora-processor [-h] --graph-postgres-host GRAPH_POSTGRES_HOST --graph-postgres-database
GRAPH_POSTGRES_DATABASE --graph-postgres-username GRAPH_POSTGRES_USERNAME
--graph-postgres-password GRAPH_POSTGRES_PASSWORD --graph-node-query-endpoint
GRAPH_NODE_QUERY_ENDPOINT --rabbitmq-host RABBITMQ_HOST
[--rabbitmq-queue-name RABBITMQ_QUEUE_NAME] [--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT]
[--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME] [--rabbitmq-username RABBITMQ_USERNAME]
[--rabbitmq-password RABBITMQ_PASSWORD] --postgres-host POSTGRES_HOST --postgres-database
POSTGRES_DATABASE --postgres-username POSTGRES_USERNAME --postgres-password
POSTGRES_PASSWORD [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
usage: autoagora-processor [-h] [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}] --rabbitmq-host
RABBITMQ_HOST [--rabbitmq-queue-name RABBITMQ_QUEUE_NAME]
[--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT]
[--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME]
[--rabbitmq-username RABBITMQ_USERNAME]
[--rabbitmq-password RABBITMQ_PASSWORD] --postgres-host POSTGRES_HOST
--postgres-database POSTGRES_DATABASE --postgres-username
POSTGRES_USERNAME --postgres-password POSTGRES_PASSWORD
[--postgres-port POSTGRES_PORT] --graph-node-query-endpoint
GRAPH_NODE_QUERY_ENDPOINT --indexer-agent-mgmt-endpoint
INDEXER_AGENT_MGMT_ENDPOINT
options:
-h, --help show this help message and exit
--graph-postgres-host GRAPH_POSTGRES_HOST
URL of the postgres instance use by the graph-nodes. [env var: GRAPH_POSTGRES_HOST] (default:
None)
--graph-postgres-database GRAPH_POSTGRES_DATABASE
Name of the graph-node database. [env var: GRAPH_POSTGRES_DATABASE] (default: None)
--graph-postgres-username GRAPH_POSTGRES_USERNAME
Username for the graph-node databse. [env var: GRAPH_POSTGRES_USERNAME] (default: None)
--graph-postgres-password GRAPH_POSTGRES_PASSWORD
Password for the graph-node database. [env var: GRAPH_POSTGRES_PASSWORD] (default: None)
--graph-node-query-endpoint GRAPH_NODE_QUERY_ENDPOINT
URL of the indexer's graph-node GraphQL query endpoint. [env var: GRAPH_NODE_QUERY_ENDPOINT]
(default: None)
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
[env var: LOG_LEVEL] (default: WARNING)
--rabbitmq-host RABBITMQ_HOST
Hostname of the RabbitMQ server used for queuing the GQL logs. [env var: RABBITMQ_HOST]
(default: None)
Hostname of the RabbitMQ server used for queuing the GQL logs. [env var:
RABBITMQ_HOST] (default: None)
--rabbitmq-queue-name RABBITMQ_QUEUE_NAME
Name of the RabbitMQ queue to pull query-node logs from. [env var: RABBITMQ_QUEUE_NAME]
(default: gql_logs_processor)
Name of the RabbitMQ queue to pull query-node logs from. [env var:
RABBITMQ_QUEUE_NAME] (default: gql_logs_processor)
--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT
Size limit of the created RabbitMQ queue. It is discouraged to change that value while the
system is running, because it requires manual destruction of the queue and a restart of the
whole Auto Agora stack. [env var: RABBITMQ_QUEUE_LIMIT] (default: 1000)
Size limit of the created RabbitMQ queue. It is discouraged to change that
value while the system is running, because it requires manual destruction
of the queue and a restart of the whole Auto Agora stack. [env var:
RABBITMQ_QUEUE_LIMIT] (default: 1000)
--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME
Name of the RabbitMQ exchange query-node logs are pushed to. [env var: RABBITMQ_EXCHANGE_NAME]
(default: gql_logs)
Name of the RabbitMQ exchange query-node logs are pushed to. [env var:
RABBITMQ_EXCHANGE_NAME] (default: gql_logs)
--rabbitmq-username RABBITMQ_USERNAME
Username to use for the GQL logs RabbitMQ queue. [env var: RABBITMQ_USERNAME] (default: guest)
Username to use for the GQL logs RabbitMQ queue. [env var:
RABBITMQ_USERNAME] (default: guest)
--rabbitmq-password RABBITMQ_PASSWORD
Password to use for the GQL logs RabbitMQ queue. [env var: RABBITMQ_PASSWORD] (default: guest)
Password to use for the GQL logs RabbitMQ queue. [env var:
RABBITMQ_PASSWORD] (default: guest)
--postgres-host POSTGRES_HOST
Host of the postgres instance storing the logs. [env var: POSTGRES_HOST] (default: None)
Host of the postgres instance storing the logs. [env var: POSTGRES_HOST]
(default: None)
--postgres-database POSTGRES_DATABASE
Name of the logs database. [env var: POSTGRES_DATABASE] (default: None)
--postgres-username POSTGRES_USERNAME
Username for the logs databse. [env var: POSTGRES_USERNAME] (default: None)
Username for the logs database. [env var: POSTGRES_USERNAME] (default:
None)
--postgres-password POSTGRES_PASSWORD
Password for the logs database. [env var: POSTGRES_PASSWORD] (default: None)
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
[env var: LOG_LEVEL] (default: WARNING)
Password for the logs database. [env var: POSTGRES_PASSWORD] (default:
None)
--postgres-port POSTGRES_PORT
Port for the logs database. [env var: POSTGRES_PORT] (default: 5432)
--graph-node-query-endpoint GRAPH_NODE_QUERY_ENDPOINT
URL of the indexer's graph-node GraphQL query endpoint. [env var:
GRAPH_NODE_QUERY_ENDPOINT] (default: None)
--indexer-agent-mgmt-endpoint INDEXER_AGENT_MGMT_ENDPOINT
URL to the indexer-agent management GraphQL endpoint. [env var:
INDEXER_AGENT_MGMT_ENDPOINT] (default: None)
```
33 changes: 4 additions & 29 deletions autoagora_processor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,38 +130,13 @@ def init_config():
help="URL of the indexer's graph-node GraphQL query endpoint.",
)
#
# Indexer Agent API endpoint
#
#
argsparser.add_argument(
"--graph-postgres-host",
env_var="GRAPH_POSTGRES_HOST",
required=True,
help="URL of the postgres instance use by the graph-nodes.",
)
argsparser.add_argument(
"--graph-postgres-database",
env_var="GRAPH_POSTGRES_DATABASE",
required=True,
help="Name of the graph-node database.",
)
argsparser.add_argument(
"--graph-postgres-username",
env_var="GRAPH_POSTGRES_USERNAME",
"--indexer-agent-mgmt-endpoint",
env_var="INDEXER_AGENT_MGMT_ENDPOINT",
required=True,
help="Username for the graph-node databse.",
)
argsparser.add_argument(
"--graph-postgres-password",
env_var="GRAPH_POSTGRES_PASSWORD",
required=True,
help="Password for the graph-node database.",
)
argsparser.add_argument(
"--graph-postgres-port",
env_var="GRAPH_POSTGRES_PORT",
default=5432,
required=False,
help="Port for the graph-node database.",
help="URL to the indexer-agent management GraphQL endpoint.",
)
argsparser.parse_args(namespace=args)

Expand Down
42 changes: 0 additions & 42 deletions autoagora_processor/db_utils.py

This file was deleted.

50 changes: 50 additions & 0 deletions autoagora_processor/indexer_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2023-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Mapping, Optional, Set

import backoff
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from requests import RequestException

from autoagora_processor.config import args


@backoff.on_exception(backoff.expo, RequestException, max_time=30, logger=logging.root)
def query_indexer_agent(query: str, variables: Optional[Mapping] = None):
client = Client(
transport=RequestsHTTPTransport(args.indexer_agent_mgmt_endpoint),
fetch_schema_from_transport=False,
)
result = client.execute(gql(query), variable_values=variables) # type: ignore
return result


def get_network_allocated_subgraphs(network: str) -> Set[str]:
"""Get the indexer's subgraph allocations for the given Graph network."""

result = query_indexer_agent(
"""
query ($protocolNetwork: String!) {
indexerAllocations (protocolNetwork: $protocolNetwork) {
subgraphDeployment
}
}
""",
variables={
"protocolNetwork": network,
},
)

return set(e["subgraphDeployment"] for e in result["indexerAllocations"])


def get_allocated_subgraphs() -> Set[str]:
"""Get the indexer's subgraph allocations for all Graph networks."""

networks = ("mainnet", "arbitrum-one")
results = map(get_network_allocated_subgraphs, networks)

return set.union(*results)
1 change: 0 additions & 1 deletion autoagora_processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from hashlib import blake2b
from typing import Any, List, Mapping, Tuple, Union

import configargparse
import pika
from graphql import GraphQLSchema, parse
from graphql.utilities import strip_ignored_characters as gql_strip_ignored_characters
Expand Down
7 changes: 3 additions & 4 deletions autoagora_processor/subgraph_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from collections.abc import Mapping
from typing import Dict, Iterator, Optional

import configargparse
from gql import Client, gql
from gql.transport.exceptions import TransportQueryError
from gql.transport.requests import RequestsHTTPTransport
Expand All @@ -17,7 +16,7 @@
)

from autoagora_processor.config import args
from autoagora_processor.db_utils import get_indexed_subgraphs
from autoagora_processor.indexer_api import get_allocated_subgraphs


class SubgraphSchemas(Mapping):
Expand All @@ -26,7 +25,7 @@ def __init__(self) -> None:

self.schemas: Dict[str, Optional[GraphQLSchema]] = {
subgraph: self.get_schema_from_graph_node(subgraph)
for subgraph in get_indexed_subgraphs()
for subgraph in get_allocated_subgraphs()
}
logging.info("Added schemas for %s subgraphs", len(self.schemas))

Expand Down Expand Up @@ -70,7 +69,7 @@ def get_schema_from_graph_node(subgraph_ipfs_hash: str) -> Optional[GraphQLSchem

def __getitem__(self, subgraph_ipfs_hash: str) -> Optional[GraphQLSchema]:
if subgraph_ipfs_hash not in self.schemas.keys():
assert subgraph_ipfs_hash in get_indexed_subgraphs()
assert subgraph_ipfs_hash in get_allocated_subgraphs()
new_schema = self.get_schema_from_graph_node(subgraph_ipfs_hash)
self.schemas[subgraph_ipfs_hash] = new_schema
if new_schema:
Expand Down

0 comments on commit 2a24173

Please sign in to comment.