Skip to content

Commit

Permalink
Remove Publish Time from non-prepare Pulsar Paths (#426)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinjnguyen authored Jun 9, 2023
2 parents e16e646 + 8bb3853 commit a8826e4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 28 deletions.
45 changes: 30 additions & 15 deletions crates/sparrow-runtime/src/metadata/raw_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ impl RawMetadata {
}
source_data::Source::PulsarSubscription(ps) => {
let config = ps.config.as_ref().ok_or(Error::PulsarSubscription)?;
Ok(Self::try_from_pulsar(config).await?.sparrow_metadata)
// The `_publish_time` is metadata on the pulsar message, and required
// by the `prepare` step. However, that is not part of the user's schema.
// The prepare path calls `try_from_pulsar` directly, so for all other cases
// we explicitly set the schema to not include the `_publish_time` column.
//
// The "prepare from pulsar" step is an experimental feature, and will
// likely change in the future, so we're okay with this hack for now.
let should_include_publish_time = false;
Ok(Self::try_from_pulsar(config, should_include_publish_time)
.await?
.sparrow_metadata)
}
}
}
Expand Down Expand Up @@ -171,6 +181,7 @@ impl RawMetadata {
/// Create a `RawMetadata` from a Pulsar topic.
pub(crate) async fn try_from_pulsar(
config: &PulsarConfig,
should_include_publish_time: bool,
) -> error_stack::Result<PulsarMetadata, Error> {
// the user-defined schema in the topic
let pulsar_schema = streams::pulsar::schema::get_pulsar_schema(
Expand All @@ -183,21 +194,25 @@ impl RawMetadata {
.await
.change_context_lazy(|| Error::PulsarSchema("unable to get schema".to_owned()))?;

// inject _publish_time field so that we have a consistent column to sort on
// (this will always be our time_column in Pulsar sources)
let publish_time = Arc::new(Field::new(
"_publish_time",
TimestampMillisecondType::DATA_TYPE,
false,
));
let new_fields: Vec<_> = pulsar_schema
.fields
.iter()
.cloned()
.chain(std::iter::once(publish_time))
.collect();
tracing::debug!("pulsar schema fields: {:?}", new_fields);
let new_fields = if should_include_publish_time {
// inject _publish_time field so that we have a consistent column to sort on
// (this will always be our time_column in Pulsar sources)
let publish_time = Arc::new(Field::new(
"_publish_time",
TimestampMillisecondType::DATA_TYPE,
false,
));
pulsar_schema
.fields
.iter()
.cloned()
.chain(std::iter::once(publish_time))
.collect()
} else {
pulsar_schema.fields.clone()
};

tracing::debug!("pulsar schema fields: {:?}", new_fields);
Ok(PulsarMetadata {
user_schema: Arc::new(pulsar_schema),
sparrow_metadata: Self::from_raw_schema(Arc::new(Schema::new(new_fields)))?,
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn reader_from_pulsar<'a>(
) -> error_stack::Result<BoxStream<'a, error_stack::Result<(RecordBatch, RecordBatch), Error>>, Error>
{
let pulsar_config = pulsar_subscription.config.as_ref().ok_or(Error::Internal)?;
let pm = RawMetadata::try_from_pulsar(pulsar_config)
let pm = RawMetadata::try_from_pulsar(pulsar_config, true)
.await
.change_context(Error::CreatePulsarReader)?;

Expand Down
22 changes: 12 additions & 10 deletions crates/sparrow-runtime/src/read/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,34 @@ pub(crate) async fn stream_reader(
subscription_id: pulsar_subscription,
last_publish_time: 0,
};
let raw_metadata = RawMetadata::try_from_pulsar(pulsar_config)
let pulsar_metadata = RawMetadata::try_from_pulsar(pulsar_config, false)
.await
.change_context(Error::CreateStream)?;

// Verify the provided table schema matches the topic schema
verify_schema_match(
raw_metadata.user_schema.clone(),
pulsar_metadata.user_schema.clone(),
table_info.schema().clone(),
)?;

// The projected schema should come from the table_schema, which includes converted
// timestamp column, dropped decimal columns, etc.
// i.e. any changes we make to the raw schema to be able to process rows.
let projected_schema = if let Some(columns) = &projected_columns {
projected_schema(raw_metadata.sparrow_metadata.table_schema, columns)
projected_schema(pulsar_metadata.sparrow_metadata.table_schema, columns)
.change_context(Error::CreateStream)?
} else {
raw_metadata.sparrow_metadata.table_schema
pulsar_metadata.sparrow_metadata.table_schema
};

let consumer =
streams::pulsar::stream::consumer(&pulsar_subscription, raw_metadata.user_schema.clone())
.await
.change_context(Error::CreateStream)?;
let consumer = streams::pulsar::stream::consumer(
&pulsar_subscription,
pulsar_metadata.user_schema.clone(),
)
.await
.change_context(Error::CreateStream)?;
let stream = streams::pulsar::stream::execution_stream(
raw_metadata.sparrow_metadata.raw_schema.clone(),
pulsar_metadata.sparrow_metadata.raw_schema.clone(),
projected_schema.clone(),
consumer,
pulsar_subscription.last_publish_time,
Expand All @@ -106,7 +108,7 @@ pub(crate) async fn stream_reader(
let mut input_stream = prepare::execute_input_stream::prepare_input(
stream.boxed(),
table_config,
raw_metadata.user_schema.clone(),
pulsar_metadata.user_schema.clone(),
projected_schema,
0,
requested_slice,
Expand Down
4 changes: 2 additions & 2 deletions crates/sparrow-runtime/src/streams/pulsar/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ impl PulsarReader {
.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let batch = RecordBatch::try_new(self.raw_schema.clone(), arrow_data)?;

// Note that the _last_publish_time is dropped here. This field is added for the purposes of
// prepare, where the `time` column is automatically set to the `_last_publish_time`.
// Note that the _publish_time is dropped here. This field is added for the purposes of
// prepare, where the `time` column is automatically set to the `_publish_time`.
let columns_to_read = get_columns_to_read(&self.raw_schema, &self.projected_schema);
let columns: Vec<_> = columns_to_read
.iter()
Expand Down

0 comments on commit a8826e4

Please sign in to comment.