Skip to content

Commit

Permalink
refactor: rename "value" semantic type to "field" (#1326)
Browse files Browse the repository at this point in the history
* global replace

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change desc table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Apr 4, 2023
1 parent a2d8804 commit 99353c6
Show file tree
Hide file tree
Showing 32 changed files with 200 additions and 200 deletions.
8 changes: 4 additions & 4 deletions src/promql/src/extension_plan/empty_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl EmptyMetric {
end: Millisecond,
interval: Millisecond,
time_index_column_name: String,
value_column_name: String,
field_column_name: String,
) -> DataFusionResult<Self> {
let schema = Arc::new(DFSchema::new_with_metadata(
vec![
Expand All @@ -61,7 +61,7 @@ impl EmptyMetric {
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
DFField::new(Some(""), &value_column_name, DataType::Float64, true),
DFField::new(Some(""), &field_column_name, DataType::Float64, true),
],
HashMap::new(),
)?);
Expand Down Expand Up @@ -253,11 +253,11 @@ mod test {
end: Millisecond,
interval: Millisecond,
time_column_name: String,
value_column_name: String,
field_column_name: String,
expected: String,
) {
let empty_metric =
EmptyMetric::new(start, end, interval, time_column_name, value_column_name).unwrap();
EmptyMetric::new(start, end, interval, time_column_name, field_column_name).unwrap();
let empty_metric_exec = empty_metric.to_execution_plan();

let session_context = SessionContext::default();
Expand Down
4 changes: 2 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,11 @@ mod test {
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let value_column = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let field_column = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, value_column, path_column],
vec![timestamp_column, field_column, path_column],
)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/promql/src/extension_plan/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,12 @@ mod test {
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
60_000, 120_000, 0, 30_000, 90_000,
])) as _;
let value_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
let field_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
let path_column =
Arc::new(StringArray::from_slice(["foo", "foo", "foo", "foo", "foo"])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, value_column, path_column],
vec![timestamp_column, field_column, path_column],
)
.unwrap();

Expand Down
44 changes: 22 additions & 22 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct RangeManipulate {
range: Millisecond,

time_index: String,
value_columns: Vec<String>,
field_columns: Vec<String>,
input: LogicalPlan,
output_schema: DFSchemaRef,
}
Expand All @@ -68,18 +68,18 @@ impl RangeManipulate {
interval: Millisecond,
range: Millisecond,
time_index: String,
value_columns: Vec<String>,
field_columns: Vec<String>,
input: LogicalPlan,
) -> DataFusionResult<Self> {
let output_schema =
Self::calculate_output_schema(input.schema(), &time_index, &value_columns)?;
Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
Ok(Self {
start,
end,
interval,
range,
time_index,
value_columns,
field_columns,
input,
output_schema,
})
Expand All @@ -100,7 +100,7 @@ impl RangeManipulate {
fn calculate_output_schema(
input_schema: &DFSchemaRef,
time_index: &str,
value_columns: &[String],
field_columns: &[String],
) -> DataFusionResult<DFSchemaRef> {
let mut columns = input_schema.fields().clone();

Expand All @@ -118,7 +118,7 @@ impl RangeManipulate {
)));

// process value columns
for name in value_columns {
for name in field_columns {
let Some(index) = input_schema.index_of_column_by_name(None, name)? else {
return Err(datafusion::common::field_not_found(None::<TableReference>, name, input_schema.as_ref()))
};
Expand All @@ -139,7 +139,7 @@ impl RangeManipulate {
range: self.range,
time_index_column: self.time_index.clone(),
time_range_column: self.range_timestamp_name(),
value_columns: self.value_columns.clone(),
field_columns: self.field_columns.clone(),
input: exec_input,
output_schema: SchemaRef::new(self.output_schema.as_ref().into()),
metric: ExecutionPlanMetricsSet::new(),
Expand Down Expand Up @@ -168,7 +168,7 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
write!(
f,
"PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
self.start, self.end, self.interval, self.range, self.time_index, self.value_columns
self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
)
}

Expand All @@ -181,7 +181,7 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
interval: self.interval,
range: self.range,
time_index: self.time_index.clone(),
value_columns: self.value_columns.clone(),
field_columns: self.field_columns.clone(),
input: inputs[0].clone(),
output_schema: self.output_schema.clone(),
}
Expand All @@ -196,7 +196,7 @@ pub struct RangeManipulateExec {
range: Millisecond,
time_index_column: String,
time_range_column: String,
value_columns: Vec<String>,
field_columns: Vec<String>,

input: Arc<dyn ExecutionPlan>,
output_schema: SchemaRef,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl ExecutionPlan for RangeManipulateExec {
range: self.range,
time_index_column: self.time_index_column.clone(),
time_range_column: self.time_range_column.clone(),
value_columns: self.value_columns.clone(),
field_columns: self.field_columns.clone(),
output_schema: self.output_schema.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
Expand All @@ -260,8 +260,8 @@ impl ExecutionPlan for RangeManipulateExec {
.column_with_name(&self.time_index_column)
.unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
.0;
let value_columns = self
.value_columns
let field_columns = self
.field_columns
.iter()
.map(|value_col| {
schema
Expand All @@ -276,7 +276,7 @@ impl ExecutionPlan for RangeManipulateExec {
interval: self.interval,
range: self.range,
time_index,
value_columns,
field_columns,
output_schema: self.output_schema.clone(),
input,
metric: baseline_metric,
Expand Down Expand Up @@ -325,7 +325,7 @@ pub struct RangeManipulateStream {
interval: Millisecond,
range: Millisecond,
time_index: usize,
value_columns: Vec<usize>,
field_columns: Vec<usize>,

output_schema: SchemaRef,
input: SendableRecordBatchStream,
Expand Down Expand Up @@ -364,7 +364,7 @@ impl RangeManipulateStream {

// transform columns
let mut new_columns = input.columns().to_vec();
for index in self.value_columns.iter() {
for index in self.field_columns.iter() {
other_columns.remove(index);
let column = input.column(*index);
let new_column = Arc::new(
Expand Down Expand Up @@ -460,14 +460,14 @@ mod test {
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let value_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![
timestamp_column,
value_column.clone(),
value_column,
field_column.clone(),
field_column,
path_column,
],
)
Expand All @@ -485,12 +485,12 @@ mod test {
) {
let memory_exec = Arc::new(prepare_test_data());
let time_index = TIME_INDEX_COLUMN.to_string();
let value_columns = vec!["value_1".to_string(), "value_2".to_string()];
let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
let manipulate_output_schema = SchemaRef::new(
RangeManipulate::calculate_output_schema(
&memory_exec.schema().to_dfschema_ref().unwrap(),
&time_index,
&value_columns,
&field_columns,
)
.unwrap()
.as_ref()
Expand All @@ -501,7 +501,7 @@ mod test {
end,
interval,
range,
value_columns,
field_columns,
output_schema: manipulate_output_schema,
time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
time_index_column: time_index,
Expand Down
Loading

0 comments on commit 99353c6

Please sign in to comment.