diff --git a/crates/sparrow-execution/src/lib.rs b/crates/sparrow-execution/src/lib.rs index 1c1ac8c63..2800b7afb 100644 --- a/crates/sparrow-execution/src/lib.rs +++ b/crates/sparrow-execution/src/lib.rs @@ -13,7 +13,8 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; use hashbrown::HashMap; -use sparrow_interfaces::{ExecutionOptions, Source}; +use sparrow_interfaces::source::Source; +use sparrow_interfaces::ExecutionOptions; use sparrow_physical::StepId; use sparrow_transforms::TransformPipeline; use std::sync::Arc; diff --git a/crates/sparrow-execution/src/source_tasks.rs b/crates/sparrow-execution/src/source_tasks.rs index 1bc8fdb87..2f6aa0d18 100644 --- a/crates/sparrow-execution/src/source_tasks.rs +++ b/crates/sparrow-execution/src/source_tasks.rs @@ -2,7 +2,7 @@ use error_stack::ResultExt; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{StreamExt, TryStreamExt}; use sparrow_batch::Batch; -use sparrow_interfaces::SourceError; +use sparrow_interfaces::source::SourceError; use sparrow_scheduler::{Injector, InputHandles}; use tracing::Instrument; use uuid::Uuid; diff --git a/crates/sparrow-execution/src/tests.rs b/crates/sparrow-execution/src/tests.rs index 853c58e32..81763ddba 100644 --- a/crates/sparrow-execution/src/tests.rs +++ b/crates/sparrow-execution/src/tests.rs @@ -1,6 +1,6 @@ use arrow_array::cast::AsArray; use arrow_array::{Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array}; -use sparrow_interfaces::{Source, SourceExt}; +use sparrow_interfaces::source::{Source, SourceExt}; use sparrow_logical::ExprRef; use sparrow_session::partitioned::Session; use sparrow_sources::InMemory; diff --git a/crates/sparrow-interfaces/src/lib.rs b/crates/sparrow-interfaces/src/lib.rs index a2afbd79f..02e4eb741 100644 --- a/crates/sparrow-interfaces/src/lib.rs +++ b/crates/sparrow-interfaces/src/lib.rs @@ -8,9 +8,6 @@ )] mod execution_options; -mod source; -mod source_error; +pub mod source; pub use execution_options::*; -pub use source::*; -pub use source_error::SourceError; diff --git a/crates/sparrow-interfaces/src/source.rs b/crates/sparrow-interfaces/src/source.rs index bbb3a2bb1..5df71cbe7 100644 --- a/crates/sparrow-interfaces/src/source.rs +++ b/crates/sparrow-interfaces/src/source.rs @@ -4,7 +4,7 @@ use arrow_schema::{DataType, SchemaRef}; use futures::stream::BoxStream; use sparrow_batch::Batch; -use crate::{ExecutionOptions, SourceError}; +use crate::ExecutionOptions; /// Trait implemented by sources. pub trait Source: Send + Sync { @@ -30,6 +30,29 @@ pub trait Source: Send + Sync { fn as_any(&self) -> &dyn std::any::Any; } +#[non_exhaustive] +#[derive(derive_more::Display, Debug)] +pub enum SourceError { + #[display(fmt = "internal error: {}", _0)] + Internal(&'static str), + #[display(fmt = "failed to add in-memory batch")] + Add, + #[display(fmt = "receiver lagged")] + ReceiverLagged, +} + +impl error_stack::Context for SourceError {} + +impl SourceError { + pub fn internal() -> Self { + SourceError::Internal("no additional context") + } + + pub fn internal_msg(msg: &'static str) -> Self { + SourceError::Internal(msg) + } +} + pub trait SourceExt { fn downcast_source_opt(&self) -> Option<&T>; fn downcast_source(&self) -> &T { diff --git a/crates/sparrow-interfaces/src/source_error.rs b/crates/sparrow-interfaces/src/source_error.rs deleted file mode 100644 index 815087a6f..000000000 --- a/crates/sparrow-interfaces/src/source_error.rs +++ /dev/null @@ -1,22 +0,0 @@ -#[non_exhaustive] -#[derive(derive_more::Display, Debug)] -pub enum SourceError { - #[display(fmt = "internal error: {}", _0)] - Internal(&'static str), - #[display(fmt = "failed to add in-memory batch")] - Add, - #[display(fmt = "receiver lagged")] - ReceiverLagged, -} - -impl error_stack::Context for SourceError {} - -impl SourceError { - pub fn internal() -> Self { - SourceError::Internal("no additional context") - } - - pub fn internal_msg(msg: &'static str) -> Self { - SourceError::Internal(msg) - } -} diff --git a/crates/sparrow-session/src/partitioned/session.rs b/crates/sparrow-session/src/partitioned/session.rs index ee77f44e4..129f4de82 100644 --- a/crates/sparrow-session/src/partitioned/session.rs +++ b/crates/sparrow-session/src/partitioned/session.rs @@ -5,7 +5,8 @@ use error_stack::ResultExt; use futures::FutureExt; use hashbrown::HashMap; use sparrow_compiler::NearestMatches; -use sparrow_interfaces::{ExecutionOptions, Source}; +use sparrow_interfaces::source::Source; +use sparrow_interfaces::ExecutionOptions; use sparrow_logical::{ExprRef, Literal}; use uuid::Uuid; diff --git a/crates/sparrow-sources/src/in_memory.rs b/crates/sparrow-sources/src/in_memory.rs index 6c6d40d14..e37cd6106 100644 --- a/crates/sparrow-sources/src/in_memory.rs +++ b/crates/sparrow-sources/src/in_memory.rs @@ -6,11 +6,10 @@ use error_stack::{IntoReportCompat, ResultExt}; use futures::{Stream, StreamExt, TryStreamExt}; use sparrow_batch::Batch; -use sparrow_interfaces::{ExecutionOptions, Source}; +use sparrow_interfaces::source::{Source, SourceError}; +use sparrow_interfaces::ExecutionOptions; use sparrow_merge::old::homogeneous_merge; -use sparrow_interfaces::SourceError; - /// A shared, synchronized container for in-memory batches. pub struct InMemory { /// The prepared schema.