From 5b3d1d3306eb645964f521d89255382624266ab1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 26 Jul 2024 21:59:26 +0800 Subject: [PATCH] chore: Enable new rust code format settings (#483) * chore: Enable new format settings Signed-off-by: Xuanwo * Format Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- crates/catalog/glue/src/catalog.rs | 5 +- crates/catalog/glue/src/error.rs | 6 +- crates/catalog/glue/src/schema.rs | 13 +- crates/catalog/glue/src/utils.rs | 21 +- crates/catalog/hms/src/catalog.rs | 23 +- crates/catalog/hms/src/error.rs | 5 +- crates/catalog/hms/src/schema.rs | 3 +- crates/catalog/hms/src/utils.rs | 12 +- crates/catalog/hms/tests/hms_catalog_test.rs | 8 +- crates/catalog/rest/src/catalog.rs | 23 +- crates/catalog/rest/src/client.rs | 15 +- crates/catalog/rest/src/types.rs | 3 +- .../catalog/rest/tests/rest_catalog_test.rs | 5 +- crates/examples/src/rest_catalog_namespace.rs | 3 +- crates/examples/src/rest_catalog_table.rs | 3 +- crates/iceberg/src/arrow/reader.rs | 16 +- crates/iceberg/src/arrow/schema.rs | 29 +- crates/iceberg/src/avro/schema.rs | 19 +- crates/iceberg/src/catalog/mod.rs | 30 +- crates/iceberg/src/error.rs | 4 +- crates/iceberg/src/expr/accessor.rs | 3 +- crates/iceberg/src/expr/mod.rs | 3 +- crates/iceberg/src/expr/predicate.rs | 32 +- crates/iceberg/src/expr/term.rs | 5 +- .../expr/visitors/bound_predicate_visitor.rs | 11 +- .../src/expr/visitors/expression_evaluator.rs | 35 +- .../visitors/inclusive_metrics_evaluator.rs | 13 +- .../src/expr/visitors/inclusive_projection.rs | 9 +- .../src/expr/visitors/manifest_evaluator.rs | 14 +- crates/iceberg/src/io/file_io.rs | 15 +- crates/iceberg/src/io/storage.rs | 3 +- crates/iceberg/src/io/storage_fs.rs | 6 +- crates/iceberg/src/io/storage_memory.rs | 6 +- crates/iceberg/src/io/storage_s3.rs | 8 +- crates/iceberg/src/lib.rs | 16 +- crates/iceberg/src/scan.rs | 44 +- crates/iceberg/src/spec/datatypes.rs | 61 +-- crates/iceberg/src/spec/manifest.rs | 64 +-- crates/iceberg/src/spec/manifest_list.rs | 49 +- crates/iceberg/src/spec/partition.rs | 10 +- crates/iceberg/src/spec/schema.rs | 37 +- crates/iceberg/src/spec/snapshot.rs | 18 +- crates/iceberg/src/spec/sort.rs | 9 +- crates/iceberg/src/spec/table_metadata.rs | 159 +++--- crates/iceberg/src/spec/transform.rs | 21 +- crates/iceberg/src/spec/values.rs | 117 ++--- crates/iceberg/src/table.rs | 6 +- crates/iceberg/src/transaction.rs | 14 +- crates/iceberg/src/transform/bucket.rs | 135 ++--- crates/iceberg/src/transform/identity.rs | 6 +- crates/iceberg/src/transform/mod.rs | 22 +- crates/iceberg/src/transform/temporal.rs | 496 ++++++++---------- crates/iceberg/src/transform/truncate.rs | 176 +++---- crates/iceberg/src/transform/void.rs | 6 +- .../writer/base_writer/data_file_writer.rs | 34 +- .../writer/file_writer/location_generator.rs | 20 +- crates/iceberg/src/writer/file_writer/mod.rs | 6 +- .../src/writer/file_writer/parquet_writer.rs | 87 ++- .../src/writer/file_writer/track_writer.rs | 4 +- crates/iceberg/src/writer/mod.rs | 11 +- crates/iceberg/tests/file_io_s3_test.rs | 3 +- crates/integrations/datafusion/src/catalog.rs | 7 +- .../datafusion/src/physical_plan/scan.rs | 20 +- crates/integrations/datafusion/src/schema.rs | 7 +- crates/integrations/datafusion/src/table.rs | 21 +- crates/test_utils/src/docker.rs | 3 +- rustfmt.toml | 7 + 67 files changed, 941 insertions(+), 1164 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 147d86ac9..16acaa719 100644 --- 
a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; +use std::fmt::Debug; + use async_trait::async_trait; use aws_sdk_glue::types::TableInput; use iceberg::io::FileIO; @@ -24,8 +27,6 @@ use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use std::{collections::HashMap, fmt::Debug}; - use typed_builder::TypedBuilder; use crate::error::{from_aws_build_error, from_aws_sdk_error}; diff --git a/crates/catalog/glue/src/error.rs b/crates/catalog/glue/src/error.rs index 64a8fe936..a94f6c220 100644 --- a/crates/catalog/glue/src/error.rs +++ b/crates/catalog/glue/src/error.rs @@ -15,16 +15,14 @@ // specific language governing permissions and limitations // under the License. -use anyhow::anyhow; use std::fmt::Debug; +use anyhow::anyhow; use iceberg::{Error, ErrorKind}; /// Format AWS SDK error into iceberg error pub(crate) fn from_aws_sdk_error(error: aws_sdk_glue::error::SdkError) -> Error -where - T: Debug, -{ +where T: Debug { Error::new( ErrorKind::Unexpected, "Operation failed for hitting aws skd error".to_string(), diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index a126f2f29..c349219f6 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -24,12 +24,9 @@ pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current"; use std::collections::HashMap; -use iceberg::{ - spec::{visit_schema, PrimitiveType, SchemaVisitor, TableMetadata}, - Error, ErrorKind, Result, -}; - use aws_sdk_glue::types::Column; +use iceberg::spec::{visit_schema, PrimitiveType, SchemaVisitor, TableMetadata}; +use iceberg::{Error, ErrorKind, Result}; use crate::error::from_aws_build_error; @@ -188,10 +185,8 @@ impl SchemaVisitor for GlueSchemaBuilder { #[cfg(test)] mod tests { - use iceberg::{ - spec::{Schema, TableMetadataBuilder}, - TableCreation, - }; + use iceberg::spec::{Schema, TableMetadataBuilder}; + use iceberg::TableCreation; use super::*; diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs index dcf6bef46..a99fb19c7 100644 --- a/crates/catalog/glue/src/utils.rs +++ b/crates/catalog/glue/src/utils.rs @@ -18,15 +18,14 @@ use std::collections::HashMap; use aws_config::{BehaviorVersion, Region, SdkConfig}; -use aws_sdk_glue::{ - config::Credentials, - types::{Database, DatabaseInput, StorageDescriptor, TableInput}, -}; +use aws_sdk_glue::config::Credentials; +use aws_sdk_glue::types::{Database, DatabaseInput, StorageDescriptor, TableInput}; use iceberg::spec::TableMetadata; use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; use uuid::Uuid; -use crate::{error::from_aws_build_error, schema::GlueSchemaBuilder}; +use crate::error::from_aws_build_error; +use crate::schema::GlueSchemaBuilder; /// Property aws profile name pub const AWS_PROFILE_NAME: &str = "profile_name"; @@ -286,15 +285,13 @@ macro_rules! 
with_catalog_id { #[cfg(test)] mod tests { - use aws_sdk_glue::{config::ProvideCredentials, types::Column}; - use iceberg::{ - spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type}, - Namespace, Result, TableCreation, - }; - - use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL}; + use aws_sdk_glue::config::ProvideCredentials; + use aws_sdk_glue::types::Column; + use iceberg::spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type}; + use iceberg::{Namespace, Result, TableCreation}; use super::*; + use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL}; fn create_metadata(schema: Schema) -> Result { let table_creation = TableCreation::builder() diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 7a292c51b..524eceec7 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -15,30 +15,29 @@ // specific language governing permissions and limitations // under the License. -use crate::error::from_thrift_error; -use crate::error::{from_io_error, from_thrift_exception}; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::net::ToSocketAddrs; -use super::utils::*; use anyhow::anyhow; use async_trait::async_trait; -use hive_metastore::ThriftHiveMetastoreClient; -use hive_metastore::ThriftHiveMetastoreClientBuilder; -use hive_metastore::ThriftHiveMetastoreGetDatabaseException; -use hive_metastore::ThriftHiveMetastoreGetTableException; +use hive_metastore::{ + ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder, + ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException, +}; use iceberg::io::FileIO; -use iceberg::spec::TableMetadata; -use iceberg::spec::TableMetadataBuilder; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; -use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; use volo_thrift::MaybeException; +use super::utils::*; +use crate::error::{from_io_error, from_thrift_error, from_thrift_exception}; + /// Which variant of the thrift transport to communicate with HMS /// See: #[derive(Debug, Default)] diff --git a/crates/catalog/hms/src/error.rs b/crates/catalog/hms/src/error.rs index cee5e462f..15da3eaf6 100644 --- a/crates/catalog/hms/src/error.rs +++ b/crates/catalog/hms/src/error.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -use anyhow::anyhow; -use iceberg::{Error, ErrorKind}; use std::fmt::Debug; use std::io; + +use anyhow::anyhow; +use iceberg::{Error, ErrorKind}; use volo_thrift::MaybeException; /// Format a thrift error into iceberg error. 
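The import regrouping (std / external / crate blocks), module-level import merging, and single-line `where` clauses visible throughout these hunks are the effect of the formatter options this patch switches on; the new rustfmt.toml itself (7 lines per the diffstat) appears later in the patch and is not reproduced in this excerpt. As a rough sketch only — these option names are standard nightly rustfmt settings consistent with the reformatting shown above, not a verbatim copy of the file added by this commit:

    # Sketch of rustfmt options that would produce edits like the ones in these hunks.
    # Assumed for illustration; the actual rustfmt.toml added by this commit may differ.
    unstable_features = true
    group_imports = "StdExternalCrate"     # separate std / external / crate import groups
    imports_granularity = "Module"         # merge imports by module path
    where_single_line = true               # collapse single-bound where clauses onto one line
    overflow_delimited_expr = true         # let a trailing struct/array argument overflow the call
    format_code_in_doc_comments = true     # reformat code inside /// doc examples

Running `cargo +nightly fmt` with options like these yields the mechanical changes seen in the rest of this patch, such as `use iceberg::spec::{...}` being split per module and test-only imports moving below the std/external groups.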
diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs index 77caaf715..fa7819d62 100644 --- a/crates/catalog/hms/src/schema.rs +++ b/crates/catalog/hms/src/schema.rs @@ -142,7 +142,8 @@ impl SchemaVisitor for HiveSchemaBuilder { #[cfg(test)] mod tests { - use iceberg::{spec::Schema, Result}; + use iceberg::spec::Schema; + use iceberg::Result; use super::*; diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index baaa004ed..1e48d3fbd 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use chrono::Utc; use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor}; -use iceberg::{spec::Schema, Error, ErrorKind, Namespace, NamespaceIdent, Result}; +use iceberg::spec::Schema; +use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; use pilota::{AHashMap, FastStr}; -use std::collections::HashMap; use uuid::Uuid; use crate::schema::HiveSchemaBuilder; @@ -336,10 +338,8 @@ fn get_current_time() -> Result { #[cfg(test)] mod tests { - use iceberg::{ - spec::{NestedField, PrimitiveType, Type}, - Namespace, NamespaceIdent, - }; + use iceberg::spec::{NestedField, PrimitiveType, Type}; + use iceberg::{Namespace, NamespaceIdent}; use super::*; diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index a109757fe..e4974171f 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -235,10 +235,10 @@ async fn test_list_tables() -> Result<()> { catalog.create_table(ns.name(), creation).await?; let result = catalog.list_tables(ns.name()).await?; - assert_eq!( - result, - vec![TableIdent::new(ns.name().clone(), "my_table".to_string())] - ); + assert_eq!(result, vec![TableIdent::new( + ns.name().clone(), + "my_table".to_string() + )]); Ok(()) } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 594e2a99c..aab615cd6 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -21,8 +21,16 @@ use std::collections::HashMap; use std::str::FromStr; use async_trait::async_trait; +use iceberg::io::FileIO; +use iceberg::table::Table; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; use itertools::Itertools; -use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::header::{ + HeaderMap, HeaderName, HeaderValue, {self}, +}; use reqwest::{Method, StatusCode, Url}; use tokio::sync::OnceCell; use typed_builder::TypedBuilder; @@ -33,12 +41,6 @@ use crate::types::{ ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, }; -use iceberg::io::FileIO; -use iceberg::table::Table; -use iceberg::Result; -use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, -}; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -670,6 +672,10 @@ impl Catalog for RestCatalog { #[cfg(test)] mod tests { + use std::fs::File; + use std::io::BufReader; + use std::sync::Arc; + use chrono::{TimeZone, Utc}; use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, @@ -679,9 +685,6 @@ mod tests { use 
iceberg::transaction::Transaction; use mockito::{Mock, Server, ServerGuard}; use serde_json::json; - use std::fs::File; - use std::io::BufReader; - use std::sync::Arc; use uuid::uuid; use super::*; diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 43e14c731..53dcd4cee 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -15,17 +15,18 @@ // specific language governing permissions and limitations // under the License. -use crate::types::{ErrorResponse, TokenResponse, OK}; -use crate::RestCatalogConfig; -use iceberg::Result; -use iceberg::{Error, ErrorKind}; -use reqwest::header::HeaderMap; -use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response}; -use serde::de::DeserializeOwned; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Mutex; +use iceberg::{Error, ErrorKind, Result}; +use reqwest::header::HeaderMap; +use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response}; +use serde::de::DeserializeOwned; + +use crate::types::{ErrorResponse, TokenResponse, OK}; +use crate::RestCatalogConfig; + pub(crate) struct HttpClient { client: Client, diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index c8d704b26..11833a562 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -17,12 +17,11 @@ use std::collections::HashMap; -use serde_derive::{Deserialize, Serialize}; - use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; use iceberg::{ Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate, }; +use serde_derive::{Deserialize, Serialize}; pub(super) const OK: u16 = 200u16; pub(super) const NO_CONTENT: u16 = 204u16; diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 621536a73..3c9ec6937 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -17,6 +17,9 @@ //! Integration tests for rest catalog. +use std::collections::HashMap; +use std::sync::RwLock; + use ctor::{ctor, dtor}; use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::Transaction; @@ -25,8 +28,6 @@ use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; -use std::collections::HashMap; -use std::sync::RwLock; use tokio::time::sleep; const REST_CATALOG_PORT: u16 = 8181; diff --git a/crates/examples/src/rest_catalog_namespace.rs b/crates/examples/src/rest_catalog_namespace.rs index 3716899cb..0a508a7d8 100644 --- a/crates/examples/src/rest_catalog_namespace.rs +++ b/crates/examples/src/rest_catalog_namespace.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use iceberg::{Catalog, NamespaceIdent}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; -use std::collections::HashMap; #[tokio::main] async fn main() { diff --git a/crates/examples/src/rest_catalog_table.rs b/crates/examples/src/rest_catalog_table.rs index f25ce45f6..a0a672f15 100644 --- a/crates/examples/src/rest_catalog_table.rs +++ b/crates/examples/src/rest_catalog_table.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, TableCreation, TableIdent}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; -use std::collections::HashMap; #[tokio::main] async fn main() { diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index f63817f9c..fe8e357c5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -17,7 +17,11 @@ //! Parquet file data reader -use crate::error::Result; +use std::collections::{HashMap, HashSet}; +use std::ops::Range; +use std::str::FromStr; +use std::sync::Arc; + use arrow_arith::boolean::{and, is_not_null, is_null, not, or}; use arrow_array::{ArrayRef, BooleanArray, RecordBatch}; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; @@ -34,12 +38,9 @@ use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::file::metadata::ParquetMetaData; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use std::collections::{HashMap, HashSet}; -use std::ops::Range; -use std::str::FromStr; -use std::sync::Arc; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; +use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; @@ -859,12 +860,13 @@ impl AsyncFileReader for ArrowFileReader { #[cfg(test)] mod tests { + use std::collections::HashSet; + use std::sync::Arc; + use crate::arrow::reader::CollectFieldIdVisitor; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Reference}; use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; - use std::collections::HashSet; - use std::sync::Arc; fn table_schema_simple() -> SchemaRef { Arc::new( diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 3102f6d33..c92706953 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -17,12 +17,9 @@ //! Conversion between Arrow schema and Iceberg schema. -use crate::error::Result; -use crate::spec::{ - Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, - SchemaVisitor, StructType, Type, -}; -use crate::{Error, ErrorKind}; +use std::collections::HashMap; +use std::sync::Arc; + use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type}; use arrow_array::{ BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, Int64Array, @@ -32,8 +29,13 @@ use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use rust_decimal::prelude::ToPrimitive; -use std::collections::HashMap; -use std::sync::Arc; + +use crate::error::Result; +use crate::spec::{ + Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, + SchemaVisitor, StructType, Type, +}; +use crate::{Error, ErrorKind}; /// When iceberg map type convert to Arrow map type, the default map field name is "key_value". 
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value"; @@ -639,15 +641,14 @@ impl TryFrom<&crate::spec::Schema> for ArrowSchema { #[cfg(test)] mod tests { - use super::*; - use crate::spec::Schema; - use arrow_schema::DataType; - use arrow_schema::Field; - use arrow_schema::Schema as ArrowSchema; - use arrow_schema::TimeUnit; use std::collections::HashMap; use std::sync::Arc; + use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + + use super::*; + use crate::spec::Schema; + fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema { let fields = Fields::from(vec![ Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index 4195d4f54..653f52aec 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -18,11 +18,6 @@ //! Conversion between iceberg and avro schema. use std::collections::BTreeMap; -use crate::spec::{ - visit_schema, ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, -}; -use crate::{Error, ErrorKind, Result}; use apache_avro::schema::{ DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, RecordFieldOrder, RecordSchema, UnionSchema, @@ -31,6 +26,12 @@ use apache_avro::Schema as AvroSchema; use itertools::{Either, Itertools}; use serde_json::{Number, Value}; +use crate::spec::{ + visit_schema, ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, + StructType, +}; +use crate::{Error, ErrorKind, Result}; + const FILED_ID_PROP: &str = "field-id"; const UUID_BYTES: usize = 16; const UUID_LOGICAL_TYPE: &str = "uuid"; @@ -286,12 +287,14 @@ fn avro_optional(avro_schema: AvroSchema) -> Result { #[cfg(test)] mod tests { + use std::fs::read_to_string; + + use apache_avro::schema::{Namespace, UnionSchema}; + use apache_avro::Schema as AvroSchema; + use super::*; use crate::ensure_data_valid; use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; - use apache_avro::schema::{Namespace, UnionSchema}; - use apache_avro::Schema as AvroSchema; - use std::fs::read_to_string; fn is_avro_optional(avro_schema: &AvroSchema) -> bool { match avro_schema { diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b49a80c84..5c63e1e77 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -17,21 +17,23 @@ //! Catalog API for Apache Iceberg -use crate::spec::{ - FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, - UnboundPartitionSpec, -}; -use crate::table::Table; -use crate::{Error, ErrorKind, Result}; -use async_trait::async_trait; -use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; use std::mem::take; use std::ops::Deref; + +use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; +use crate::spec::{ + FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, + UnboundPartitionSpec, +}; +use crate::table::Table; +use crate::{Error, ErrorKind, Result}; + /// The catalog API for Iceberg Rust. 
#[async_trait] pub trait Catalog: Debug + Sync + Send { @@ -439,17 +441,19 @@ impl TableUpdate { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fmt::Debug; + + use serde::de::DeserializeOwned; + use serde::Serialize; + use uuid::uuid; + use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; - use serde::de::DeserializeOwned; - use serde::Serialize; - use std::collections::HashMap; - use std::fmt::Debug; - use uuid::uuid; #[test] fn test_create_table_id() { diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index f920fa99b..6270b4347 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -17,9 +17,7 @@ use std::backtrace::{Backtrace, BacktraceStatus}; use std::fmt; -use std::fmt::Debug; -use std::fmt::Display; -use std::fmt::Formatter; +use std::fmt::{Debug, Display, Formatter}; /// Result that is a wrapper of `Result` pub type Result = std::result::Result; diff --git a/crates/iceberg/src/expr/accessor.rs b/crates/iceberg/src/expr/accessor.rs index 2e2258fa6..51bfa7d39 100644 --- a/crates/iceberg/src/expr/accessor.rs +++ b/crates/iceberg/src/expr/accessor.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use serde_derive::{Deserialize, Serialize}; use std::sync::Arc; +use serde_derive::{Deserialize, Serialize}; + use crate::spec::{Datum, Literal, PrimitiveType, Struct}; use crate::{Error, ErrorKind, Result}; diff --git a/crates/iceberg/src/expr/mod.rs b/crates/iceberg/src/expr/mod.rs index 16f75b090..5771aac5e 100644 --- a/crates/iceberg/src/expr/mod.rs +++ b/crates/iceberg/src/expr/mod.rs @@ -23,10 +23,11 @@ pub use term::*; pub(crate) mod accessor; mod predicate; pub(crate) mod visitors; +use std::fmt::{Display, Formatter}; + pub use predicate::*; use crate::spec::SchemaRef; -use std::fmt::{Display, Formatter}; /// Predicate operators used in expressions. 
/// diff --git a/crates/iceberg/src/expr/predicate.rs b/crates/iceberg/src/expr/predicate.rs index 270772615..acf21a5b1 100644 --- a/crates/iceberg/src/expr/predicate.rs +++ b/crates/iceberg/src/expr/predicate.rs @@ -40,18 +40,14 @@ pub struct LogicalExpression { impl Serialize for LogicalExpression { fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { + where S: serde::Serializer { self.inputs.serialize(serializer) } } impl<'de, T: Deserialize<'de>, const N: usize> Deserialize<'de> for LogicalExpression { fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { + where D: serde::Deserializer<'de> { let inputs = Vec::>::deserialize(deserializer)?; Ok(LogicalExpression::new( array_init::from_iter(inputs.into_iter()).ok_or_else(|| { @@ -85,8 +81,7 @@ impl LogicalExpression { } impl Bind for LogicalExpression -where - T::Bound: Sized, +where T::Bound: Sized { type Bound = LogicalExpression; @@ -478,10 +473,11 @@ impl Predicate { /// # Example /// /// ```rust + /// use std::ops::Bound::Unbounded; + /// /// use iceberg::expr::BoundPredicate::Unary; /// use iceberg::expr::Reference; /// use iceberg::spec::Datum; - /// use std::ops::Bound::Unbounded; /// let expr1 = Reference::new("a").less_than(Datum::long(10)); /// /// let expr2 = Reference::new("b").less_than(Datum::long(20)); @@ -505,10 +501,11 @@ impl Predicate { /// # Example /// /// ```rust + /// use std::ops::Bound::Unbounded; + /// /// use iceberg::expr::BoundPredicate::Unary; /// use iceberg::expr::Reference; /// use iceberg::spec::Datum; - /// use std::ops::Bound::Unbounded; /// let expr1 = Reference::new("a").less_than(Datum::long(10)); /// /// let expr2 = Reference::new("b").less_than(Datum::long(20)); @@ -534,10 +531,11 @@ impl Predicate { /// # Example /// /// ```rust + /// use std::ops::Bound::Unbounded; + /// /// use iceberg::expr::BoundPredicate::Unary; /// use iceberg::expr::{LogicalExpression, Predicate, Reference}; /// use iceberg::spec::Datum; - /// use std::ops::Bound::Unbounded; /// let expr1 = Reference::new("a").less_than(Datum::long(10)); /// let expr2 = Reference::new("b") /// .less_than(Datum::long(5)) @@ -587,9 +585,10 @@ impl Predicate { /// # Example /// /// ```rust + /// use std::ops::Not; + /// /// use iceberg::expr::{LogicalExpression, Predicate, Reference}; /// use iceberg::spec::Datum; - /// use std::ops::Not; /// /// let expression = Reference::new("a").less_than(Datum::long(5)).not(); /// let result = expression.rewrite_not(); @@ -634,10 +633,11 @@ impl Not for Predicate { /// # Example /// ///```rust + /// use std::ops::Bound::Unbounded; + /// /// use iceberg::expr::BoundPredicate::Unary; /// use iceberg::expr::Reference; /// use iceberg::spec::Datum; - /// use std::ops::Bound::Unbounded; /// let expr1 = Reference::new("a").less_than(Datum::long(10)); /// /// let expr = !expr1; @@ -707,10 +707,8 @@ mod tests { use std::sync::Arc; use crate::expr::Predicate::{AlwaysFalse, AlwaysTrue}; - use crate::expr::Reference; - use crate::expr::{Bind, BoundPredicate}; - use crate::spec::Datum; - use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type}; + use crate::expr::{Bind, BoundPredicate, Reference}; + use crate::spec::{Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type}; #[test] fn test_logical_or_rewrite_not() { diff --git a/crates/iceberg/src/expr/term.rs b/crates/iceberg/src/expr/term.rs index 00ace5717..f83cebd99 100644 --- a/crates/iceberg/src/expr/term.rs +++ b/crates/iceberg/src/expr/term.rs @@ -23,8 
+23,9 @@ use fnv::FnvHashSet; use serde::{Deserialize, Serialize}; use crate::expr::accessor::{StructAccessor, StructAccessorRef}; -use crate::expr::Bind; -use crate::expr::{BinaryExpression, Predicate, PredicateOperator, SetExpression, UnaryExpression}; +use crate::expr::{ + BinaryExpression, Bind, Predicate, PredicateOperator, SetExpression, UnaryExpression, +}; use crate::spec::{Datum, NestedField, NestedFieldRef, SchemaRef}; use crate::{Error, ErrorKind}; diff --git a/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs b/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs index 31baf91a1..0858d1dcf 100644 --- a/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs +++ b/crates/iceberg/src/expr/visitors/bound_predicate_visitor.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. +use fnv::FnvHashSet; + use crate::expr::{BoundPredicate, BoundReference, PredicateOperator}; use crate::spec::Datum; use crate::Result; -use fnv::FnvHashSet; /// A visitor for [`BoundPredicate`]s. Visits in post-order. pub trait BoundPredicateVisitor { @@ -228,15 +229,17 @@ pub(crate) fn visit( #[cfg(test)] mod tests { + use std::ops::Not; + use std::sync::Arc; + + use fnv::FnvHashSet; + use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{ BinaryExpression, Bind, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference, SetExpression, UnaryExpression, }; use crate::spec::{Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type}; - use fnv::FnvHashSet; - use std::ops::Not; - use std::sync::Arc; struct TestEvaluator {} impl BoundPredicateVisitor for TestEvaluator { diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 81e91f3ee..3700a9b3d 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -17,13 +17,10 @@ use fnv::FnvHashSet; -use crate::{ - expr::{BoundPredicate, BoundReference}, - spec::{DataFile, Datum, PrimitiveLiteral, Struct}, - Error, ErrorKind, Result, -}; - use super::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{DataFile, Datum, PrimitiveLiteral, Struct}; +use crate::{Error, ErrorKind, Result}; /// Evaluates a [`DataFile`]'s partition [`Struct`] to check /// if the partition tuples match the given [`BoundPredicate`]. 
@@ -248,25 +245,23 @@ impl BoundPredicateVisitor for ExpressionEvaluatorVisitor<'_> { #[cfg(test)] mod tests { - use std::{collections::HashMap, sync::Arc}; + use std::collections::HashMap; + use std::sync::Arc; use fnv::FnvHashSet; use predicate::SetExpression; - use crate::{ - expr::{ - predicate, visitors::inclusive_projection::InclusiveProjection, BinaryExpression, Bind, - BoundPredicate, Predicate, PredicateOperator, Reference, UnaryExpression, - }, - spec::{ - DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField, - PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, - Type, - }, - Result, - }; - use super::ExpressionEvaluator; + use crate::expr::visitors::inclusive_projection::InclusiveProjection; + use crate::expr::{ + predicate, BinaryExpression, Bind, BoundPredicate, Predicate, PredicateOperator, Reference, + UnaryExpression, + }; + use crate::spec::{ + DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField, + PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + }; + use crate::Result; fn create_schema_and_partition_spec( r#type: PrimitiveType, diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 8d45fa29d..430ebfc1a 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. +use fnv::FnvHashSet; + use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::spec::{DataFile, Datum, PrimitiveLiteral}; use crate::{Error, ErrorKind}; -use fnv::FnvHashSet; const IN_PREDICATE_LIMIT: usize = 200; const ROWS_MIGHT_MATCH: crate::Result = Ok(true); @@ -478,6 +479,12 @@ impl BoundPredicateVisitor for InclusiveMetricsEvaluator<'_> { #[cfg(test)] mod test { + use std::collections::HashMap; + use std::ops::Not; + use std::sync::Arc; + + use fnv::FnvHashSet; + use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::PredicateOperator::{ Eq, GreaterThan, GreaterThanOrEq, In, IsNan, IsNull, LessThan, LessThanOrEq, NotEq, NotIn, @@ -491,10 +498,6 @@ mod test { DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type, }; - use fnv::FnvHashSet; - use std::collections::HashMap; - use std::ops::Not; - use std::sync::Arc; const INT_MIN_VALUE: i32 = 30; const INT_MAX_VALUE: i32 = 79; diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 0014b4976..9cfbb4fd8 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; + +use fnv::FnvHashSet; + use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference, Predicate}; use crate::spec::{Datum, PartitionField, PartitionSpecRef}; use crate::Error; -use fnv::FnvHashSet; -use std::collections::HashMap; pub(crate) struct InclusiveProjection { partition_spec: PartitionSpecRef, @@ -228,12 +230,13 @@ impl BoundPredicateVisitor for InclusiveProjection { #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type, }; - use std::sync::Arc; fn build_test_schema() -> Schema { Schema::builder() diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs index eb770ea2c..3554d57a0 100644 --- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +use fnv::FnvHashSet; + use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::spec::{Datum, FieldSummary, ManifestFile, PrimitiveLiteral, Type}; -use crate::Result; -use crate::{Error, ErrorKind}; -use fnv::FnvHashSet; +use crate::{Error, ErrorKind, Result}; /// Evaluates a [`ManifestFile`] to see if the partition summaries /// match a provided [`BoundPredicate`]. @@ -418,6 +418,11 @@ impl ManifestFilterVisitor<'_> { #[cfg(test)] mod test { + use std::ops::Not; + use std::sync::Arc; + + use fnv::FnvHashSet; + use crate::expr::visitors::manifest_evaluator::ManifestEvaluator; use crate::expr::{ BinaryExpression, Bind, Predicate, PredicateOperator, Reference, SetExpression, @@ -428,9 +433,6 @@ mod test { SchemaRef, Type, }; use crate::Result; - use fnv::FnvHashSet; - use std::ops::Not; - use std::sync::Arc; const INT_MIN_VALUE: i32 = 30; const INT_MAX_VALUE: i32 = 79; diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 54b2cd487..9af398270 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -15,15 +15,17 @@ // specific language governing permissions and limitations // under the License. -use super::storage::Storage; -use crate::{Error, ErrorKind, Result}; -use bytes::Bytes; -use opendal::Operator; use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; + +use bytes::Bytes; +use opendal::Operator; use url::Url; +use super::storage::Storage; +use crate::{Error, ErrorKind, Result}; + /// FileIO implementation, used to manipulate files in underlying storage. /// /// # Note @@ -326,13 +328,12 @@ impl OutputFile { #[cfg(test)] mod tests { + use std::fs::File; use std::io::Write; - - use std::{fs::File, path::Path}; + use std::path::Path; use futures::io::AllowStdIo; use futures::AsyncReadExt; - use tempfile::TempDir; use super::{FileIO, FileIOBuilder}; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index ad0dcb457..7383b8f1b 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+use opendal::{Operator, Scheme}; + use super::FileIOBuilder; #[cfg(feature = "storage-fs")] use super::FsConfig; @@ -23,7 +25,6 @@ use super::MemoryConfig; #[cfg(feature = "storage-s3")] use super::S3Config; use crate::{Error, ErrorKind}; -use opendal::{Operator, Scheme}; /// The storage carries all supported storage services in iceberg #[derive(Debug)] diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs index 38c3fa129..0dc5b9dea 100644 --- a/crates/iceberg/src/io/storage_fs.rs +++ b/crates/iceberg/src/io/storage_fs.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::Result; -use opendal::{Operator, Scheme}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use opendal::{Operator, Scheme}; + +use crate::Result; + /// # TODO /// /// opendal has a plan to introduce native config support. diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs index eca39a7e8..ed0cfadfe 100644 --- a/crates/iceberg/src/io/storage_memory.rs +++ b/crates/iceberg/src/io/storage_memory.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::Result; -use opendal::{Operator, Scheme}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; +use opendal::{Operator, Scheme}; + +use crate::Result; + #[derive(Default, Clone)] pub(crate) struct MemoryConfig {} diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index d001e06cd..df843188f 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. -use crate::io::storage::redact_secret; -use crate::{Error, ErrorKind, Result}; -use opendal::{Operator, Scheme}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; + +use opendal::{Operator, Scheme}; use url::Url; +use crate::io::storage::redact_secret; +use crate::{Error, ErrorKind, Result}; + /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3). /// S3 endpoint. pub const S3_ENDPOINT: &str = "s3.endpoint"; diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 3985884c0..35d59323c 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -23,20 +23,14 @@ extern crate derive_builder; mod error; -pub use error::Error; -pub use error::ErrorKind; -pub use error::Result; +pub use error::{Error, ErrorKind, Result}; mod catalog; -pub use catalog::Catalog; -pub use catalog::Namespace; -pub use catalog::NamespaceIdent; -pub use catalog::TableCommit; -pub use catalog::TableCreation; -pub use catalog::TableIdent; -pub use catalog::TableRequirement; -pub use catalog::TableUpdate; +pub use catalog::{ + Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, + TableUpdate, +}; pub mod table; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 730e3dadf..18489b721 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -17,6 +17,16 @@ //! Table scan api. 
+use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use async_stream::try_stream; +use futures::stream::BoxStream; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; + use crate::arrow::ArrowReaderBuilder; use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; @@ -30,14 +40,6 @@ use crate::spec::{ }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; -use arrow_array::RecordBatch; -use async_stream::try_stream; -use futures::stream::BoxStream; -use futures::StreamExt; -use serde::{Deserialize, Serialize}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; /// A stream of [`FileScanTask`]. pub type FileScanTaskStream = BoxStream<'static, Result>; @@ -528,6 +530,20 @@ impl FileScanTask { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs; + use std::fs::File; + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; + use futures::{stream, TryStreamExt}; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + use tera::{Context, Tera}; + use uuid::Uuid; + use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; @@ -540,18 +556,6 @@ mod tests { }; use crate::table::Table; use crate::TableIdent; - use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; - use futures::{stream, TryStreamExt}; - use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; - use parquet::basic::Compression; - use parquet::file::properties::WriterProperties; - use std::collections::HashMap; - use std::fs; - use std::fs::File; - use std::sync::Arc; - use tempfile::TempDir; - use tera::{Context, Tera}; - use uuid::Uuid; struct TableTestFixture { table_location: String, diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index a79b8f57a..cb51db6fc 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -18,19 +18,21 @@ /*! * Data Types */ -use crate::ensure_data_valid; -use crate::error::Result; -use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; +use std::collections::HashMap; +use std::convert::identity; +use std::fmt; +use std::ops::Index; +use std::sync::{Arc, OnceLock}; + use ::serde::de::{MapAccess, Visitor}; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde_json::Value as JsonValue; -use std::convert::identity; -use std::sync::Arc; -use std::sync::OnceLock; -use std::{collections::HashMap, fmt, ops::Index}; use super::values::Literal; +use crate::ensure_data_valid; +use crate::error::Result; +use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; /// Field name for list type. 
pub(crate) const LIST_FILED_NAME: &str = "element"; @@ -234,9 +236,7 @@ pub enum PrimitiveType { impl Serialize for Type { fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { + where S: Serializer { let type_serde = _serde::SerdeType::from(self); type_serde.serialize(serializer) } @@ -244,9 +244,7 @@ impl Serialize for Type { impl<'de> Deserialize<'de> for Type { fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { + where D: Deserializer<'de> { let type_serde = _serde::SerdeType::deserialize(deserializer)?; Ok(Type::from(type_serde)) } @@ -254,9 +252,7 @@ impl<'de> Deserialize<'de> for Type { impl<'de> Deserialize<'de> for PrimitiveType { fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { + where D: Deserializer<'de> { let s = String::deserialize(deserializer)?; if s.starts_with("decimal") { deserialize_decimal(s.into_deserializer()) @@ -270,9 +266,7 @@ impl<'de> Deserialize<'de> for PrimitiveType { impl Serialize for PrimitiveType { fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { + where S: Serializer { match self { PrimitiveType::Decimal { precision, scale } => { serialize_decimal(precision, scale, serializer) @@ -284,9 +278,7 @@ impl Serialize for PrimitiveType { } fn deserialize_decimal<'de, D>(deserializer: D) -> std::result::Result -where - D: Deserializer<'de>, -{ +where D: Deserializer<'de> { let s = String::deserialize(deserializer)?; let (precision, scale) = s .trim_start_matches(r"decimal(") @@ -312,9 +304,7 @@ where } fn deserialize_fixed<'de, D>(deserializer: D) -> std::result::Result -where - D: Deserializer<'de>, -{ +where D: Deserializer<'de> { let fixed = String::deserialize(deserializer)? .trim_start_matches(r"fixed[") .trim_end_matches(']') @@ -327,9 +317,7 @@ where } fn serialize_fixed(value: &u64, serializer: S) -> std::result::Result -where - S: Serializer, -{ +where S: Serializer { serializer.serialize_str(&format!("fixed[{value}]")) } @@ -371,9 +359,7 @@ pub struct StructType { impl<'de> Deserialize<'de> for StructType { fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { + where D: Deserializer<'de> { #[derive(Deserialize)] #[serde(field_identifier, rename_all = "lowercase")] enum Field { @@ -391,9 +377,7 @@ impl<'de> Deserialize<'de> for StructType { } fn visit_map(self, mut map: V) -> std::result::Result - where - V: MapAccess<'de>, - { + where V: MapAccess<'de> { let mut fields = None; while let Some(key) = map.next_key()? { match key { @@ -675,12 +659,14 @@ impl ListType { /// Module for type serialization/deserialization. 
pub(super) mod _serde { + use std::borrow::Cow; + + use serde_derive::{Deserialize, Serialize}; + use crate::spec::datatypes::Type::Map; use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, }; - use serde_derive::{Deserialize, Serialize}; - use std::borrow::Cow; /// List type for serialization and deserialization #[derive(Serialize, Deserialize)] @@ -803,9 +789,8 @@ mod tests { use pretty_assertions::assert_eq; use uuid::Uuid; - use crate::spec::values::PrimitiveLiteral; - use super::*; + use crate::spec::values::PrimitiveLiteral; fn check_type_serde(json: &str, expected_type: Type) { let desered_type: Type = serde_json::from_str(json).unwrap(); diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index f4b933175..e08591f9e 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -16,25 +16,25 @@ // under the License. //! Manifest for Iceberg. -use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; +use std::cmp::min; +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; + +use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; +use bytes::Bytes; +use serde_json::to_vec; +use typed_builder::TypedBuilder; -use super::UNASSIGNED_SEQUENCE_NUMBER; +use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec, Schema, - SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, + SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::io::OutputFile; use crate::spec::PartitionField; use crate::{Error, ErrorKind}; -use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; -use bytes::Bytes; -use serde_json::to_vec; -use std::cmp::min; -use std::collections::HashMap; -use std::str::FromStr; -use std::sync::Arc; -use typed_builder::TypedBuilder; /// A manifest contains metadata and a list of entries. 
#[derive(Debug, PartialEq, Eq, Clone)] @@ -325,13 +325,11 @@ mod _const_schema { use apache_avro::Schema as AvroSchema; use once_cell::sync::Lazy; - use crate::{ - avro::schema_to_avro_schema, - spec::{ - ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, - }, - Error, + use crate::avro::schema_to_avro_schema; + use crate::spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, }; + use crate::Error; static STATUS: Lazy = { Lazy::new(|| { @@ -1206,17 +1204,9 @@ mod _serde { use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; - use crate::spec::Datum; - use crate::spec::Literal; - use crate::spec::RawLiteral; - use crate::spec::Schema; - use crate::spec::Struct; - use crate::spec::StructType; - use crate::spec::Type; - use crate::Error; - use crate::ErrorKind; - use super::ManifestEntry; + use crate::spec::{Datum, Literal, RawLiteral, Schema, Struct, StructType, Type}; + use crate::{Error, ErrorKind}; #[derive(Serialize, Deserialize)] pub(super) struct ManifestEntryV2 { @@ -1478,15 +1468,16 @@ mod _serde { #[cfg(test)] mod tests { - use crate::spec::manifest::_serde::{parse_i64_entry, I64Entry}; use std::collections::HashMap; + use crate::spec::manifest::_serde::{parse_i64_entry, I64Entry}; + #[test] fn test_parse_negative_manifest_entry() { - let entries = vec![ - I64Entry { key: 1, value: -1 }, - I64Entry { key: 2, value: 3 }, - ]; + let entries = vec![I64Entry { key: 1, value: -1 }, I64Entry { + key: 2, + value: 3, + }]; let ret = parse_i64_entry(entries).unwrap(); @@ -1499,18 +1490,13 @@ mod _serde { #[cfg(test)] mod tests { use std::fs; + use std::sync::Arc; use tempfile::TempDir; use super::*; use crate::io::FileIOBuilder; - use crate::spec::Literal; - use crate::spec::NestedField; - use crate::spec::PrimitiveType; - use crate::spec::Struct; - use crate::spec::Transform; - use crate::spec::Type; - use std::sync::Arc; + use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type}; #[tokio::test] async fn test_parse_manifest_v2_unpartition() { diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 688bdef7d..e81889068 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -17,20 +17,19 @@ //! ManifestList for Iceberg. -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; +use std::str::FromStr; -use crate::io::FileIO; -use crate::{io::OutputFile, Error, ErrorKind}; -use apache_avro::{from_value, types::Value, Reader, Writer}; +use apache_avro::types::Value; +use apache_avro::{from_value, Reader, Writer}; use bytes::Bytes; -use self::{ - _const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}, - _serde::{ManifestFileV1, ManifestFileV2}, -}; - +use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; +use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{Datum, FormatVersion, Manifest, StructType}; use crate::error::Result; +use crate::io::{FileIO, OutputFile}; +use crate::{Error, ErrorKind}; /// Placeholder for sequence number. The field with this value must be replaced with the actual sequence number before it write. 
pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1; @@ -225,9 +224,9 @@ mod _const_schema { use apache_avro::Schema as AvroSchema; use once_cell::sync::Lazy; - use crate::{ - avro::schema_to_avro_schema, - spec::{ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type}, + use crate::avro::schema_to_avro_schema; + use crate::spec::{ + ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, }; static MANIFEST_PATH: Lazy = { @@ -674,15 +673,13 @@ pub struct FieldSummary { /// and then converted into the [ManifestFile] struct. Serialization works the other way around. /// [ManifestFileV1] and [ManifestFileV2] are internal struct that are only used for serialization and deserialization. pub(super) mod _serde { - use crate::{ - spec::{Datum, PrimitiveType, StructType}, - Error, - }; pub use serde_bytes::ByteBuf; use serde_derive::{Deserialize, Serialize}; use super::ManifestFile; use crate::error::Result; + use crate::spec::{Datum, PrimitiveType, StructType}; + use crate::Error; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(transparent)] @@ -1096,20 +1093,20 @@ pub(super) mod _serde { #[cfg(test)] mod test { + use std::collections::HashMap; + use std::fs; + use std::sync::Arc; + use apache_avro::{Reader, Schema}; - use std::{collections::HashMap, fs, sync::Arc}; use tempfile::TempDir; - use crate::{ - io::FileIOBuilder, - spec::{ - manifest_list::_serde::ManifestListV1, Datum, FieldSummary, ManifestContentType, - ManifestFile, ManifestList, ManifestListWriter, NestedField, PrimitiveType, StructType, - Type, UNASSIGNED_SEQUENCE_NUMBER, - }, - }; - use super::_serde::ManifestListV2; + use crate::io::FileIOBuilder; + use crate::spec::manifest_list::_serde::ManifestListV1; + use crate::spec::{ + Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter, + NestedField, PrimitiveType, StructType, Type, UNASSIGNED_SEQUENCE_NUMBER, + }; #[tokio::test] async fn test_parse_manifest_list_v1() { diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 5b5e1ce0b..f1244e4e9 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -18,14 +18,15 @@ /*! * Partitioning */ -use serde::{Deserialize, Serialize}; use std::sync::Arc; + +use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; +use super::transform::Transform; +use super::{NestedField, Schema, StructType}; use crate::{Error, ErrorKind}; -use super::{transform::Transform, NestedField, Schema, StructType}; - /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; /// Partition fields capture the transform from table data to partition values. @@ -137,9 +138,8 @@ impl UnboundPartitionSpec { #[cfg(test)] mod tests { - use crate::spec::Type; - use super::*; + use crate::spec::Type; #[test] fn test_partition_spec() { diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index c76701b0c..106bfb1d8 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -17,23 +17,23 @@ //! This module defines schema in iceberg. 
-use crate::error::Result; -use crate::expr::accessor::StructAccessor; -use crate::spec::datatypes::{ - ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, - MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, -}; -use crate::{ensure_data_valid, Error, ErrorKind}; -use bimap::BiHashMap; -use itertools::{zip_eq, Itertools}; -use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; use std::sync::Arc; use _serde::SchemaEnum; +use bimap::BiHashMap; +use itertools::{zip_eq, Itertools}; +use serde::{Deserialize, Serialize}; use super::NestedField; +use crate::error::Result; +use crate::expr::accessor::StructAccessor; +use crate::spec::datatypes::{ + ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type, LIST_FILED_NAME, + MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, +}; +use crate::{ensure_data_valid, Error, ErrorKind}; /// Type alias for schema id. pub type SchemaId = i32; @@ -949,11 +949,16 @@ pub(super) mod _serde { /// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct /// and then converted into the [Schema] struct. Serialization works the other way around. /// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. - use serde::{Deserialize, Serialize}; - - use crate::{spec::StructType, Error, Result}; + use serde::Deserialize; + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct + /// and then converted into the [Schema] struct. Serialization works the other way around. + /// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. + use serde::Serialize; use super::{Schema, DEFAULT_SCHEMA_ID}; + use crate::spec::StructType; + use crate::{Error, Result}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(untagged)] @@ -1056,6 +1061,9 @@ pub(super) mod _serde { #[cfg(test)] mod tests { + use std::collections::{HashMap, HashSet}; + + use super::DEFAULT_SCHEMA_ID; use crate::spec::datatypes::Type::{List, Map, Primitive, Struct}; use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, @@ -1064,9 +1072,6 @@ mod tests { use crate::spec::schema::_serde::{SchemaEnum, SchemaV1, SchemaV2}; use crate::spec::values::Map as MapValue; use crate::spec::{prune_columns, Datum, Literal}; - use std::collections::{HashMap, HashSet}; - - use super::DEFAULT_SCHEMA_ID; fn check_schema_serde(json: &str, expected_type: Schema, _expected_enum: SchemaEnum) { let desered_type: Schema = serde_json::from_str(json).unwrap(); diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 72df801ae..704a43b5f 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -18,18 +18,19 @@ /*! 
* Snapshots */ -use crate::error::Result; -use chrono::{DateTime, TimeZone, Utc}; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; + +use _serde::SnapshotV2; +use chrono::{DateTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; +use crate::error::Result; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; use crate::{Error, ErrorKind}; -use _serde::SnapshotV2; /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; @@ -200,11 +201,10 @@ pub(super) mod _serde { use serde::{Deserialize, Serialize}; + use super::{Operation, Snapshot, Summary}; use crate::spec::SchemaId; use crate::Error; - use super::{Operation, Snapshot, Summary}; - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] /// Defines the structure of a v2 snapshot for serialization/deserialization @@ -358,10 +358,12 @@ pub enum SnapshotRetention { #[cfg(test)] mod tests { - use chrono::{TimeZone, Utc}; use std::collections::HashMap; - use crate::spec::snapshot::{Operation, Snapshot, Summary, _serde::SnapshotV1}; + use chrono::{TimeZone, Utc}; + + use crate::spec::snapshot::_serde::SnapshotV1; + use crate::spec::snapshot::{Operation, Snapshot, Summary}; #[test] fn schema() { diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index a1311f7bf..82909344a 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -18,16 +18,17 @@ /*! * Sorting */ -use crate::error::Result; -use crate::spec::Schema; -use crate::{Error, ErrorKind}; use core::fmt; -use serde::{Deserialize, Serialize}; use std::fmt::Formatter; use std::sync::Arc; + +use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::transform::Transform; +use crate::error::Result; +use crate::spec::Schema; +use crate::{Error, ErrorKind}; /// Reference to [`SortOrder`]. pub type SortOrderRef = Arc; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 4892c2623..d9a09d860 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -18,24 +18,23 @@ //! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). //! The main struct here is [TableMetadataV2] which defines the data for a table. 
-use serde::{Deserialize, Serialize}; -use serde_repr::{Deserialize_repr, Serialize_repr}; use std::cmp::Ordering; +use std::collections::HashMap; use std::fmt::{Display, Formatter}; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; + +use _serde::TableMetadataEnum; +use chrono::{DateTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; +use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; +use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention}; use super::{ - snapshot::{Snapshot, SnapshotReference, SnapshotRetention}, - PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrderRef, + PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrder, SortOrderRef, }; -use super::{PartitionSpec, SortOrder}; - -use _serde::TableMetadataEnum; - use crate::error::Result; use crate::{Error, ErrorKind, TableCreation}; -use chrono::{DateTime, TimeZone, Utc}; static MAIN_BRANCH: &str = "main"; static DEFAULT_SPEC_ID: i32 = 0; @@ -262,14 +261,11 @@ impl TableMetadata { s.snapshot_id = snapshot.snapshot_id(); }) .or_insert_with(|| { - SnapshotReference::new( - snapshot.snapshot_id(), - SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - ) + SnapshotReference::new(snapshot.snapshot_id(), SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }) }); self.snapshot_log.push(snapshot.log()); @@ -377,26 +373,28 @@ pub(super) mod _serde { /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct /// and then converted into the [TableMetadata] struct. Serialization works the other way around. /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization. - use std::{collections::HashMap, sync::Arc}; + use std::collections::HashMap; + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct + /// and then converted into the [TableMetadata] struct. Serialization works the other way around. + /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization. 
+ use std::sync::Arc; use itertools::Itertools; use serde::{Deserialize, Serialize}; use uuid::Uuid; - use crate::spec::{Snapshot, EMPTY_SNAPSHOT_ID}; - use crate::{ - spec::{ - schema::_serde::{SchemaV1, SchemaV2}, - snapshot::_serde::{SnapshotV1, SnapshotV2}, - PartitionField, PartitionSpec, Schema, SnapshotReference, SnapshotRetention, SortOrder, - }, - Error, ErrorKind, - }; - use super::{ FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_SORT_ORDER_ID, DEFAULT_SPEC_ID, MAIN_BRANCH, }; + use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; + use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; + use crate::spec::{ + PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotRetention, + SortOrder, EMPTY_SNAPSHOT_ID, + }; + use crate::{Error, ErrorKind}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(untagged)] @@ -478,9 +476,7 @@ pub(super) mod _serde { impl Serialize for TableMetadata { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { + where S: serde::Serializer { // we must do a clone here let table_metadata_enum: TableMetadataEnum = self.clone().try_into().map_err(serde::ser::Error::custom)?; @@ -491,18 +487,14 @@ pub(super) mod _serde { impl Serialize for VersionNumber { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { + where S: serde::Serializer { serializer.serialize_u8(V) } } impl<'de, const V: u8> Deserialize<'de> for VersionNumber { fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { + where D: serde::Deserializer<'de> { let value = u8::deserialize(deserializer)?; if value == V { Ok(VersionNumber::) @@ -597,17 +589,14 @@ pub(super) mod _serde { default_sort_order_id: value.default_sort_order_id, refs: value.refs.unwrap_or_else(|| { if let Some(snapshot_id) = current_snapshot_id { - HashMap::from_iter(vec![( - MAIN_BRANCH.to_string(), - SnapshotReference { - snapshot_id, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, + HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, }, - )]) + })]) } else { HashMap::new() } @@ -705,17 +694,14 @@ pub(super) mod _serde { None => HashMap::new(), }, default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID), - refs: HashMap::from_iter(vec![( - MAIN_BRANCH.to_string(), - SnapshotReference { - snapshot_id: value.current_snapshot_id.unwrap_or_default(), - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, + refs: HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { + snapshot_id: value.current_snapshot_id.unwrap_or_default(), + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, }, - )]), + })]), }) } } @@ -924,23 +910,22 @@ impl SnapshotLog { #[cfg(test)] mod tests { - use std::{collections::HashMap, fs, sync::Arc}; + use std::collections::HashMap; + use std::fs; + use std::sync::Arc; use anyhow::Result; - use uuid::Uuid; - use pretty_assertions::assert_eq; - - use crate::{ - spec::{ - table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField, - PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, 
SortField, SortOrder, Summary, Transform, Type, - }, - TableCreation, - }; + use uuid::Uuid; use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; + use crate::spec::table_metadata::TableMetadata; + use crate::spec::{ + NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, + Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + Summary, Transform, Type, + }; + use crate::TableCreation; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { let desered_type: TableMetadata = serde_json::from_str(json).unwrap(); @@ -1378,17 +1363,14 @@ mod tests { }, ], metadata_log: Vec::new(), - refs: HashMap::from_iter(vec![( - "main".to_string(), - SnapshotReference { - snapshot_id: 3055729675574597004, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, + refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { + snapshot_id: 3055729675574597004, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, }, - )]), + })]), }; check_table_metadata_serde(&metadata, expected); @@ -1529,17 +1511,14 @@ mod tests { properties: HashMap::new(), snapshot_log: vec![], metadata_log: Vec::new(), - refs: HashMap::from_iter(vec![( - "main".to_string(), - SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, + refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, }, - )]), + })]), }; check_table_metadata_serde(&metadata, expected); diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 03c12c7ea..54e2105ff 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -17,6 +17,13 @@ //! Transforms in iceberg. +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use fnv::FnvHashSet; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use super::{Datum, PrimitiveLiteral}; use crate::error::{Error, Result}; use crate::expr::{ BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference, @@ -25,12 +32,6 @@ use crate::expr::{ use crate::spec::datatypes::{PrimitiveType, Type}; use crate::transform::{create_transform_function, BoxedTransformFunction}; use crate::ErrorKind; -use fnv::FnvHashSet; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::fmt::{Display, Formatter}; -use std::str::FromStr; - -use super::{Datum, PrimitiveLiteral}; /// Transform is used to transform predicates to partition predicates, /// in addition to transforming data values. 
@@ -664,18 +665,14 @@ impl FromStr for Transform { impl Serialize for Transform { fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { + where S: Serializer { serializer.serialize_str(format!("{self}").as_str()) } } impl<'de> Deserialize<'de> for Transform { fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { + where D: Deserializer<'de> { let s = String::deserialize(deserializer)?; s.parse().map_err(::custom) } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index c9a99a0f8..a8817fbbb 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -26,19 +26,21 @@ use std::hash::Hash; use std::ops::Index; use std::str::FromStr; +pub use _serde::RawLiteral; use bitvec::vec::BitVec; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; use ordered_float::OrderedFloat; use rust_decimal::Decimal; -use serde::de::{self, MapAccess}; +use serde::de::{ + MapAccess, {self}, +}; use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use serde_json::{Map as JsonMap, Number, Value as JsonValue}; use uuid::Uuid; -pub use _serde::RawLiteral; - +use super::datatypes::{PrimitiveType, Type}; use crate::error::Result; use crate::spec::values::date::{date_from_naive_date, days_to_date, unix_epoch}; use crate::spec::values::time::microseconds_to_time; @@ -47,8 +49,6 @@ use crate::spec::values::timestamptz::microseconds_to_datetimetz; use crate::spec::MAX_DECIMAL_PRECISION; use crate::{ensure_data_valid, Error, ErrorKind}; -use super::datatypes::{PrimitiveType, Type}; - /// Maximum value for [`PrimitiveType::Time`] type in microseconds, e.g. 23 hours 59 minutes 59 seconds 999999 microseconds. const MAX_TIME_VALUE: i64 = 24 * 60 * 60 * 1_000_000i64 - 1; @@ -154,9 +154,7 @@ impl<'de> Deserialize<'de> for Datum { } fn visit_seq(self, mut seq: A) -> std::result::Result - where - A: serde::de::SeqAccess<'de>, - { + where A: serde::de::SeqAccess<'de> { let r#type = seq .next_element::()? .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?; @@ -175,9 +173,7 @@ impl<'de> Deserialize<'de> for Datum { } fn visit_map(self, mut map: V) -> std::result::Result - where - V: MapAccess<'de>, - { + where V: MapAccess<'de> { let mut raw_primitive: Option = None; let mut r#type: Option = None; while let Some(key) = map.next_key()? 
{ @@ -1956,8 +1952,7 @@ mod timestamp { } mod timestamptz { - use chrono::DateTime; - use chrono::Utc; + use chrono::{DateTime, Utc}; pub(crate) fn datetimetz_to_microseconds(time: &DateTime) -> i64 { time.timestamp_micros() @@ -1971,21 +1966,15 @@ mod timestamptz { } mod _serde { - use serde::{ - de::Visitor, - ser::{SerializeMap, SerializeSeq, SerializeStruct}, - Deserialize, Serialize, - }; + use serde::de::Visitor; + use serde::ser::{SerializeMap, SerializeSeq, SerializeStruct}; + use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; - use serde_derive::Deserialize as DeserializeDerive; - use serde_derive::Serialize as SerializeDerive; - - use crate::{ - spec::{PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}, - Error, ErrorKind, - }; + use serde_derive::{Deserialize as DeserializeDerive, Serialize as SerializeDerive}; use super::{Literal, Map, PrimitiveLiteral}; + use crate::spec::{PrimitiveType, Type, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME}; + use crate::{Error, ErrorKind}; #[derive(SerializeDerive, DeserializeDerive, Debug)] #[serde(transparent)] @@ -2028,9 +2017,7 @@ mod _serde { impl Serialize for Record { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { + where S: serde::Serializer { let len = self.required.len() + self.optional.len(); let mut record = serializer.serialize_struct("", len)?; for (k, v) in &self.required { @@ -2051,9 +2038,7 @@ mod _serde { impl Serialize for List { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { + where S: serde::Serializer { let mut seq = serializer.serialize_seq(Some(self.list.len()))?; for value in &self.list { if self.required { @@ -2078,9 +2063,7 @@ mod _serde { impl Serialize for StringMap { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { + where S: serde::Serializer { let mut map = serializer.serialize_map(Some(self.raw.len()))?; for (k, v) in &self.raw { if self.required { @@ -2102,9 +2085,7 @@ mod _serde { impl<'de> Deserialize<'de> for RawLiteralEnum { fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { + where D: serde::Deserializer<'de> { struct RawLiteralVisitor; impl<'de> Visitor<'de> for RawLiteralVisitor { type Value = RawLiteralEnum; @@ -2114,80 +2095,58 @@ mod _serde { } fn visit_bool(self, v: bool) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Boolean(v)) } fn visit_i32(self, v: i32) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Int(v)) } fn visit_i64(self, v: i64) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Long(v)) } /// Used in json fn visit_u64(self, v: u64) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Long(v as i64)) } fn visit_f32(self, v: f32) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Float(v)) } fn visit_f64(self, v: f64) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Double(v)) } fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::String(v.to_string())) } fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) } fn visit_borrowed_str(self, v: &'de str) -> Result - where - E: serde::de::Error, - { + where E: 
serde::de::Error { Ok(RawLiteralEnum::String(v.to_string())) } fn visit_unit(self) -> Result - where - E: serde::de::Error, - { + where E: serde::de::Error { Ok(RawLiteralEnum::Null) } fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { + where A: serde::de::MapAccess<'de> { let mut required = Vec::new(); while let Some(key) = map.next_key::()? { let value = map.next_value::()?; @@ -2200,9 +2159,7 @@ mod _serde { } fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { + where A: serde::de::SeqAccess<'de> { let mut list = Vec::new(); while let Some(value) = seq.next_element::()? { list.push(Some(value)); @@ -2650,17 +2607,13 @@ mod _serde { #[cfg(test)] mod tests { - use apache_avro::{to_value, types::Value}; - - use crate::{ - avro::schema_to_avro_schema, - spec::{ - datatypes::{ListType, MapType, NestedField, StructType}, - Schema, - }, - }; + use apache_avro::to_value; + use apache_avro::types::Value; use super::*; + use crate::avro::schema_to_avro_schema; + use crate::spec::datatypes::{ListType, MapType, NestedField, StructType}; + use crate::spec::Schema; fn check_json_serde(json: &str, expected_literal: Literal, expected_type: &Type) { let raw_json_value = serde_json::from_str::(json).unwrap(); diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index e804878ce..b9a701193 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -16,13 +16,13 @@ // under the License. //! Table API for Apache Iceberg +use typed_builder::TypedBuilder; + use crate::arrow::ArrowReaderBuilder; use crate::io::FileIO; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; -use crate::Result; -use crate::TableIdent; -use typed_builder::TypedBuilder; +use crate::{Result, TableIdent}; /// Table represents a table in the catalog. #[derive(TypedBuilder, Debug, Clone)] diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 165fb8950..966a021fb 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -17,14 +17,15 @@ //! This module contains transaction api. +use std::cmp::Ordering; +use std::collections::HashMap; +use std::mem::discriminant; + use crate::error::Result; use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; -use std::cmp::Ordering; -use std::collections::HashMap; -use std::mem::discriminant; /// Table transaction. 
pub struct Transaction<'a> { @@ -207,14 +208,15 @@ impl<'a> ReplaceSortOrderAction<'a> { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs::File; + use std::io::BufReader; + use crate::io::FileIO; use crate::spec::{FormatVersion, TableMetadata}; use crate::table::Table; use crate::transaction::Transaction; use crate::{TableIdent, TableRequirement, TableUpdate}; - use std::collections::HashMap; - use std::fs::File; - use std::io::BufReader; fn make_v1_table() -> Table { let file = File::open(format!( diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index d454c697f..c67051691 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -20,9 +20,8 @@ use std::sync::Arc; use arrow_array::ArrayRef; use arrow_schema::{DataType, TimeUnit}; -use crate::spec::{Datum, PrimitiveLiteral}; - use super::TransformFunction; +use crate::spec::{Datum, PrimitiveLiteral}; #[derive(Debug)] pub struct Bucket { @@ -251,23 +250,17 @@ impl TransformFunction for Bucket { mod test { use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; + use super::Bucket; + use crate::expr::PredicateOperator; use crate::spec::PrimitiveType::{ Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, Timestamptz, Uuid, }; - use crate::spec::StructType; use crate::spec::Type::{Primitive, Struct}; - use crate::{ - expr::PredicateOperator, - spec::{Datum, NestedField, PrimitiveType, Transform, Type}, - transform::{ - test::{TestProjectionFixture, TestTransformFixture}, - TransformFunction, - }, - Result, - }; - - use super::Bucket; + use crate::spec::{Datum, NestedField, PrimitiveType, StructType, Transform, Type}; + use crate::transform::test::{TestProjectionFixture, TestTransformFixture}; + use crate::transform::TransformFunction; + use crate::Result; #[test] fn test_bucket_transform() { @@ -359,18 +352,18 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![Datum::uuid(value), Datum::uuid(another)], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::uuid(value), + Datum::uuid(another), + ]), Some("name IN (4, 6)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![Datum::uuid(value), Datum::uuid(another)], - ), + &fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::uuid(value), + Datum::uuid(another), + ]), None, )?; @@ -426,18 +419,18 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![Datum::fixed(value.clone()), Datum::fixed(another.clone())], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::fixed(value.clone()), + Datum::fixed(another.clone()), + ]), Some("name IN (4, 6)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![Datum::fixed(value.clone()), Datum::fixed(another.clone())], - ), + &fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::fixed(value.clone()), + Datum::fixed(another.clone()), + ]), None, )?; @@ -486,18 +479,18 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![Datum::string(value), Datum::string(another)], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::string(value), + Datum::string(another), + ]), Some("name IN (9, 4)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![Datum::string(value), Datum::string(another)], - ), + 
&fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::string(value), + Datum::string(another), + ]), None, )?; @@ -563,25 +556,19 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![ - Datum::decimal_from_str(next)?, - Datum::decimal_from_str(curr)?, - Datum::decimal_from_str(prev)?, - ], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::decimal_from_str(next)?, + Datum::decimal_from_str(curr)?, + Datum::decimal_from_str(prev)?, + ]), Some("name IN (6, 2)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![ - Datum::decimal_from_str(curr)?, - Datum::decimal_from_str(next)?, - ], - ), + &fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::decimal_from_str(curr)?, + Datum::decimal_from_str(next)?, + ]), None, )?; @@ -628,22 +615,19 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![ - Datum::long(value - 1), - Datum::long(value), - Datum::long(value + 1), - ], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::long(value - 1), + Datum::long(value), + Datum::long(value + 1), + ]), Some("name IN (8, 7, 6)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![Datum::long(value), Datum::long(value + 1)], - ), + &fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::long(value), + Datum::long(value + 1), + ]), None, )?; @@ -691,22 +675,19 @@ mod test { )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::In, - vec![ - Datum::int(value - 1), - Datum::int(value), - Datum::int(value + 1), - ], - ), + &fixture.set_predicate(PredicateOperator::In, vec![ + Datum::int(value - 1), + Datum::int(value), + Datum::int(value + 1), + ]), Some("name IN (8, 7, 6)"), )?; fixture.assert_projection( - &fixture.set_predicate( - PredicateOperator::NotIn, - vec![Datum::int(value), Datum::int(value + 1)], - ), + &fixture.set_predicate(PredicateOperator::NotIn, vec![ + Datum::int(value), + Datum::int(value + 1), + ]), None, )?; diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs index 0f6f234c8..e23ccffa9 100644 --- a/crates/iceberg/src/transform/identity.rs +++ b/crates/iceberg/src/transform/identity.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::Result; use arrow_array::ArrayRef; use super::TransformFunction; +use crate::Result; /// Return identity array. #[derive(Debug)] @@ -40,12 +40,10 @@ mod test { Binary, Date, Decimal, Fixed, Int, Long, String as StringType, Time, Timestamp, Timestamptz, Uuid, }; - use crate::spec::StructType; use crate::spec::Type::{Primitive, Struct}; + use crate::spec::{NestedField, StructType, Transform}; use crate::transform::test::TestTransformFixture; - use crate::spec::{NestedField, Transform}; - #[test] fn test_identity_transform() { let trans = Transform::Identity; diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index cb221a29f..72b179754 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -17,12 +17,11 @@ //! Transform function used to compute partition values. 
-use crate::{ - spec::{Datum, Transform}, - Error, ErrorKind, Result, -}; use arrow_array::ArrayRef; +use crate::spec::{Datum, Transform}; +use crate::{Error, ErrorKind, Result}; + mod bucket; mod identity; mod temporal; @@ -72,16 +71,15 @@ pub fn create_transform_function(transform: &Transform) -> Result { @@ -110,25 +110,19 @@ impl CurrentFileStatus for DataFileWriter { mod test { use std::sync::Arc; - use crate::{ - spec::{DataContentType, Schema, Struct}, - Result, - }; use parquet::file::properties::WriterProperties; use tempfile::TempDir; - use crate::{ - io::FileIOBuilder, - spec::DataFileFormat, - writer::{ - base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}, - file_writer::{ - location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator}, - ParquetWriterBuilder, - }, - IcebergWriter, IcebergWriterBuilder, - }, + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, Schema, Struct}; + use crate::writer::base_writer::data_file_writer::{ + DataFileWriterBuilder, DataFileWriterConfig, }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; #[tokio::test] async fn test_parquet_writer() -> Result<()> { diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index a86f53ad1..44326190d 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -17,12 +17,11 @@ //! This module contains the location generator and file name generator for generating path of data file. -use std::sync::{atomic::AtomicU64, Arc}; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; -use crate::{ - spec::{DataFileFormat, TableMetadata}, - Error, ErrorKind, Result, -}; +use crate::spec::{DataFileFormat, TableMetadata}; +use crate::{Error, ErrorKind, Result}; /// `LocationGenerator` used to generate the location of data file. pub trait LocationGenerator: Clone + Send + 'static { @@ -132,14 +131,11 @@ pub(crate) mod test { use uuid::Uuid; - use crate::{ - spec::{FormatVersion, TableMetadata}, - writer::file_writer::location_generator::{ - FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, - }, - }; - use super::LocationGenerator; + use crate::spec::{FormatVersion, TableMetadata}; + use crate::writer::file_writer::location_generator::{ + FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, + }; #[derive(Clone)] pub(crate) struct MockLocationGenerator { diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 0340df681..4a0fffcc1 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -17,11 +17,13 @@ //! This module contains the writer for data file format supported by iceberg: parquet, orc. 
-use super::CurrentFileStatus; -use crate::{spec::DataFileBuilder, Result}; use arrow_array::RecordBatch; use futures::Future; +use super::CurrentFileStatus; +use crate::spec::DataFileBuilder; +use crate::Result; + mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; mod track_writer; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 5f50e417d..ef21f9d33 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -17,47 +17,37 @@ //! The module contains the file writer for parquet file format. -use super::{ - location_generator::{FileNameGenerator, LocationGenerator}, - track_writer::TrackWriter, - FileWriter, FileWriterBuilder, -}; -use crate::arrow::DEFAULT_MAP_FIELD_NAME; -use crate::spec::{ - visit_schema, Datum, ListType, MapType, NestedFieldRef, PrimitiveLiteral, PrimitiveType, - Schema, SchemaRef, SchemaVisitor, StructType, Type, -}; -use crate::ErrorKind; -use crate::{io::FileIO, io::FileWrite, Result}; -use crate::{ - io::OutputFile, - spec::{DataFileBuilder, DataFileFormat}, - writer::CurrentFileStatus, - Error, -}; +use std::collections::HashMap; +use std::sync::atomic::AtomicI64; +use std::sync::Arc; + use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::Bytes; use futures::future::BoxFuture; use itertools::Itertools; +use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; +use parquet::arrow::AsyncArrowWriter; use parquet::data_type::{ - BoolType, ByteArrayType, DataType as ParquetDataType, DoubleType, FixedLenByteArrayType, - FloatType, Int32Type, Int64Type, + BoolType, ByteArray, ByteArrayType, DataType as ParquetDataType, DoubleType, FixedLenByteArray, + FixedLenByteArrayType, FloatType, Int32Type, Int64Type, }; use parquet::file::properties::WriterProperties; -use parquet::file::statistics::TypedStatistics; -use parquet::{ - arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter, arrow::AsyncArrowWriter, - format::FileMetaData, -}; -use parquet::{ - data_type::{ByteArray, FixedLenByteArray}, - file::statistics::{from_thrift, Statistics}, -}; -use std::collections::HashMap; -use std::sync::atomic::AtomicI64; -use std::sync::Arc; +use parquet::file::statistics::{from_thrift, Statistics, TypedStatistics}; +use parquet::format::FileMetaData; use uuid::Uuid; +use super::location_generator::{FileNameGenerator, LocationGenerator}; +use super::track_writer::TrackWriter; +use super::{FileWriter, FileWriterBuilder}; +use crate::arrow::DEFAULT_MAP_FIELD_NAME; +use crate::io::{FileIO, FileWrite, OutputFile}; +use crate::spec::{ + visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef, + PrimitiveLiteral, PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type, +}; +use crate::writer::CurrentFileStatus; +use crate::{Error, ErrorKind, Result}; + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone)] pub struct ParquetWriterBuilder { @@ -606,23 +596,17 @@ mod tests { use anyhow::Result; use arrow_array::types::Int64Type; - use arrow_array::ArrayRef; - use arrow_array::BooleanArray; - use arrow_array::Int32Array; - use arrow_array::Int64Array; - use arrow_array::ListArray; - use arrow_array::RecordBatch; - use arrow_array::StructArray; - use arrow_schema::DataType; - use arrow_schema::SchemaRef as ArrowSchemaRef; + use arrow_array::{ + ArrayRef, BooleanArray, Int32Array, Int64Array, ListArray, RecordBatch, 
StructArray, + }; + use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef}; use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use tempfile::TempDir; use super::*; use crate::io::FileIOBuilder; - use crate::spec::*; - use crate::spec::{PrimitiveLiteral, Struct}; + use crate::spec::{PrimitiveLiteral, Struct, *}; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; use crate::writer::tests::check_parquet_data_file; @@ -966,10 +950,9 @@ mod tests { ordered, ) }) as ArrayRef; - let to_write = RecordBatch::try_new( - arrow_schema.clone(), - vec![col0, col1, col2, col3, col4, col5], - ) + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ + col0, col1, col2, col3, col4, col5, + ]) .unwrap(); // write data @@ -1141,13 +1124,9 @@ mod tests { ) .unwrap(), ) as ArrayRef; - let to_write = RecordBatch::try_new( - arrow_schema.clone(), - vec![ - col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, - col13, - ], - ) + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ + col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, + ]) .unwrap(); // write data diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs index 8d0e490d4..6c60a1aa7 100644 --- a/crates/iceberg/src/writer/file_writer/track_writer.rs +++ b/crates/iceberg/src/writer/file_writer/track_writer.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::sync::atomic::AtomicI64; +use std::sync::Arc; + use bytes::Bytes; -use std::sync::{atomic::AtomicI64, Arc}; use crate::io::FileWrite; use crate::Result; diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 06b763d6e..6cb9aaee6 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -48,9 +48,11 @@ pub mod base_writer; pub mod file_writer; -use crate::{spec::DataFile, Result}; use arrow_array::RecordBatch; +use crate::spec::DataFile; +use crate::Result; + type DefaultInput = RecordBatch; type DefaultOutput = Vec; @@ -97,12 +99,9 @@ mod tests { use arrow_select::concat::concat_batches; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - use crate::{ - io::FileIO, - spec::{DataFile, DataFileFormat}, - }; - use super::IcebergWriter; + use crate::io::FileIO; + use crate::spec::{DataFile, DataFileFormat}; // This function is used to guarantee the trait can be used as a object safe trait. async fn _guarantee_object_safe(mut w: Box) { diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/iceberg/tests/file_io_s3_test.rs index 6d62a0416..efa9128a3 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/iceberg/tests/file_io_s3_test.rs @@ -17,13 +17,14 @@ //! Integration tests for FileIO S3. 
+use std::sync::RwLock; + use ctor::{ctor, dtor}; use iceberg::io::{ FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, }; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; -use std::sync::RwLock; static DOCKER_COMPOSE_ENV: RwLock> = RwLock::new(None); diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index deddde9fd..44ede99c3 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, collections::HashMap, sync::Arc}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; -use datafusion::catalog::{schema::SchemaProvider, CatalogProvider}; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::catalog::CatalogProvider; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 8a74caa6a..aca40c4b1 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, pin::Pin, sync::Arc}; +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::error::Result as DFResult; - -use datafusion::{ - arrow::{array::RecordBatch, datatypes::SchemaRef as ArrowSchemaRef}, - execution::{SendableRecordBatchStream, TaskContext}, - physical_expr::EquivalenceProperties, - physical_plan::{ - stream::RecordBatchStreamAdapter, DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, - }, +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; use futures::{Stream, TryStreamExt}; use iceberg::table::Table; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index f7b1a21d2..0d7fcbf59 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, collections::HashMap, sync::Arc}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; -use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; diff --git a/crates/integrations/datafusion/src/table.rs b/crates/integrations/datafusion/src/table.rs index 46a15f67a..da38ccc6f 100644 --- a/crates/integrations/datafusion/src/table.rs +++ b/crates/integrations/datafusion/src/table.rs @@ -15,20 +15,19 @@ // specific language governing permissions and limitations // under the License. 
-use std::{any::Any, sync::Arc}; +use std::any::Any; +use std::sync::Arc; use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; -use datafusion::{ - arrow::datatypes::SchemaRef as ArrowSchemaRef, - datasource::{TableProvider, TableType}, - execution::context, - logical_expr::Expr, - physical_plan::ExecutionPlan, -}; -use iceberg::{ - arrow::schema_to_arrow_schema, table::Table, Catalog, NamespaceIdent, Result, TableIdent, -}; +use datafusion::execution::context; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::table::Table; +use iceberg::{Catalog, NamespaceIdent, Result, TableIdent}; use crate::physical_plan::scan::IcebergTableScan; diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs index 3247fc96e..2902c49ed 100644 --- a/crates/test_utils/src/docker.rs +++ b/crates/test_utils/src/docker.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::cmd::{get_cmd_output, run_command}; use std::process::Command; +use crate::cmd::{get_cmd_output, run_command}; + /// A utility to manage the lifecycle of `docker compose`. /// /// It will start `docker compose` when calling the `run` method and will be stopped via [`Drop`]. diff --git a/rustfmt.toml b/rustfmt.toml index 39be343c6..49be5742b 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -17,3 +17,10 @@ edition = "2021" reorder_imports = true + +format_code_in_doc_comments = true +group_imports = "StdExternalCrate" +imports_granularity = "Module" +overflow_delimited_expr = true +trailing_comma = "Vertical" +where_single_line = true \ No newline at end of file
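Note: the rustfmt.toml options added above are what drive the mechanical reformatting in every hunk of this patch: group_imports = "StdExternalCrate" and imports_granularity = "Module" regroup and split the `use` trees, where_single_line collapses single-predicate `where` clauses onto one line, overflow_delimited_expr lets a trailing vec!/struct-literal argument overflow in place, trailing_comma = "Vertical" keeps trailing commas only in vertically laid-out lists, and format_code_in_doc_comments also reformats code examples inside `///` comments. Most of these are, at the time of this patch, unstable rustfmt options that take effect on a nightly toolchain. The snippet below is a rough, self-contained sketch of the resulting style; the `local::Thing` type and the function names are made up purely for illustration (they are not part of iceberg-rust), and a serde dependency with the `derive` feature is assumed.

// lib.rs (illustrative sketch only; not byte-exact rustfmt output)

// group_imports = "StdExternalCrate": std imports first, external crates second, crate-local last.
// imports_granularity = "Module": one `use` per module path instead of nested `{...}` trees.
use std::collections::HashMap;
use std::fmt::Debug;

use serde::Serialize;

use crate::local::Thing;

pub mod local {
    use std::collections::HashMap;

    /// A made-up type used only to illustrate the formatting rules; not part of iceberg-rust.
    #[derive(Debug, serde::Serialize)]
    pub struct Thing {
        pub id: i64,
        pub name: String,
        pub properties: HashMap<String, String>,
    }
}

// where_single_line = true: a single-predicate `where` clause shares a line with the opening
// brace, matching the `where T: Debug {` / `where S: serde::Serializer {` shapes in the hunks above.
pub fn describe<T>(value: &T) -> String
where T: Serialize + Debug {
    format!("{value:?}")
}

// overflow_delimited_expr = true: a trailing vec!/array/struct-literal argument overflows in place
// rather than forcing every argument onto its own line; trailing_comma = "Vertical" then adds
// trailing commas to the vertically laid-out fields.
pub fn index(things: &mut HashMap<i64, Vec<Thing>>, thing: Thing) {
    things.entry(thing.id).or_insert_with(Vec::new).push(Thing {
        id: thing.id,
        name: thing.name,
        properties: HashMap::new(),
    });
}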