diff --git a/Cargo.toml b/Cargo.toml
index dfd0eaeb1..92c6bc003 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -103,7 +103,7 @@ strum = "0.24.1"
 strum_macros = "0.24.3"
 substring = "1.4.5"
 tempfile = "3.4.0"
-tera = "1.18.1"
+tera = "1.19.0"
 termcolor = "1.2.0"
 thiserror = "1.0.40"
 tokio = { version = "1.27.0", features = [
diff --git a/crates/sparrow-catalog/README.md b/crates/sparrow-catalog/README.md
index 4d463a615..19140b450 100644
--- a/crates/sparrow-catalog/README.md
+++ b/crates/sparrow-catalog/README.md
@@ -7,11 +7,14 @@ Each `toml` file defines the documentation for a Fenl function.
 
 ## Generating Documentation
 
-Documentation may be generated using (from `sparrow-rs` directory) the following command.
-This uses the `catalog.md` template from the template-dir to render the catalog.
+Documentation may be generated using the following command.
+This renders all of the templates in the template directory except the partials.
 
 ```sh
-cargo run -p sparrow-catalog -- --input-dir sparrow-catalog/catalog generate --template-dir sparrow-catalog/templates
+cargo run -p sparrow-catalog -- \
+  --input-dir crates/sparrow-catalog/catalog generate \
+  --template-dir crates/sparrow-catalog/templates \
+  --output-dir=catalog-tmp
 ```
 
 ## Updating Examples / Signatures
@@ -19,15 +22,15 @@ cargo run -p sparrow-catalog -- --input-dir sparrow-catalog/catalog generate --t
 The following updates the signature and example output in the `toml` files.
 
 ```sh
-cargo run -p sparrow-catalog -- --input-dir sparrow-catalog/catalog update
+cargo run -p sparrow-catalog -- --input-dir crates/sparrow-catalog/catalog update
 ```
 
-## Checking Examlpes / Signatures
+## Checking Examples / Signatures
 
 The following ensures that the signature and example output in the `toml` files are up to date.
 
 ```sh
-cargo run -p sparrow-catalog -- --input-dir sparrow-catalog/catalog check
+cargo run -p sparrow-catalog -- --input-dir crates/sparrow-catalog/catalog check
 ```
 
 ## Function Documentation Style Guide
diff --git a/crates/sparrow-catalog/src/generate.rs b/crates/sparrow-catalog/src/generate.rs
index 7a75e2e7e..b4b81aaf3 100644
--- a/crates/sparrow-catalog/src/generate.rs
+++ b/crates/sparrow-catalog/src/generate.rs
@@ -1,10 +1,9 @@
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 
-use error_stack::{IntoReport, ResultExt};
-use futures::TryStreamExt;
+use error_stack::{IntoReport, Report, ResultExt};
+use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashSet;
-use serde::Serialize;
-use tokio::io::AsyncWriteExt;
+use tera::Tera;
 
 use crate::list_doc_files;
 use crate::structs::CatalogEntry;
@@ -17,104 +16,211 @@ pub(super) struct GenerateOptions {
     #[arg(long)]
     template_dir: PathBuf,
 
-    /// Output file to write the output to. Defaults to stdout.
+    /// Output directory to write the output to.
     #[arg(long)]
-    output: Option<PathBuf>,
+    output_dir: PathBuf,
 }
 
-#[derive(Serialize)]
 struct CatalogContext {
-    pub functions: Vec<CatalogEntry>,
+    functions: Vec<CatalogEntry>,
     /// The set of all tags. Computing this within the template would be
     /// painful.
-    pub tags: HashSet<String>,
+    tags: HashSet<String>,
+    base_context: tera::Context,
+}
+
+impl CatalogContext {
+    async fn try_new(doc_root: PathBuf) -> error_stack::Result<Self, Error> {
+        let mut functions: Vec<_> = list_doc_files(doc_root)
+            .await
+            .into_report()
+            .change_context(Error::ListingFiles)?
+            .map_err(|e| error_stack::report!(e).change_context(Error::ListingFiles))
+            .map_ok(parse_doc_file)
+            .try_buffer_unordered(4)
+            .try_collect()
+            .await?;
+
+        functions.sort_by(|a, b| a.name.cmp(&b.name));
+
+        let mut tags = HashSet::new();
+        for function in &functions {
+            for tag in &function.tags {
+                tags.get_or_insert_with(tag, |tag| tag.to_owned());
+            }
+        }
+
+        let mut base_context = tera::Context::new();
+        base_context.insert("functions", &functions);
+        base_context.insert("tags", &tags);
+        Ok(Self {
+            functions,
+            tags,
+            base_context,
+        })
+    }
+
+    fn global_context(&self) -> tera::Context {
+        self.base_context.clone()
+    }
+
+    fn function_contexts(&self) -> impl Iterator<Item = (&str, tera::Context)> + '_ {
+        self.functions.iter().map(|entry| {
+            let mut context = self.base_context.clone();
+            context.insert("function", &entry);
+            (entry.name.as_ref(), context)
+        })
+    }
+
+    fn tag_contexts(&self) -> impl Iterator<Item = (&str, tera::Context)> + '_ {
+        self.tags.iter().map(|tag| {
+            let mut context = self.base_context.clone();
+            context.insert("tag", &tag);
+            (tag.as_ref(), context)
+        })
+    }
 }
 
 #[derive(derive_more::Display, Debug)]
 pub enum Error {
-    #[display(fmt = "failed to write catalog")]
-    WriteCatalog,
-    #[display(fmt = "failed to print catalog")]
-    PrintCatalog,
+    #[display(fmt = "failed to make output dir")]
+    MakeOutputDir,
     #[display(fmt = "failed to list files")]
     ListingFiles,
     #[display(fmt = "failed to read input doc")]
     ReadingInputDoc,
     #[display(fmt = "failed to compile templates")]
     CompileTemplates,
-    #[display(fmt = "failed to render template")]
-    RenderTemplate,
+    #[display(fmt = "failed to render template '{_0}'")]
+    RenderTemplate(String),
 }
 
 impl error_stack::Context for Error {}
 
+#[allow(clippy::print_stdout)]
 pub(super) async fn generate(
     doc_root: PathBuf,
     options: GenerateOptions,
 ) -> error_stack::Result<(), Error> {
-    let catalog = generate_catalog(doc_root, options.template_dir).await?;
+    // Create the output directory if it doesn't exist.
+    tokio::fs::create_dir_all(&options.output_dir)
+        .await
+        .into_report()
+        .change_context(Error::MakeOutputDir)?;
 
-    match options.output {
-        Some(path) => tokio::fs::write(path, catalog)
-            .await
-            .into_report()
-            .change_context(Error::WriteCatalog)?,
-        None => tokio::io::stdout()
-            .write_all(catalog.as_bytes())
-            .await
-            .into_report()
-            .change_context(Error::PrintCatalog)?,
-    }
+    render_templates(doc_root, &options.template_dir, &options.output_dir).await?;
+
+    println!(
+        "Generated catalog contents in {}",
+        options.output_dir.display()
+    );
 
     Ok(())
 }
 
-/// Generate the `catalog.md` from the templates and function docs.
-///
-/// This applies the `catalog.md` template.
-pub(super) async fn generate_catalog(
+/// Render the top-level files in the `templates` directory.
+pub(super) async fn render_templates(
     doc_root: PathBuf,
-    template_dir: PathBuf,
-) -> error_stack::Result<String, Error> {
-    let mut functions: Vec<_> = list_doc_files(doc_root)
-        .await
-        .into_report()
-        .change_context(Error::ListingFiles)?
-        .map_err(|e| error_stack::report!(e).change_context(Error::ListingFiles))
-        .map_ok(parse_doc_file)
-        .try_buffer_unordered(4)
-        .try_collect()
-        .await?;
-
-    functions.sort_by(|a, b| a.name.cmp(&b.name));
-
-    let mut tags = HashSet::new();
-    for function in &functions {
-        for tag in &function.tags {
-            tags.get_or_insert_with(tag, |tag| tag.to_owned());
-        }
-    }
-
-    let catalog = CatalogContext { functions, tags };
+    template_dir: &Path,
+    output_dir: &Path,
+) -> error_stack::Result<(), Error> {
+    let context = CatalogContext::try_new(doc_root).await?;
 
-    let template_glob = format!("{}/**/*.md", template_dir.to_string_lossy());
+    let template_glob = format!("{}/**/*", template_dir.to_string_lossy());
 
     let mut tera = tera::Tera::new(&template_glob)
         .into_report()
         .change_context(Error::CompileTemplates)?;
-    tera.register_filter("csv2md", filters::CsvToMdFilter);
     tera.register_filter("link_fenl_types", filters::LinkFenlTypes);
-    tera.register_filter("warning_block", filters::WarningBlockQuote);
-    let context = tera::Context::from_serialize(catalog)
+
+    // 0. Make the output directories.
+    tokio::fs::create_dir_all(output_dir.join("category"))
+        .await
         .into_report()
-        .change_context(Error::RenderTemplate)?;
+        .change_context(Error::MakeOutputDir)?;
+    tokio::fs::create_dir_all(output_dir.join("function"))
+        .await
+        .into_report()
+        .change_context(Error::MakeOutputDir)?;
+
+    let mut futures = Vec::new();
+
+    // 1. Render `nav.adoc`, `index.adoc` and `operators.adoc`.
+    futures.push(render(
+        &tera,
+        context.global_context(),
+        "nav.adoc",
+        output_dir.join("nav.adoc"),
+    ));
+    futures.push(render(
+        &tera,
+        context.global_context(),
+        "index.adoc",
+        output_dir.join("index.adoc"),
+    ));
+    futures.push(render(
+        &tera,
+        context.global_context(),
+        "operators.adoc",
+        output_dir.join("category/operators.adoc"),
+    ));
+
+    // 2. Render `<tag>.adoc` for each category.
+    for (tag, context) in context.tag_contexts() {
+        futures.push(render(
+            &tera,
+            context,
+            "category.adoc",
+            output_dir.join(format!("category/{tag}.adoc")),
+        ));
+    }
+
+    // 3. Render `function.adoc` for each function.
+    for (name, context) in context.function_contexts() {
+        futures.push(render(
+            &tera,
+            context,
+            "function.adoc",
+            output_dir.join(format!("function/{name}.adoc")),
+        ))
+    }
 
-    let template_name = "catalog.md";
+    futures::stream::iter(futures)
+        .buffer_unordered(8)
+        .try_collect()
+        .await
+}
 
-    let catalog = tera
+async fn render(
+    tera: &Tera,
+    context: tera::Context,
+    template_name: &str,
+    destination: PathBuf,
+) -> error_stack::Result<(), Error> {
+    let error = || Error::RenderTemplate(template_name.to_owned());
+    let contents = tera
         .render(template_name, &context)
+        .map_err(|e| {
+            // Converting tera errors to error stack drops important context.
+            // Make sure to grab the causes.
+            let mut sources = Vec::new();
+            let mut error: &dyn std::error::Error = &e;
+            while let Some(source) = error.source() {
+                sources.push(source.to_string());
+                error = source;
+            }
+
+            let mut report = Report::new(e);
+            for source in sources {
+                report = report.attach_printable(source);
+            }
+            report
+        })
+        .change_context_lazy(error)?;
+
+    tokio::fs::write(destination, contents)
+        .await
         .into_report()
-        .change_context(Error::RenderTemplate)?;
-    Ok(catalog)
+        .change_context_lazy(error)
 }
 
 /// Parse an existing `.toml` document to a CatalogEntry.
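The source-chain walk inside the new `render` helper is worth a closer look: converting a `tera::Error` straight into an `error_stack::Report` drops the `source()` chain, which is where Tera reports what actually went wrong in a template. Below is a minimal standalone sketch of that pattern using only the standard library; the `collect_sources` name is illustrative and not part of this change.

```rust
use std::error::Error;

/// Walk an error's `source()` chain and collect each cause as a string, so
/// the causes can be re-attached to a wrapping report (as `render` does via
/// `Report::attach_printable`).
fn collect_sources(top: &dyn Error) -> Vec<String> {
    let mut sources = Vec::new();
    let mut error = top;
    while let Some(source) = error.source() {
        sources.push(source.to_string());
        error = source;
    }
    sources
}
```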
diff --git a/crates/sparrow-catalog/src/generate/filters.rs b/crates/sparrow-catalog/src/generate/filters.rs
index 44b134f59..7137029b2 100644
--- a/crates/sparrow-catalog/src/generate/filters.rs
+++ b/crates/sparrow-catalog/src/generate/filters.rs
@@ -1,70 +1,5 @@
 use std::collections::HashMap;
 
-pub(super) struct CsvToMdFilter;
-
-impl tera::Filter for CsvToMdFilter {
-    fn filter(
-        &self,
-        value: &tera::Value,
-        _args: &HashMap<String, tera::Value>,
-    ) -> tera::Result<tera::Value> {
-        use prettytable::format::TableFormat;
-        use prettytable::{Cell, Row, Table};
-
-        if let Some(csv) = value.as_str() {
-            let mut csv = Table::from_csv_string(csv)
-                .map_err(|e| tera::Error::msg(format!("Failed to parse CSV: {e}")))?;
-            let mut format = TableFormat::new();
-            format.column_separator('|');
-            format.left_border('|');
-            format.right_border('|');
-
-            csv.set_format(format);
-
-            let num_columns = csv.get_row(0).unwrap().len();
-            let header_cells = vec![Cell::new(" :---: "); num_columns];
-            csv.insert_row(1, Row::new(header_cells));
-            let mut md = Vec::new();
-            csv.print(&mut md)
-                .map_err(|e| tera::Error::msg(format!("Failed to write markdown table: {e}")))?;
-            let md = String::from_utf8(md)
-                .map_err(|e| tera::Error::msg(format!("Failed to convert table to string: {e}")))?;
-            Ok(tera::Value::String(md))
-        } else {
-            Err(tera::Error::msg(format!(
-                "Unable to parse CSV from non-string {value:?}",
-            )))
-        }
-    }
-}
-
-/// Wraps a string with the necessary formatting to be a warning block.
-pub(super) struct WarningBlockQuote;
-
-impl tera::Filter for WarningBlockQuote {
-    fn filter(
-        &self,
-        value: &tera::Value,
-        _args: &HashMap<String, tera::Value>,
-    ) -> tera::Result<tera::Value> {
-        if let Some(content) = value.as_str() {
-            // Can grow, but allocate enough space for at least a few lines.
-            let mut result = String::with_capacity(content.len() + 64);
-            result.push_str("> 🚧 Warning\n");
-            for line in content.lines() {
-                result.push_str("> ");
-                result.push_str(line);
-                result.push('\n');
-            }
-            Ok(tera::Value::String(result))
-        } else {
-            Err(tera::Error::msg(format!(
-                "Unable to format non-string as warning-block: {value:?}"
-            )))
-        }
-    }
-}
-
 pub(super) struct LinkFenlTypes;
 
 impl tera::Filter for LinkFenlTypes {
diff --git a/crates/sparrow-catalog/src/update/execute_example.rs b/crates/sparrow-catalog/src/update/execute_example.rs
index baa2836e1..bf0672f27 100644
--- a/crates/sparrow-catalog/src/update/execute_example.rs
+++ b/crates/sparrow-catalog/src/update/execute_example.rs
@@ -1,7 +1,10 @@
 use std::fs::File;
 
+use arrow::record_batch::RecordBatchWriter;
 use error_stack::{IntoReportCompat, ResultExt};
 use futures::{StreamExt, TryStreamExt};
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::ProjectionMask;
 use sparrow_api::kaskada::v1alpha::compile_request::ExpressionKind;
 use sparrow_api::kaskada::v1alpha::destination;
 use sparrow_api::kaskada::v1alpha::source_data;
@@ -88,7 +91,7 @@ pub(super) async fn execute_example(
 
     let destination = ObjectStoreDestination {
         output_prefix_uri: format!("file:///{}", tempdir.path().display()),
-        file_type: FileType::Csv.into(),
+        file_type: FileType::Parquet.into(),
         output_paths: None,
     };
     let output_to = Destination {
@@ -130,23 +133,29 @@ pub(super) async fn execute_example(
     let output_path = output_paths[0].strip_prefix("file://").unwrap();
     let output_path = std::path::Path::new(output_path);
 
-    // Drop the first four (key columns).
-    //
-    // Note: This currently writes to CSV and then parses it.
-    // There may be faster ways, but this lets us re-use the existing functionality
-    // to write to CSV rather than creating special functionality just for examples.
-    // We could also consider other options for removing the rows (regex, etc.)
-    // but this works.
-    let mut table = prettytable::Table::from_csv_file(output_path).unwrap();
-    for row in table.row_iter_mut() {
-        row.remove_cell(0);
-        row.remove_cell(0);
-        row.remove_cell(0);
-        row.remove_cell(0);
+    let file = File::open(output_path).unwrap();
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+    let fields = builder
+        .schema()
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(index, field)| match field.name().as_ref() {
+            "_time" | "_key" | "_key_hash" | "_subsort" => None,
+            _ => Some(index),
+        });
+    let mask = ProjectionMask::roots(builder.parquet_schema(), fields);
+    let reader = builder.with_projection(mask).build().unwrap();
+
+    let mut content = Vec::new();
+    let mut writer = arrow::csv::WriterBuilder::new().build(&mut content);
+    for batch in reader {
+        let batch = batch.unwrap();
+        writer.write(&batch).unwrap();
     }
+    writer.close().unwrap();
 
-    let content =
-        String::from_utf8(table.to_csv(Vec::new()).unwrap().into_inner().unwrap()).unwrap();
+    let content = String::from_utf8(content).unwrap();
 
     tempdir.close().unwrap();
diff --git a/crates/sparrow-catalog/templates/catalog.md b/crates/sparrow-catalog/templates/catalog.md
deleted file mode 100644
index ea78c9c20..000000000
--- a/crates/sparrow-catalog/templates/catalog.md
+++ /dev/null
@@ -1,50 +0,0 @@
-# Functions
-
-| Function | Summary |
-|:--------------------------------------------|:-------------------------------|
-{% for function in functions -%}
-| [{{ function.name }}](#{{ function.name }}) | {{function.short_doc | trim }} |
-{% endfor %}
-
-# Function Categories
-## Operators
-| Function | Summary |
-|:--------------------------------------------|:-------------------------------|
-{% for function in functions | filter(attribute="operator") -%}
-| [{{ function.operator }}](#{{ function.name }}) | {{function.short_doc | trim }} |
-{% endfor -%}
-
-{% for tag in tags | sort %}
-## {{ tag | capitalize }} Functions
-{% if tag == 'aggregation' -%}
-Aggregation functions provide the mechanism for computing across rows.
-The result of an aggregation represents the aggregate result for each
-key up to and including the current row. This approximately corresponds to
-the result you would get if you ran a SQL aggregation over the values
-available at the time of that row.
-
-Aggregations may be configured to operate in a specific window by providing
-a [window function](#window-functions) as the optional `window` argument.
-If no window is provided, the aggregation is over all rows for the entity,
-up to and including the current time. If a window is provided, the result
-of an aggregation is the result for that entity in the current window up
-to and including the current time. The current window is often not yet
-complete.
-
-NOTE: All aggregations in Fenl are implicitly scoped to the entity key.
-This would be equivalent to performing a grouped aggregation in SQL.
-{%- endif %}
-
-| Function | Summary |
-|:--------------------------------------------|:-------------------------------|
-{% for function in functions -%}
-{% if function.tags is containing(tag) -%}
-| [{{ function.name }}](#{{ function.name }}) | {{function.short_doc | trim }} |
-{% endif -%}
-{% endfor -%}
-{% endfor %}
-# Function Details
-
-{% for function in functions -%}
-{% include "partials/function.md" %}
-{% endfor -%}
\ No newline at end of file
diff --git a/crates/sparrow-catalog/templates/category.adoc b/crates/sparrow-catalog/templates/category.adoc
new file mode 100644
index 000000000..0a32910a1
--- /dev/null
+++ b/crates/sparrow-catalog/templates/category.adoc
@@ -0,0 +1,28 @@
+= {{ tag | capitalize }} Functions
+{% if tag == 'aggregation' -%}
+Aggregation functions provide the mechanism for computing across rows.
+The result of an aggregation represents the aggregate result for each
+key up to and including the current row. This approximately corresponds to
+the result you would get if you ran a SQL aggregation over the values
+available at the time of that row.
+
+Aggregations may be configured to operate in a specific window by providing
+a [window function](#window-functions) as the optional `window` argument.
+If no window is provided, the aggregation is over all rows for the entity,
+up to and including the current time. If a window is provided, the result
+of an aggregation is the result for that entity in the current window up
+to and including the current time. The current window is often not yet
+complete.
+
+NOTE: All aggregations in Fenl are implicitly scoped to the entity key.
+This would be equivalent to performing a grouped aggregation in SQL.
+{%- endif %}
+|===
+| Function | Summary
+{% for function in functions -%}
+{%- if function.tags is containing(tag) %}
+| [{{ function.name }}](#{{ function.name }})
+| {{function.short_doc | trim }}
+{% endif -%}
+{% endfor -%}
+|===
diff --git a/crates/sparrow-catalog/templates/partials/function.md b/crates/sparrow-catalog/templates/function.adoc
similarity index 80%
rename from crates/sparrow-catalog/templates/partials/function.md
rename to crates/sparrow-catalog/templates/function.adoc
index 1e690eab6..c537c944e 100644
--- a/crates/sparrow-catalog/templates/partials/function.md
+++ b/crates/sparrow-catalog/templates/function.adoc
@@ -1,12 +1,11 @@
-## {{ function.name }}
+= {{ function.name }}
 
 {{ function.signature | link_fenl_types }}
 
 {{ function.short_doc }}
 
 {%- if function.experimental %}
-
-{{ function.experimental | warning_block | trim }}
+WARNING: {{ function.experimental }}
 {%- endif %}
 
 {%- if function.long_doc %}
@@ -19,6 +18,6 @@
 
 {%- if function.examples -%}
 {%- for example in function.examples %}
-{% include "partials/example.md" %}
+{% include "partials/example.adoc" %}
 {%- endfor -%}
 {%- endif %}
diff --git a/crates/sparrow-catalog/templates/index.adoc b/crates/sparrow-catalog/templates/index.adoc
new file mode 100644
index 000000000..047628430
--- /dev/null
+++ b/crates/sparrow-catalog/templates/index.adoc
@@ -0,0 +1,9 @@
+# Functions
+
+|===
+| Function | Summary
+{% for function in functions %}
+| [{{ function.name }}](#{{ function.name }})
+| {{function.short_doc | trim }}
+{% endfor -%}
+|===
diff --git a/crates/sparrow-catalog/templates/nav.adoc b/crates/sparrow-catalog/templates/nav.adoc
new file mode 100644
index 000000000..203794712
--- /dev/null
+++ b/crates/sparrow-catalog/templates/nav.adoc
@@ -0,0 +1,11 @@
+* xref:index.adoc[]
+
+* Categories
+** xref:category/operators.adoc[]
+{% for tag in tags | sort -%}
+** xref:category/{{ tag }}.adoc[]
+{% endfor %}
+
+* Functions
+{% for function in functions -%}
+** xref:function/{{ function.name }}.adoc[]
+{% endfor %}
diff --git a/crates/sparrow-catalog/templates/operators.adoc b/crates/sparrow-catalog/templates/operators.adoc
new file mode 100644
index 000000000..26ea4edb9
--- /dev/null
+++ b/crates/sparrow-catalog/templates/operators.adoc
@@ -0,0 +1,9 @@
+## Operators
+
+|===
+| Function | Summary
+{% for function in functions | filter(attribute="operator") %}
+| [{{ function.name }}](#{{ function.name }})
+| {{function.short_doc | trim }}
+{% endfor -%}
+|===
diff --git a/crates/sparrow-catalog/templates/partials/example.md b/crates/sparrow-catalog/templates/partials/example.adoc
similarity index 69%
rename from crates/sparrow-catalog/templates/partials/example.md
rename to crates/sparrow-catalog/templates/partials/example.adoc
index f970ab109..5b423c2ef 100644
--- a/crates/sparrow-catalog/templates/partials/example.md
+++ b/crates/sparrow-catalog/templates/partials/example.adoc
@@ -1,13 +1,15 @@
 {%- if example.name -%}
-### Example: {{ example.name }}
+.Example: {{ example.name }}
 {% else -%}
-### Example
+.Example
 {% endif %}
+====
+
 {%- if example.description -%}
 {{ example.description }}
 {% endif -%}
 
-#### Query
+= Query
 ```
 {%- if example.full_expression %}
 {{ example.full_expression | trim }}
@@ -16,23 +18,36 @@
 {% endif -%}
 ```
 {% if example.input_csv %}
-#### Table: Input
+= Table: Input
 
 * **Name**: `Input`
 * **Time Column**: `time`
 * **Group Column**: `key`
 * **Grouping**: `grouping`
 
-{{ example.input_csv | csv2md | trim }}
+[%header,format=csv]
+|===
+{{ example.input_csv }}
+|===
+
 {% endif %}
 {%- for table in example.tables | default(value=[]) %}
-#### Table: {{ table.name }}
+= Table: {{ table.name }}
 
 * **Name**: `{{ table.name }}`
 * **Time Column**: `{{ table.time_column_name }}`
 * **Group Column**: `{{ table.group_column_name }}`
 * **Grouping**: `{{ table.grouping }}`
 
-{{ table.input_csv | csv2md | trim }}
+[%header,format=csv]
+|===
+{{ table.input_csv }}
+|===
+
 {% endfor %}
-#### Output CSV
-{{ example.output_csv | csv2md | trim }}
\ No newline at end of file
+= Output CSV
+[%header,format=csv]
+|===
+{{ example.output_csv }}
+|===
+
+====
diff --git a/crates/sparrow-main/tests/e2e/collection_tests.rs b/crates/sparrow-main/tests/e2e/collection_tests.rs
index 63170ada1..9fccc2d51 100644
--- a/crates/sparrow-main/tests/e2e/collection_tests.rs
+++ b/crates/sparrow-main/tests/e2e/collection_tests.rs
@@ -25,7 +25,7 @@ pub(crate) async fn collection_data_fixture() -> DataFixture {
             "key",
             "",
         ),
-        &[&"parquet/data_with_map.parquet"],
+        &["parquet/data_with_map.parquet"],
     )
     .await
     .unwrap()
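Taken together, the generator now compiles every file under `templates/` once and renders each output from a clone of a shared `tera::Context`. Below is a minimal sketch of that Tera flow; the paths and context values are placeholders for illustration, not the crate's real data.

```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Compile every template matching the glob once; renders reuse them.
    let tera = tera::Tera::new("templates/**/*")?;

    // A shared base context, cloned and specialized per output file.
    let mut base = tera::Context::new();
    base.insert("functions", &["lag", "lead"]);

    let mut context = base.clone();
    context.insert("tag", "aggregation");

    // Render one template and write it into the output tree.
    let contents = tera.render("category.adoc", &context)?;
    std::fs::create_dir_all("catalog-tmp/category")?;
    std::fs::write("catalog-tmp/category/aggregation.adoc", contents)?;
    Ok(())
}
```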