ENG-1287 remove metadata copy from SQL artifact (#1348)

### What

Remove copies of metadata from SQL artifacts, resulting in a **19% smaller
artifact** for chinook and a **17% faster runtime**. Customer metadata with
multiple subgraphs will show a more dramatic effect.

There was a separate copy of the metadata stored for each subgraph in the
`sql` "catalog". From the point of view of MBS, we were serializing 2+N copies
of the same metadata, which bloats artifacts and is very slow.

Additionally, the usage sites became confusing because multiple (presumably
identical) copies of the metadata were in scope at the same time.

Instead, we reconstruct the original `Catalog` type before use, because it's
required for the DataFusion impls.

### How

Convert to a different type for serialization, then re-hydrate it in the body
of the `sql` code.
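
Below is a minimal sketch of that pattern, using stand-in types (`Metadata`
and `Table` here are illustrative; the real types are `CatalogSer<SG>` and
`SubgraphSerializable` in the diff below). It assumes `indexmap` is built with
its `serde` feature so the map can serialize.

```rust
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

// Stand-in for the large resolved metadata that was being duplicated.
#[derive(Debug, PartialEq)]
struct Metadata;

// Stand-in for the per-table info that is cheap to serialize.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct Table;

// In-memory form: every subgraph holds an `Arc` to the one shared metadata.
#[derive(Clone, Debug, PartialEq)]
struct Subgraph {
    metadata: Arc<Metadata>,
    tables: IndexMap<String, Table>,
}

// On-disk form: metadata stripped, so the artifact stores it once,
// not once per subgraph.
type SubgraphSerializable = IndexMap<String, Table>;

impl Subgraph {
    // Strip the redundant metadata copy before serializing.
    fn to_serializable(self) -> SubgraphSerializable {
        self.tables
    }

    // Re-hydrate after deserializing by re-attaching the shared metadata.
    fn from_serializable(tables: SubgraphSerializable, metadata: &Arc<Metadata>) -> Self {
        Subgraph {
            metadata: metadata.clone(),
            tables,
        }
    }
}
```

The artifact writer calls `to_serializable` just before serializing, and the
reader calls `from_serializable` right after parsing, as the updated tests in
`common.rs` below do.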

V3_GIT_ORIGIN_REV_ID: 0e7e35255cfe8fe01ea328a1d7cb96db0e2dd726
jberryman authored and hasura-bot committed Nov 18, 2024
1 parent 5dd7edc commit 426a834
Showing 7 changed files with 111 additions and 49 deletions.
19 changes: 13 additions & 6 deletions v3/crates/engine/tests/common.rs
@@ -12,6 +12,7 @@ use metadata_resolve::{data_connectors::NdcVersion, LifecyclePluginConfigs};
use open_dds::session_variables::{SessionVariableName, SESSION_VARIABLE_ROLE};
use pretty_assertions::assert_eq;
use serde_json as json;
use sql::catalog::CatalogSerializable;
use sql::execute::SqlRequest;
use std::collections::BTreeMap;
use std::iter;
@@ -261,9 +262,12 @@ pub fn test_execution_expectation_for_multiple_ndc_versions(

// Ensure sql_context can be serialized and deserialized
let sql_context = sql::catalog::Catalog::from_metadata(&gds.metadata);
let sql_context_str = serde_json::to_string(&sql_context)?;
let sql_context_parsed = serde_json::from_str(&sql_context_str)?;
assert_eq!(sql_context, sql_context_parsed);
let sql_context_str = serde_json::to_string(&sql_context.clone().to_serializable())?;
let sql_context_parsed: CatalogSerializable = serde_json::from_str(&sql_context_str)?;
assert_eq!(
sql_context,
sql_context_parsed.from_serializable(&gds.metadata)
);
assert_eq!(
schema, deserialized_metadata,
"initial built metadata does not match deserialized metadata"
@@ -577,9 +581,12 @@ pub(crate) fn test_sql(test_path_string: &str) -> anyhow::Result<()> {

// Ensure sql_context can be serialized and deserialized
let sql_context = sql::catalog::Catalog::from_metadata(&gds.metadata);
let sql_context_str = serde_json::to_string(&sql_context)?;
let sql_context_parsed = serde_json::from_str(&sql_context_str)?;
assert_eq!(sql_context, sql_context_parsed);
let sql_context_str = serde_json::to_string(&sql_context.clone().to_serializable())?;
let sql_context_parsed: CatalogSerializable = serde_json::from_str(&sql_context_str)?;
assert_eq!(
sql_context,
sql_context_parsed.from_serializable(&gds.metadata)
);

let request = if let Ok(content) = read_to_string(&request_path) {
SqlRequest::new(content)
12 changes: 6 additions & 6 deletions v3/crates/jsonapi/src/catalog/types.rs
@@ -14,7 +14,7 @@ use open_dds::{
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct Catalog {
pub state_per_role: BTreeMap<Role, State>,
}
@@ -40,7 +40,7 @@ }
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct State {
pub routes: BTreeMap<String, Model>,
#[serde(
@@ -50,7 +50,7 @@ pub struct State {
pub object_types: BTreeMap<Qualified<CustomTypeName>, ObjectType>,
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct ObjectType(pub IndexMap<FieldName, Type>);

impl State {
@@ -125,22 +125,22 @@ impl State {
//
// for now we'll try d) but we should check we're happy with this before general release
// of the feature
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum Type {
Scalar(ndc_models::TypeRepresentation),
ScalarForDataConnector(ScalarTypeForDataConnector),
List(Box<Type>),
Object(Qualified<CustomTypeName>),
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct ScalarTypeForDataConnector {
pub type_representations: BTreeSet<ndc_models::TypeRepresentation>,
}

// only the parts of a Model we need to construct a JSONAPI
// we'll filter out fields a given role can't see
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct Model {
pub name: Qualified<ModelName>,
pub description: Option<String>,
62 changes: 54 additions & 8 deletions v3/crates/sql/src/catalog.rs
@@ -1,5 +1,6 @@
use std::{any::Any, sync::Arc};

use crate::catalog::subgraph::Subgraph;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved};
@@ -25,15 +26,46 @@ pub mod model;
pub mod subgraph;
pub mod types;

/// The context in which to compile and execute SQL queries.
/// [`Catalog`] but parameterized, so that `subgraph` can be `SubgraphSerializable` for
/// serialization.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Catalog {
pub(crate) subgraphs: IndexMap<String, Arc<subgraph::Subgraph>>,
pub struct CatalogSer<SG> {
pub(crate) subgraphs: IndexMap<String, SG>,
pub(crate) table_valued_functions: IndexMap<String, Arc<command::Command>>,
pub(crate) introspection: Arc<introspection::IntrospectionSchemaProvider>,
pub(crate) default_schema: Option<String>,
}

/// The context in which to compile and execute SQL queries.
pub type Catalog = CatalogSer<subgraph::Subgraph>;

pub type CatalogSerializable = CatalogSer<subgraph::SubgraphSerializable>;

impl CatalogSerializable {
pub fn from_serializable(self, metadata: &Arc<resolved::Metadata>) -> Catalog {
let subgraphs = self
.subgraphs
.into_iter()
.map(|(name, tables)| {
(
name,
Subgraph {
metadata: metadata.clone(),
tables,
},
)
})
.collect();

CatalogSer {
subgraphs,
table_valued_functions: self.table_valued_functions,
introspection: self.introspection,
default_schema: self.default_schema,
}
}
}

impl Catalog {
/// Create a no-op Catalog, used when `sql` layer is disabled
pub fn empty() -> Self {
@@ -46,6 +78,23 @@ impl Catalog {
default_schema: None,
}
}

/// prepare a Catalog for serializing by stripping redundant artifacts
pub fn to_serializable(self) -> CatalogSer<subgraph::SubgraphSerializable> {
let serialized_subgraphs = self
.subgraphs
.into_iter()
.map(|(name, subgraph)| (name, subgraph.tables))
.collect();

CatalogSer {
subgraphs: serialized_subgraphs,
table_valued_functions: self.table_valued_functions,
introspection: self.introspection,
default_schema: self.default_schema,
}
}

/// Derive a SQL Context from resolved Open DDS metadata.
pub fn from_metadata(metadata: &Arc<resolved::Metadata>) -> Self {
let type_registry = TypeRegistry::build_type_registry(metadata);
@@ -109,10 +158,7 @@
);

Catalog {
subgraphs: subgraphs
.into_iter()
.map(|(k, v)| (k, Arc::new(v)))
.collect(),
subgraphs,
table_valued_functions,
introspection: Arc::new(introspection),
default_schema,
@@ -134,7 +180,7 @@ impl datafusion::CatalogProvider for model::WithSession<Catalog> {
fn schema(&self, name: &str) -> Option<Arc<dyn datafusion::SchemaProvider>> {
let subgraph_provider = self.value.subgraphs.get(name).cloned().map(|schema| {
Arc::new(model::WithSession {
value: schema,
value: schema.into(),
session: self.session.clone(),
}) as Arc<dyn datafusion::SchemaProvider>
});
16 changes: 8 additions & 8 deletions v3/crates/sql/src/catalog/model.rs
@@ -55,23 +55,23 @@ pub enum UnsupportedModel {
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Model {
pub subgraph: SubgraphName,
pub name: ModelName,
pub struct Model {
pub(crate) subgraph: SubgraphName,
pub(crate) name: ModelName,

pub description: Option<String>,
pub(crate) description: Option<String>,

pub arguments: IndexMap<ArgumentName, ArgumentInfo>,
pub(crate) arguments: IndexMap<ArgumentName, ArgumentInfo>,

// The struct type of the model's object type
pub struct_type: StructTypeName,
pub(crate) struct_type: StructTypeName,

// Datafusion table schema
pub schema: datafusion::SchemaRef,
pub(crate) schema: datafusion::SchemaRef,

// This is the entry point for the type mappings stored
// in ModelSource
pub data_type: Qualified<CustomTypeName>,
pub(crate) data_type: Qualified<CustomTypeName>,
}

impl Model {
19 changes: 14 additions & 5 deletions v3/crates/sql/src/catalog/subgraph.rs
@@ -3,7 +3,6 @@ use metadata_resolve::Metadata;
use std::{any::Any, sync::Arc};

use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

mod datafusion {
pub(super) use datafusion::error::Result;
@@ -14,12 +13,22 @@ use crate::catalog;

use super::model;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Subgraph {
pub metadata: Arc<Metadata>,
pub tables: IndexMap<String, Arc<catalog::model::Model>>,
/// TODO document
///
/// This is intentionally not `Serialize`/`Deserialize`, and constructed from a
/// [`SubgraphSerializable`].
#[derive(Clone, Debug, PartialEq)]
pub struct Subgraph {
pub(crate) metadata: Arc<Metadata>,
pub(crate) tables: IndexMap<String, Arc<catalog::model::Model>>,
}

/// This is [`Subgraph`] but with `metadata` removed (to avoid redundancy in artifact creation, and
/// to avoid the confusion of multiple copies of the same thing expected to be identical but
/// perhaps not, or perhaps no one knows...). It is reconstituted as `Subgraph` at some point after
/// deserializing.
pub type SubgraphSerializable = IndexMap<String, Arc<catalog::model::Model>>;

#[async_trait]
impl datafusion::SchemaProvider for catalog::model::WithSession<Subgraph> {
fn as_any(&self) -> &dyn Any {
26 changes: 13 additions & 13 deletions v3/crates/sql/src/catalog/types.rs
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet},
fmt::{Debug, Display},
sync::Arc,
};
@@ -29,7 +29,7 @@ type SupportedScalar = (NormalizedType, datafusion::DataType);
#[derive(Error, Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum UnsupportedScalar {
#[error("Multiple NDC type representations found for scalar type '{0}'")]
MultipleNdcTypeRepresentations(CustomTypeName, HashSet<ndc_models::TypeRepresentation>),
MultipleNdcTypeRepresentations(CustomTypeName, BTreeSet<ndc_models::TypeRepresentation>),
#[error("No NDC representation found for scalar type '{0}'")]
NoNdcRepresentation(CustomTypeName),
#[error("Unsupported NDC type representation for scalar type '{0}': {1:?}")]
@@ -42,7 +42,7 @@ pub fn resolve_scalar_type(
scalar_type_name: &Qualified<CustomTypeName>,
representation: &ScalarTypeRepresentation,
) -> Scalar {
let representations: HashSet<ndc_models::TypeRepresentation> =
let representations: BTreeSet<ndc_models::TypeRepresentation> =
representation.representations.values().cloned().collect();
let mut iter = representations.into_iter();
let ndc_representation = match iter.next() {
@@ -83,8 +83,8 @@

pub fn resolve_scalar_types(
metadata: &resolved::Metadata,
) -> HashMap<Qualified<CustomTypeName>, Scalar> {
let mut custom_scalars: HashMap<Qualified<CustomTypeName>, Scalar> = HashMap::new();
) -> BTreeMap<Qualified<CustomTypeName>, Scalar> {
let mut custom_scalars: BTreeMap<Qualified<CustomTypeName>, Scalar> = BTreeMap::new();
for (scalar_type_name, representation) in &metadata.scalar_types {
let scalar = resolve_scalar_type(scalar_type_name, representation);
custom_scalars.insert(scalar_type_name.clone(), scalar);
@@ -93,8 +93,8 @@ }
}

pub struct TypeRegistry {
custom_scalars: HashMap<Qualified<CustomTypeName>, Scalar>,
struct_types: HashMap<Qualified<CustomTypeName>, Struct>,
custom_scalars: BTreeMap<Qualified<CustomTypeName>, Scalar>,
struct_types: BTreeMap<Qualified<CustomTypeName>, Struct>,
default_schema: Option<SubgraphName>,
}

@@ -113,7 +113,7 @@ impl TypeRegistry {
// build a default schema by checking if the object types are spread across more than one
// subgraph
let default_schema = {
let subgraphs: HashSet<_> = metadata
let subgraphs: BTreeSet<_> = metadata
.object_types
.keys()
.map(|object_type_name| &object_type_name.subgraph)
@@ -127,7 +127,7 @@

let custom_scalars = resolve_scalar_types(metadata);

let mut struct_types: HashMap<Qualified<CustomTypeName>, Struct> = HashMap::new();
let mut struct_types: BTreeMap<Qualified<CustomTypeName>, Struct> = BTreeMap::new();

for (object_type_name, object_type) in &metadata.object_types {
let struct_type = struct_type(
@@ -206,15 +206,15 @@ impl TypeRegistry {
// }
}

pub(crate) fn struct_types(&self) -> &HashMap<Qualified<CustomTypeName>, Struct> {
pub(crate) fn struct_types(&self) -> &BTreeMap<Qualified<CustomTypeName>, Struct> {
&self.struct_types
}

pub(crate) fn default_schema(&self) -> Option<&SubgraphName> {
self.default_schema.as_ref()
}

pub fn custom_scalars(&self) -> &HashMap<Qualified<CustomTypeName>, Scalar> {
pub fn custom_scalars(&self) -> &BTreeMap<Qualified<CustomTypeName>, Scalar> {
&self.custom_scalars
}
}
@@ -338,7 +338,7 @@ type Struct = Result<StructType, UnsupportedObject>;
fn struct_type(
default_schema: &Option<SubgraphName>,
metadata: &resolved::Metadata,
custom_scalars: &HashMap<Qualified<CustomTypeName>, Scalar>,
custom_scalars: &BTreeMap<Qualified<CustomTypeName>, Scalar>,
object_type_name: &Qualified<CustomTypeName>,
object_type: &resolved::ObjectTypeWithRelationships,
disallowed_object_types: &BTreeSet<Qualified<CustomTypeName>>,
@@ -491,7 +491,7 @@ fn ndc_representation_to_datatype(
fn to_arrow_type(
default_schema: &Option<SubgraphName>,
metadata: &resolved::Metadata,
custom_scalars: &HashMap<Qualified<CustomTypeName>, Scalar>,
custom_scalars: &BTreeMap<Qualified<CustomTypeName>, Scalar>,
ty: &resolved::QualifiedBaseType,
disallowed_object_types: &BTreeSet<Qualified<CustomTypeName>>,
// ) -> GeneratedArrowType {
6 changes: 3 additions & 3 deletions v3/crates/sql/src/execute/planner/scalar.rs
@@ -17,7 +17,7 @@ use datafusion::{
error::DataFusionError,
scalar::ScalarValue,
};
use std::{collections::HashSet, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

/// Parses a datafusion literal into a json value, supports most types (including structs and arrays).
pub(crate) fn parse_datafusion_literal(
@@ -197,7 +197,7 @@ pub(crate) fn parse_struct_literal(
}
} else {
// Create a map of expected fields
let expected_field_set: HashSet<_> =
let expected_field_set: BTreeSet<_> =
expected_fields.iter().map(|field| field.name()).collect();

// Check for extra fields in the struct that aren't expected
@@ -216,7 +216,7 @@ }
}
}

let field_name_to_index: std::collections::HashMap<_, _> = struct_array
let field_name_to_index: std::collections::BTreeMap<_, _> = struct_array
.fields()
.iter()
.enumerate()
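One note on the `HashMap`/`HashSet` → `BTreeMap`/`BTreeSet` swaps in the last
two files: the motivation isn't stated above, but a plausible one is
determinism, since BTree collections iterate in key order, making anything
built from iteration (serialized JSON, error listings) reproducible across
runs. A small illustration:

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    let pairs = [("b", 2), ("a", 1), ("c", 3)];

    // HashMap iteration order is unspecified (and randomized per process),
    // so output built from it can differ between runs.
    let hashed: HashMap<&str, i32> = pairs.into_iter().collect();
    println!("{:?}", hashed.keys().collect::<Vec<_>>()); // order varies

    // BTreeMap iterates in key order, so the output is stable every run.
    let ordered: BTreeMap<&str, i32> = pairs.into_iter().collect();
    println!("{:?}", ordered.keys().collect::<Vec<_>>()); // ["a", "b", "c"]
}
```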
