Skip to content

Commit

Permalink
docs: [skip ci] code annotations and doc updates (#421)
Browse files Browse the repository at this point in the history
* docs(hydra-indexer): update hydra-indexer.md

* docs(hydra-processor): update env variables

* docs(hydra-processor): annotate processor classes (WIP)

affects: @dzlzv/hydra-processor
  • Loading branch information
dzhelezov authored Jun 16, 2021
1 parent 401fdec commit 65b37a4
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 22 deletions.
18 changes: 15 additions & 3 deletions hydra-indexer.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ The indexer is set up using the following environment variables
| DB\_PASS | - | **Yes** | Database password |
| DB\_LOGGING | `error` | No | TypeORM logging level |
| TYPES\_JSON | - | No | Path to a JSON type definition with custom Substrate types |
| SPEC\_TYPES | - | No | Path to JSON with spec-level type definitions |
| CHAIN\_TYPES | - | No | Path to JSON with chain-level type definitions |
| BUNDLE\_TYPES | - | No | Path to JSON with bundle type definitions |
| BLOCK\_HEIGHT | 0 | No | Block height to start indexing. Ignored if the database already contains indexed blocks |

### Manual setup

Prerequisites:

* Postgres 12 instance accepting connections
* Redis instance accepting connections

Run

```bash
Expand All @@ -58,6 +66,8 @@ Then run the indexer \(make sure that all the required environment variables are
docker run -e ... joystream/hydra-indexer
```

Examples for Joystream, Subsocial and node-template networks with complete docker-compose stacks are available in `examples/indexers`.

## Advanced environment variables

Some optional environment variables are available for fine-tuning.
Expand All @@ -67,7 +77,9 @@ Some optional environment variables are available for fine-tuning.
| BLOCK\_CACHE\_TTL\_SEC | `60*60` | TTL for processed blocks in the Redis cache |
| INDEXER\_HEAD\_TTL\_SEC | `60*15` | TTL for the indexer head block entry |
| WORKERS\_NUMBER | 5 | Number of concurrent workers fetching the blocks |
| BLOCK\_PRODUCER\_FETCH\_RETRIES | 3 | Number of attempts fetching each a block before throwing an error. Set to `-1` for indefinite attempts |
| BLOCK\_PRODUCER\_FETCH\_RETRIES | 3 | Number of attempts fetching a block before throwing an error. Set to `-1` for infinite retries |
| SUBSTRATE\_API\_TIMEOUT | `1000 * 60 * 5` | Timeout in \(milliseconds\) for API calls |
| NEW\_BLOCK\_TIMEOUT\_MS | `60 * 10 * 1000` | Panic if no blockchain blocks have been received within this time |

| SUBSTRATE\_API\_CALL\_RETRIES | `5` | Number of times an API call is retried before giving up and throwing and error |
| NEW\_BLOCK\_TIMEOUT\_MS | `60 * 10 * 1000` | Panic if no blocks were received within this time |
| HEADER\_CACHE\_CAPACITY | `100` | Number of finalized block headers retained in in-memory cache |
| FINALITY\_THRESHOLD | `5` | Number of block confirmations to consider a block final. Lower thresholds reduce the indexing latency |
11 changes: 8 additions & 3 deletions hydra-processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Hydra processor is a client-side 'sink' tool used to fetch substrate events from
display help for hydra-processor

```text
display help for <%= config.bin %>
display help
USAGE
$ hydra-processor help [COMMAND]
Expand All @@ -30,7 +30,7 @@ _See code:_ [_@oclif/plugin-help_](https://github.com/oclif/plugin-help/blob/v2.
## `hydra-processor migrate`

```text
undefined
Run migrations to setup the schema required for the processor operations
USAGE
$ hydra-processor migrate
Expand All @@ -42,7 +42,7 @@ OPTIONS
## `hydra-processor run`

```text
undefined
Run the processor
USAGE
$ hydra-processor run
Expand Down Expand Up @@ -83,6 +83,11 @@ Hydra processor requires a manifest file and certain environment variables to be
| DB\_PASS | - | **Yes** | Database password |
| PROMETHEUS\_PORT | 3000 | **No** | A prometheus metrics endpoint is started at this port |
| POLL\_INTERVAL\_MS | 1 sec \(60000 msec\) | **No** | How often the processor polls the indexer for new blocks |
| MIN\_BLOCKS\_AHEAD | 0 | **No** | The processor waits until the indexer is ahead by at least that number of blocks |
| BATCH_SIZE | 1000 | **No** | Batch size, used to gauge the size of the event and processing queue. Bigger values increase the startup time, but reduce the average processing time per event |
| BLOCK_CACHE_CAPACITY | 10000 | **No** | Number of blocks retained in in-memory cache |
| DEBUG | `hydra-indexer:*` | **No** | Debug template |


The required variables can either be set externally or loaded from a file using the `-e` flag, e.g.:

Expand Down
3 changes: 3 additions & 0 deletions packages/hydra-processor/src/executor/IMappingExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { BlockData } from '../queue'

/**
* An interfaces responsible for processing a block with mappable events
*/
export interface IMappingExecutor {
executeBlock(
blockCtx: BlockData,
Expand Down
3 changes: 3 additions & 0 deletions packages/hydra-processor/src/executor/IMappingsLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export interface BlockMappings {
mappings: MappingHandler[]
}

/**
* An interface for looking up the right handler for the given data and proving an execution context for it
*/
export interface IMappingsLookup {
lookupHandlers(ctx: BlockData): BlockMappings

Expand Down
30 changes: 28 additions & 2 deletions packages/hydra-processor/src/ingest/GraphQLSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import { compact } from 'lodash'

const debug = Debug('hydra-processor:graphql-source')

type SubstrateType = SubstrateBlock | SubstrateEvent | SubstrateExtrinsic
// a union of all substrate type
type SubstrateType = SubstrateBlock & SubstrateEvent & SubstrateExtrinsic

const REVIVE_SUBSTRATE_FIELDS: Partial<
{
Expand All @@ -41,6 +42,9 @@ query {
}
`

/**
* A concrete implementation of `IProcessorSource` which sends GraphQL queries to a indexer-gateway
*/
export class GraphQLSource implements IProcessorSource {
private graphClient: GraphQLClient
private blockCache: FIFOCache<number, SubstrateBlock>
Expand Down Expand Up @@ -97,6 +101,12 @@ export class GraphQLSource implements IProcessorSource {
}
}

/**
* Batch execution of indexer queries. Useful when we need to fetch different data in one go
*
* @param queries - a set of named queries to execute
* @returns an aggegated result of the queries as returned by the indexer
*/
executeQueries<T>(
queries: {
[K in keyof T]: GraphQLQuery<T[K]>
Expand All @@ -121,6 +131,12 @@ export class GraphQLSource implements IProcessorSource {
return this.blockCache.get(blockNumber) as SubstrateBlock
}

/**
* Queries the indexer and fetches all the relavant data on the blocks of given heights
*
* @param heights - an array of block heights to be queried
* @returns `SubstrateBlock` data from the indexer
*/
async fetchBlocks(heights: number[]): Promise<SubstrateBlock[]> {
if (conf().VERBOSE) debug(`Fetching blocks: ${JSON.stringify(heights)}`)

Expand Down Expand Up @@ -169,6 +185,11 @@ export class GraphQLSource implements IProcessorSource {
return [...cached, ...result.blocks].sort()
}

/**
*
* @param query - a query to fetch
* @returns a typed response with revived BigInt/BN fields (as they are not deserialized properly by default)
*/
private requestSubstrateData<T>(query: string): Promise<T> {
// TODO: use timeouts?
return this.request<T, SubstrateType>(query, REVIVE_SUBSTRATE_FIELDS)
Expand Down Expand Up @@ -208,7 +229,12 @@ export function collectQueries(queries: {
}`
}

// FIXME: refactor into a generic GraphQL query builder
// FIXME: refactor using GraphQL query builder
/**
*
* @param param0 - an `IndexerQuery` object containing all the filtering information to query the indexer
* @returns a query string to be passed to `GraphQLClient`
*/
export function getEventsGraphQLQuery({
event,
extrinsic,
Expand Down
51 changes: 50 additions & 1 deletion packages/hydra-processor/src/ingest/IProcessorSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ export type AsJson<T> = T extends
: never

export type QueryFields<T> = Array<QueryField<T>>

/**
* Query for fetching events
* Query interface for fetching events from the indexer
* - strictly after event with IDs > afterID (if present)
* - with name in names
* - block between fromBlock and toBlock (inclusive)
Expand All @@ -117,30 +118,78 @@ export interface IndexerQuery {
limit?: number
}

/**
* An typed interface for a generic indexer query
*/
export interface GraphQLQuery<T> {
/**
* query name as described by the schema of the indexer-gateway
*/
name: string

/**
* query filters, limit and ordering info
*/
query: QueryFilter<T>

/**
* query fields to fetch (including nested field)
*/
fields: QueryFields<T>
}

/**
* A service incapsulating communication with the indexer-gateway
*/
export interface IProcessorSource {
/**
* Run miltiple indexer queries in a single request.
* Should be deprecated in favour of a more general `executeQueries` method
*
* @param queries - a set of queries to run
*/
nextBatch<T>(
queries: {
[K in keyof T]: IndexerQuery
}
): Promise<{ [K in keyof typeof queries]: SubstrateEvent[] }>

/**
* Run multiple generic indexer queries in a single request
*
* @param queries - a set of queries to run
*/
executeQueries<T>(
queries: {
[K in keyof T]: GraphQLQuery<T[K]>
}
): Promise<{ [K in keyof T]: (T[K] & AsJson<T[K]>)[] }>

/**
* Get the current indexer state by running a `status` query
*/
getIndexerStatus(): Promise<IndexerStatus>

/**
* TODO: this is not implemented yet, and currently the processor is poll-only.
* In the future, the processor should rely both on polling and subscriptions to reduce
* the latency between the indexer and the processor
*
* @param events - a list of events the processor is listening
*/
subscribe(events: string[]): Promise<void>

/**
* Fetch block data for a given block
*
* @param blockNumber - block height to get the data
*/
getBlock(blockNumber: number): Promise<SubstrateBlock>

/**
* Fetch block data for multiple heights in a single batch
*
* @param heights - an array of block heights
*/
fetchBlocks(heights: number[]): Promise<SubstrateBlock[]>
}
39 changes: 39 additions & 0 deletions packages/hydra-processor/src/ingest/graphql-query-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ export function collectFieldClauses<T>(where: ObjectFilter<T>): string {
return `${compact(whereBody).join(', ')}`
}

/**
*
* @param where - filter object
* @returns the `where` part of the GQL query
*/
export function buildWhere<T>(where: ObjectFilter<T>): string {
// let whereBody: string[] = Object.keys(where).reduce((acc, f) => {
// acc.push(...clauses(f, where[f as keyof typeof where]))
Expand Down Expand Up @@ -127,6 +132,11 @@ export function buildQueryFields<T>(fields: QueryFields<T>): string {
return format(output)
}

/**
*
* @param orderBy - order by object
* @returns an orderBy part of the query, following the OpenCRUD standard
*/
export function formatOrderBy(
orderBy:
| Partial<{
Expand All @@ -150,6 +160,20 @@ export function formatOrderBy(
return `orderBy: ${field}_${suffix}`
}

/**
* Builds a gql query string based off the `GraphQLQuery` object, following the OpenCRUD standard.
* Essentially it follows the tempalte
* ```
* query {
* ${name} (where: { <filtering parts> }, <limit parts>, <order by parts>) {
* <fields>
* }
* }
* ```
*
* @param - a `GraphQLQuery` object representing the query
* @returns a string that can be used with any GraphQL client
*/
export function buildQuery<T>({
name,
query: { where, limit, orderBy },
Expand All @@ -166,6 +190,21 @@ export function buildQuery<T>({
}`)
}

/**
* Builds multple named queries off `GraphQLQuery` and builds an aggregated
* query of the form
*
* ```
* query {
* <name1>: <first query>,
* <name2>: <second query>,
* ...
* }
* ```
*
* @param queries
* @returns a string to be passed to any `GraphQLClient>
*/
export function collectNamedQueries<T>(
queries: {
[K in keyof T]: GraphQLQuery<T[K]>
Expand Down
Loading

0 comments on commit 65b37a4

Please sign in to comment.