diff --git a/hubble/hasura/config.yaml b/hubble/hasura/config.yaml
index bb7e270490..03fce04079 100644
--- a/hubble/hasura/config.yaml
+++ b/hubble/hasura/config.yaml
@@ -1,7 +1,7 @@
 version: 3
-metadata_directory: metadata
-endpoint: http://localhost:8080
+endpoint: https://localhost:8080
 admin_secret: secret
+metadata_directory: metadata
 actions:
   kind: synchronous
   handler_webhook_baseurl: http://localhost:3000
diff --git a/hubble/hasura/metadata/api_limits.yaml b/hubble/hasura/metadata/api_limits.yaml
new file mode 100644
index 0000000000..0967ef424b
--- /dev/null
+++ b/hubble/hasura/metadata/api_limits.yaml
@@ -0,0 +1 @@
+{}
diff --git a/hubble/hasura/metadata/backend_configs.yaml b/hubble/hasura/metadata/backend_configs.yaml
new file mode 100644
index 0000000000..3a181b59bb
--- /dev/null
+++ b/hubble/hasura/metadata/backend_configs.yaml
@@ -0,0 +1,13 @@
+dataconnector:
+  athena:
+    uri: http://localhost:8081/api/v1/athena
+  mariadb:
+    uri: http://localhost:8081/api/v1/mariadb
+  mongodb:
+    uri: http://localhost:8082
+  mysql8:
+    uri: http://localhost:8081/api/v1/mysql
+  oracle:
+    uri: http://localhost:8081/api/v1/oracle
+  snowflake:
+    uri: http://localhost:8081/api/v1/snowflake
diff --git a/hubble/hasura/metadata/databases/databases.yaml b/hubble/hasura/metadata/databases/databases.yaml
index b25fa9ecb0..529da07e2e 100644
--- a/hubble/hasura/metadata/databases/databases.yaml
+++ b/hubble/hasura/metadata/databases/databases.yaml
@@ -6,4 +6,15 @@
         from_env: PG_DATABASE_URL
       isolation_level: read-committed
       use_prepared_statements: false
+  logical_models:
+    - fields:
+        - name: receiver
+          type:
+            nullable: false
+            scalar: text
+        - name: amount
+          type:
+            nullable: true
+            scalar: text
+      name: coin_received
   tables: "!include default/tables/tables.yaml"
diff --git a/hubble/hasura/metadata/databases/default/tables/public_blocks.yaml b/hubble/hasura/metadata/databases/default/tables/public_blocks.yaml
index 772118550c..48aa022221 100644
--- a/hubble/hasura/metadata/databases/default/tables/public_blocks.yaml
+++ b/hubble/hasura/metadata/databases/default/tables/public_blocks.yaml
@@ -14,9 +14,8 @@ array_relationships:
           name: events
           schema: public
 select_permissions:
-  - comment: ""
+  - role: user
     permission:
-      allow_aggregations: true
       columns:
         - is_finalized
         - chain_id
@@ -25,4 +24,5 @@ select_permissions:
         - hash
       filter: {}
       limit: 50
-    role: user
+      allow_aggregations: true
+    comment: ""
diff --git a/hubble/hasura/metadata/graphql_schema_introspection.yaml b/hubble/hasura/metadata/graphql_schema_introspection.yaml
new file mode 100644
index 0000000000..61a4dcac29
--- /dev/null
+++ b/hubble/hasura/metadata/graphql_schema_introspection.yaml
@@ -0,0 +1 @@
+disabled_for_roles: []
diff --git a/hubble/hasura/metadata/inherited_roles.yaml b/hubble/hasura/metadata/inherited_roles.yaml
new file mode 100644
index 0000000000..fe51488c70
--- /dev/null
+++ b/hubble/hasura/metadata/inherited_roles.yaml
@@ -0,0 +1 @@
+[]
diff --git a/hubble/hasura/metadata/metrics_config.yaml b/hubble/hasura/metadata/metrics_config.yaml
new file mode 100644
index 0000000000..33b18c3b3b
--- /dev/null
+++ b/hubble/hasura/metadata/metrics_config.yaml
@@ -0,0 +1,2 @@
+analyze_query_variables: true
+analyze_response_body: true
diff --git a/hubble/hasura/metadata/network.yaml b/hubble/hasura/metadata/network.yaml
new file mode 100644
index 0000000000..0967ef424b
--- /dev/null
+++ b/hubble/hasura/metadata/network.yaml
@@ -0,0 +1 @@
+{}
diff --git a/hubble/hasura/metadata/opentelemetry.yaml b/hubble/hasura/metadata/opentelemetry.yaml
new file mode 100644
index 0000000000..0967ef424b
--- /dev/null
+++ b/hubble/hasura/metadata/opentelemetry.yaml
@@ -0,0 +1 @@
+{}
diff --git a/hubble/hasura/migrations/default/1694620457696_init/up.sql b/hubble/hasura/migrations/default/1694700761455_init/up.sql
similarity index 98%
rename from hubble/hasura/migrations/default/1694620457696_init/up.sql
rename to hubble/hasura/migrations/default/1694700761455_init/up.sql
index cc88176ae2..0f94a2b676 100644
--- a/hubble/hasura/migrations/default/1694620457696_init/up.sql
+++ b/hubble/hasura/migrations/default/1694700761455_init/up.sql
@@ -30,7 +30,8 @@ CREATE TABLE public.blocks (
     id integer NOT NULL,
     is_finalized boolean DEFAULT false,
     created_at timestamp with time zone DEFAULT now() NOT NULL,
-    updated_at timestamp with time zone DEFAULT now()
+    updated_at timestamp with time zone DEFAULT now(),
+    extra_data jsonb
 );
 
 CREATE SEQUENCE public.blocks_id_seq
     AS integer
diff --git a/hubble/src/graphql/operations.graphql b/hubble/src/graphql/operations.graphql
index 31f21f8ce5..d107a22ec7 100644
--- a/hubble/src/graphql/operations.graphql
+++ b/hubble/src/graphql/operations.graphql
@@ -1,5 +1,6 @@
-mutation InsertBlock($chain_id: Int!, $hash: String!, $height: Int!, $finalized: Boolean!, $events: events_arr_rel_insert_input) {
-  insert_blocks_one(object: {chain_id: $chain_id, hash: $hash, height: $height, is_finalized: $finalized, events: $events}) {
+
+mutation InsertBlock($object: blocks_insert_input!) {
+  insert_blocks_one(object: $object) {
     id
   }
 }
diff --git a/hubble/src/graphql/schema.graphql b/hubble/src/graphql/schema.graphql
index ee714bd828..d154a72a0f 100644
--- a/hubble/src/graphql/schema.graphql
+++ b/hubble/src/graphql/schema.graphql
@@ -378,6 +378,10 @@ type blocks {
     """filter the rows returned"""
     where: events_bool_exp
   ): events_aggregate!
+  extra_data(
+    """JSON select path"""
+    path: String
+  ): jsonb
   hash: String!
   height: Int!
   id: Int!
@@ -454,6 +458,11 @@ input blocks_aggregate_order_by {
   variance: blocks_variance_order_by
 }
 
+"""append existing jsonb value of filtered columns with new jsonb value"""
+input blocks_append_input {
+  extra_data: jsonb
+}
+
 """
 input type for inserting array relation for remote table "blocks"
 """
@@ -492,6 +501,7 @@ input blocks_bool_exp {
   created_at: timestamptz_comparison_exp
   events: events_bool_exp
   events_aggregate: events_aggregate_bool_exp
+  extra_data: jsonb_comparison_exp
   hash: String_comparison_exp
   height: Int_comparison_exp
   id: Int_comparison_exp
@@ -509,6 +519,27 @@ enum blocks_constraint {
   blocks_pkey
 }
 
+"""
+delete the field or element with specified path (for JSON arrays, negative integers count from the end)
+"""
+input blocks_delete_at_path_input {
+  extra_data: [String!]
+}
+
+"""
+delete the array element with specified index (negative integers count from the end). throws an error if top level container is not an array
+"""
+input blocks_delete_elem_input {
+  extra_data: Int
+}
+
+"""
+delete key/value pair or string element. key/value pairs are matched based on their key value
+"""
+input blocks_delete_key_input {
+  extra_data: String
+}
+
 """
 input type for incrementing numeric columns in table "blocks"
 """
@@ -526,6 +557,7 @@ input blocks_insert_input {
   chain_id: Int
   created_at: timestamptz
   events: events_arr_rel_insert_input
+  extra_data: jsonb
   hash: String
   height: Int
   id: Int
@@ -613,6 +645,7 @@ input blocks_order_by {
   chain_id: order_by
   created_at: order_by
   events_aggregate: events_aggregate_order_by
+  extra_data: order_by
   hash: order_by
   height: order_by
   id: order_by
@@ -625,6 +658,11 @@ input blocks_pk_columns_input {
   id: Int!
 }
 
+"""prepend existing jsonb value of filtered columns with new jsonb value"""
+input blocks_prepend_input {
+  extra_data: jsonb
+}
+
 """
 select columns of table "blocks"
 """
@@ -635,6 +673,9 @@ enum blocks_select_column {
   """column name"""
   created_at
 
+  """column name"""
+  extra_data
+
   """column name"""
   hash
@@ -673,6 +714,7 @@ input type for updating data in table "blocks"
 input blocks_set_input {
   chain_id: Int
   created_at: timestamptz
+  extra_data: jsonb
   hash: String
   height: Int
   id: Int
@@ -743,6 +785,7 @@ input blocks_stream_cursor_input {
 input blocks_stream_cursor_value_input {
   chain_id: Int
   created_at: timestamptz
+  extra_data: jsonb
   hash: String
   height: Int
   id: Int
@@ -776,6 +819,9 @@ enum blocks_update_column {
   """column name"""
   created_at
 
+  """column name"""
+  extra_data
+
   """column name"""
   hash
@@ -793,9 +839,30 @@ input blocks_updates {
+  """append existing jsonb value of filtered columns with new jsonb value"""
+  _append: blocks_append_input
+
+  """
+  delete the field or element with specified path (for JSON arrays, negative integers count from the end)
+  """
+  _delete_at_path: blocks_delete_at_path_input
+
+  """
+  delete the array element with specified index (negative integers count from the end). throws an error if top level container is not an array
+  """
+  _delete_elem: blocks_delete_elem_input
+
+  """
+  delete key/value pair or string element. key/value pairs are matched based on their key value
+  """
+  _delete_key: blocks_delete_key_input
+
   """increments the numeric columns with given value of the filtered values"""
   _inc: blocks_inc_input
 
+  """prepend existing jsonb value of filtered columns with new jsonb value"""
+  _prepend: blocks_prepend_input
+
   """sets the columns of the filtered rows to the given values"""
   _set: blocks_set_input
@@ -3163,9 +3230,30 @@ type mutation_root {
   """
   update data of the table: "blocks"
   """
   update_blocks(
+    """append existing jsonb value of filtered columns with new jsonb value"""
+    _append: blocks_append_input
+
+    """
+    delete the field or element with specified path (for JSON arrays, negative integers count from the end)
+    """
+    _delete_at_path: blocks_delete_at_path_input
+
+    """
+    delete the array element with specified index (negative integers count from the end). throws an error if top level container is not an array
+    """
+    _delete_elem: blocks_delete_elem_input
+
+    """
+    delete key/value pair or string element. key/value pairs are matched based on their key value
+    """
+    _delete_key: blocks_delete_key_input
+
     """increments the numeric columns with given value of the filtered values"""
     _inc: blocks_inc_input
 
+    """prepend existing jsonb value of filtered columns with new jsonb value"""
+    _prepend: blocks_prepend_input
+
     """sets the columns of the filtered rows to the given values"""
     _set: blocks_set_input
@@ -3177,9 +3265,30 @@ type mutation_root {
   """
   update single row of the table: "blocks"
   """
   update_blocks_by_pk(
+    """append existing jsonb value of filtered columns with new jsonb value"""
+    _append: blocks_append_input
+
+    """
+    delete the field or element with specified path (for JSON arrays, negative integers count from the end)
+    """
+    _delete_at_path: blocks_delete_at_path_input
+
+    """
+    delete the array element with specified index (negative integers count from the end). throws an error if top level container is not an array
+    """
+    _delete_elem: blocks_delete_elem_input
+
+    """
+    delete key/value pair or string element. key/value pairs are matched based on their key value
+    """
+    _delete_key: blocks_delete_key_input
+
     """increments the numeric columns with given value of the filtered values"""
     _inc: blocks_inc_input
 
+    """prepend existing jsonb value of filtered columns with new jsonb value"""
+    _prepend: blocks_prepend_input
+
     """sets the columns of the filtered rows to the given values"""
     _set: blocks_set_input
     pk_columns: blocks_pk_columns_input!
diff --git a/hubble/src/tm.rs b/hubble/src/tm.rs
index b2298eb605..d2f53e8b9a 100644
--- a/hubble/src/tm.rs
+++ b/hubble/src/tm.rs
@@ -1,6 +1,6 @@
 use color_eyre::eyre::{bail, Report};
 use futures::future::join_all;
-use tendermint::{block::Height, genesis::Genesis};
+use tendermint::{block::Height, consensus::Params, genesis::Genesis, validator::Update};
 use tendermint_rpc::{
     dialect::v0_37::Event, endpoint::block_results::Response as BlockResponse, error::ErrorDetail,
     response_error::Code, Client, Error, HttpClient,
@@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration};
 use tracing::{debug, info};
 use url::Url;
 
-use crate::{hasura::*, tm::insert_block::EventsArrRelInsertInput};
+use crate::hasura::*;
 
 #[derive(Clone, Debug, serde::Deserialize)]
 pub struct Config {
@@ -42,11 +42,11 @@ impl Config {
         // Fast sync protocol. We sync up to latest.height - batch-size + 1
         while let Some(up_to) = should_fast_sync_up_to(&client, Self::BATCH_SIZE, height).await? {
-            debug!("starting fast sync protocol up to: {}", up_to);
+            info!("starting fast sync protocol up to: {}", up_to);
             loop {
                 height = batch_sync(&client, &db, chain_db_id, Self::BATCH_SIZE, height).await?;
                 if height >= up_to {
-                    debug!("re-evaluating fast sync protocol");
+                    info!("re-evaluating fast sync protocol");
                     break; // go back to the should_fast_sync_up_to. If this returns None, we continue to slow sync.
                 }
             }
@@ -103,7 +103,7 @@ async fn get_current_data(
         })
         .await?;
 
-    let data = dbg!(latest_stored)
+    let data = latest_stored
         .data
         .expect("db should be prepared for indexing");
 
@@ -158,22 +158,31 @@ async fn batch_sync(
         sync_next(client, db, chain_db_id, from).await?;
     }
 
+    let min = from.value() as u32;
+    let max = (from.value() + batch_size as u64) as u32;
+    debug!("fetching batch of headers from {} to {}", min, max);
+
     let headers = client
         .blockchain(
             // Tendermint-rs is buggy, it
-            from.value() as u32,
-            (from.value() + batch_size as u64) as u32,
+            min, max,
        )
        .await?;
 
    let objects: Result<Vec<_>, Report> = join_all(headers.block_metas.iter().rev().map(|header| async {
+        debug!("fetching block results for height {}", header.header.height);
        let block = client.block_results(header.header.height).await?;
        let events: Vec<_> = block
            .events()
            .enumerate()
            .map(|event| event.into())
            .collect();
+        debug!(
+            "found {} events for block {}",
+            events.len(),
+            header.header.height
+        );
        Ok(insert_blocks_many::BlocksInsertInput {
            chain_id: Some(chain_db_id),
            chain: None,
@@ -187,6 +196,7 @@ async fn batch_sync(
            created_at: None,
            updated_at: None,
            is_finalized: Some(true),
+            extra_data: Some(serde_json::to_value(header.clone())?),
        })
    }))
    .await
@@ -194,6 +204,7 @@
    .collect();
 
    let variables = insert_blocks_many::Variables { objects: objects? };
+    debug!("inserting batch of blocks");
    db.do_post::<InsertBlocksMany>(variables).await?;
    Ok((from.value() as u32 + headers.block_metas.len() as u32).into())
 }
@@ -226,22 +237,29 @@ async fn sync_next(
    debug!("storing events for block {}", &height);
 
    let v = insert_block::Variables {
-        chain_id: chain_db_id,
-        hash: header.hash().to_string(),
-        height: height.into(),
-        events: Some(EventsArrRelInsertInput {
-            data: events,
-            on_conflict: None,
-        }),
-        finalized: true,
+        object: insert_block::BlocksInsertInput {
+            chain: None,
+            chain_id: Some(chain_db_id),
+            created_at: None,
+            events: Some(insert_block::EventsArrRelInsertInput {
+                data: events,
+                on_conflict: None,
+            }),
+            hash: Some(header.hash().to_string()),
+            extra_data: Some(serde_json::to_value(header.clone())?),
+            height: Some(header.height.into()),
+            id: None,
+            is_finalized: Some(true),
+            updated_at: None,
+        },
    };
    db.do_post::<InsertBlock>(v).await?;
    Ok(Some(height.increment()))
 }
 
-impl From<(usize, Event)> for insert_blocks_many::EventsInsertInput {
-    fn from(value: (usize, Event)) -> Self {
+impl From<(usize, StateChange)> for insert_blocks_many::EventsInsertInput {
+    fn from(value: (usize, StateChange)) -> Self {
        Self {
            id: None,
            index: Some(value.0 as i64),
@@ -252,8 +270,8 @@
    }
 }
 
-impl From<(usize, Event)> for insert_block::EventsInsertInput {
-    fn from(value: (usize, Event)) -> Self {
+impl From<(usize, StateChange)> for insert_block::EventsInsertInput {
+    fn from(value: (usize, StateChange)) -> Self {
        Self {
            id: None,
            index: Some(value.0 as i64),
@@ -265,11 +283,11 @@
 }
 
 pub trait BlockExt {
-    fn events(self) -> impl Iterator<Item = Event>;
+    fn events(self) -> impl Iterator<Item = StateChange>;
 }
 
 impl BlockExt for BlockResponse {
-    fn events(self) -> impl Iterator<Item = Event> {
+    fn events(self) -> impl Iterator<Item = StateChange> {
        self.begin_block_events
            .unwrap_or_default()
            .into_iter()
@@ -281,5 +299,110 @@ impl BlockExt for BlockResponse {
            )
            .chain(self.end_block_events.unwrap_or_default())
            .chain(self.finalize_block_events)
+            .map(StateChange::Event)
+            .chain(
+                self.validator_updates
+                    .into_iter()
+                    .map(StateChange::validator_update),
+            )
+            .chain(
+                self.consensus_param_updates
+                    .into_iter()
+                    .map(StateChange::consensus_param_update),
+            )
     }
 }
+
+#[derive(serde::Serialize)]
+#[serde(untagged)]
+pub enum StateChange {
+    Event(Event),
+    ValidatorUpdate(WithType<Update>),
+    ConsensusUpdate(WithType<Params>),
+}
+
+impl StateChange {
+    fn validator_update(inner: Update) -> Self {
+        StateChange::ValidatorUpdate(WithType::validator_update(inner))
+    }
+
+    fn consensus_param_update(inner: Params) -> Self {
+        StateChange::ConsensusUpdate(WithType::consensus_param_update(inner))
+    }
+}
+
+#[derive(serde::Serialize)]
+pub struct WithType<I> {
+    #[serde(rename = "type")]
+    kind: &'static str,
+    #[serde(flatten)]
+    inner: I,
+}
+
+impl<I> WithType<I> {
+    fn validator_update(inner: I) -> Self {
+        WithType {
+            kind: "validator_update",
+            inner,
+        }
+    }
+
+    fn consensus_param_update(inner: I) -> Self {
+        WithType {
+            kind: "consensus_param_update",
+            inner,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde::Serialize;
+    use tendermint::{abci::EventAttribute, vote::Power};
+
+    use super::*;
+
+    #[test]
+    fn state_change_serializes_correctly() {
+        use serde_json::{json, to_value};
+
+        fn check<T: Serialize>(t: T, json: serde_json::Value) {
+            assert_eq!(to_value(t).unwrap(), json)
+        }
+
+        check(
+            StateChange::Event(Event {
+                kind: "foo".to_string(),
+                attributes: vec![EventAttribute {
+                    index: false,
+                    key: "bar".to_string(),
+                    value: "bax".to_string(),
+                }],
+            }),
+            json!({
+                "type": "foo",
+                "attributes": [
+                    {
+                        "key": "bar",
+                        "index": false,
+                        "value": "bax",
+                    }
+                ]
+            }),
+        );
+        check(
+            StateChange::validator_update(Update {
+                pub_key: tendermint::PublicKey::Bn254(Default::default()),
+                power: Power::from(1_u8),
+            }),
+            json!({
+                "type": "validator_update",
+                "power": "1",
+                "pub_key": {
+                    "type": "tendermint/PubKeyBn254",
+                    "value": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
+                }
+            }),
+        );
+    }
+}
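
Note (editor's addition, not part of the patch): the new extra_data column surfaces in the generated schema as a jsonb field taking an optional JSON `path` argument, so the serialized Tendermint header stored by the indexer can be read back directly. A minimal, illustrative query sketch; the operation name is invented here, and it assumes some blocks have been indexed and a role that may select extra_data (the user role's permission above does not list that column, so this would need e.g. the admin role):

  query LatestBlockExtraData {
    blocks(limit: 1, order_by: {height: desc}) {
      height
      extra_data(path: "$.header")
    }
  }

The "$.header" path matches the block-meta shape written by batch_sync; rows written by sync_next store the bare header, so querying extra_data without a path is the shape-agnostic option.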