Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use btree to search fields in DFSchema #7870

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 186 additions & 40 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.

use std::collections::{HashMap, HashSet};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
use std::fmt::{Display, Formatter};
use std::hash::Hash;
Expand All @@ -35,11 +38,122 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
/// A reference-counted reference to a `DFSchema`.
pub type DFSchemaRef = Arc<DFSchema>;

/// [`FieldReference`]s represent a multi part identifier (path) to a
/// field that may require further resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FieldReference<'a> {
/// The field name
name: Cow<'a, str>,
/// Optional qualifier (usually a table or relation name)
qualifier: Option<TableReference<'a>>,
}

impl<'a> PartialOrd for FieldReference<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<'a> Ord for FieldReference<'a> {
fn cmp(&self, other: &Self) -> Ordering {
if self == other {
return Ordering::Equal;
}

match self.field().cmp(other.field()) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
}
Comment on lines +63 to +67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
match self.field().cmp(other.field()) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
}
let field_cmp = self.field().cmp(other.field());
if field_cmp != Ordering::Equal {
return field_cmp;
}


match (self.table(), other.table()) {
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
Comment on lines +70 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
(Some(lhs), Some(rhs)) => {
let cmp = lhs.cmp(rhs);
if cmp != Ordering::Equal {
return cmp;
}
}

(Some(_), None) => return Ordering::Greater,
(None, Some(_)) => return Ordering::Less,
_ => {}
}

match (self.schema(), other.schema()) {
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
Comment on lines +81 to +85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
(Some(lhs), Some(rhs)) => {
let cmp = lhs.cmp(rhs);
if cmp != Ordering::Equal {
return cmp;
}
}

(Some(_), None) => return Ordering::Greater,
(None, Some(_)) => return Ordering::Less,
_ => {}
}

match (self.catalog(), other.catalog()) {
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
Comment on lines +92 to +96
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) {
Ordering::Less => return Ordering::Less,
Ordering::Greater => return Ordering::Greater,
Ordering::Equal => {}
},
(Some(lhs), Some(rhs)) => {
let cmp = lhs.cmp(rhs);
if cmp != Ordering::Equal {
return cmp;
}
}

(Some(_), None) => return Ordering::Greater,
(None, Some(_)) => return Ordering::Less,
_ => {}
}

Ordering::Equal
}
}

/// This is a [`FieldReference`] that has 'static lifetime (aka it
/// owns the underlying strings)
type OwnedFieldReference = FieldReference<'static>;

impl<'a> FieldReference<'a> {
/// Convenience method for creating a [`FieldReference`].
pub fn new(
name: impl Into<Cow<'a, str>>,
qualifier: Option<TableReference<'a>>,
) -> Self {
Self {
name: name.into(),
qualifier,
}
}

/// Compare with another [`FieldReference`] as if both are resolved.
/// This allows comparing across variants, where if a field is not present
/// in both variants being compared then it is ignored in the comparison.
pub fn resolved_eq(&self, other: &Self) -> bool {
self.name == other.name
&& match (&self.qualifier, &other.qualifier) {
(Some(lhs), Some(rhs)) => lhs.resolved_eq(rhs),
_ => true,
}
}

fn field(&self) -> &str {
&self.name
}

fn table(&self) -> Option<&str> {
self.qualifier.as_ref().map(|q| q.table())
}

fn schema(&self) -> Option<&str> {
self.qualifier.as_ref().and_then(|q| q.schema())
}

fn catalog(&self) -> Option<&str> {
self.qualifier.as_ref().and_then(|q| q.catalog())
}
}

/// DFSchema wraps an Arrow schema and adds relation names
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
/// Fields
fields: Vec<DFField>,
/// Fields index
fields_index: BTreeMap<OwnedFieldReference, Vec<usize>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we use BTree here because it ensures the order of the index 🤔.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do you care about the index order? You either iterate over the fields in order (use self.fields.iter()) or you lookup a field by name (use self.field_index.get(...)). The index is orderd by field name. So this argument would only be valid if we OFTEN iterate over the fields in name order, which I don't think we do.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough

/// Additional metadata in form of key value pairs
metadata: HashMap<String, String>,
/// Stores functional dependencies in the schema.
Expand All @@ -51,6 +165,7 @@ impl DFSchema {
pub fn empty() -> Self {
Self {
fields: vec![],
fields_index: BTreeMap::new(),
metadata: HashMap::new(),
functional_dependencies: FunctionalDependencies::empty(),
}
Expand Down Expand Up @@ -102,8 +217,12 @@ impl DFSchema {
));
}
}

let fields_index = build_index(&fields);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the index is built for all DFSchema that are created, I wonder if that will be too much overhead. Maybe we could consider creating it on first use 🤔 or finding some way to canonicalize / cache the map

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this use case might indeed be a good call of interior mutability, i.e. use an RWLock and init the lookup table on the first use

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more: instead of RwLock, this can be solved even more elegantly w/ OnceLock::get_or_init (this is usually used for static variables, but you can totally use that for struct members as well).


Ok(Self {
fields,
fields_index,
metadata,
functional_dependencies: FunctionalDependencies::empty(),
})
Expand Down Expand Up @@ -159,6 +278,20 @@ impl DFSchema {
};
if !duplicated_field {
self.fields.push(field.clone());
let idx = self.fields.len() - 1;

let field_ref = OwnedFieldReference::new(
field.name().clone(),
field.qualifier().map(|q| q.to_owned_reference()),
);
match self.fields_index.entry(field_ref) {
Entry::Vacant(entry) => {
entry.insert(vec![idx]);
}
Entry::Occupied(mut entry) => {
entry.get_mut().push(idx);
}
}
}
}
self.metadata.extend(other_schema.metadata.clone())
Expand Down Expand Up @@ -206,34 +339,15 @@ impl DFSchema {
qualifier: Option<&TableReference>,
name: &str,
) -> Result<Option<usize>> {
let field_ref = FieldReference::new(name, qualifier.cloned());
let mut matches = self
.fields
.iter()
.enumerate()
.filter(|(_, field)| match (qualifier, &field.qualifier) {
// field to lookup is qualified.
// current field is qualified and not shared between relations, compare both
// qualifier and name.
(Some(q), Some(field_q)) => {
q.resolved_eq(field_q) && field.name() == name
}
// field to lookup is qualified but current field is unqualified.
(Some(qq), None) => {
// the original field may now be aliased with a name that matches the
// original qualified name
let column = Column::from_qualified_name(field.name());
match column {
Column {
relation: Some(r),
name: column_name,
} => &r == qq && column_name == name,
_ => false,
}
}
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => field.name() == name,
.fields_index
.range(field_ref..)
.take_while(|(q, _indices)| {
q.resolved_eq(&FieldReference::new(name, qualifier.cloned()))
})
.map(|(idx, _)| idx);
.flat_map(|(_q, indices)| indices)
.copied();
Ok(matches.next())
}

Expand Down Expand Up @@ -272,9 +386,11 @@ impl DFSchema {

/// Find all fields match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
self.fields
.iter()
.filter(|field| field.name() == name)
self.fields_index
.range(FieldReference::new(name, None)..)
.take_while(|(q, _indices)| q.field() == name)
.flat_map(|(_q, indices)| indices)
.map(|idx| self.field(*idx))
.collect()
}

Expand Down Expand Up @@ -454,25 +570,31 @@ impl DFSchema {

/// Strip all field qualifier in schema
pub fn strip_qualifiers(self) -> Self {
let fields: Vec<DFField> = self
.fields
.into_iter()
.map(|f| f.strip_qualifier())
.collect();
let fields_index = build_index(&fields);
DFSchema {
fields: self
.fields
.into_iter()
.map(|f| f.strip_qualifier())
.collect(),
fields,
fields_index,
..self
}
}

/// Replace all field qualifier with new value in schema
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self {
let qualifier = qualifier.into();
let fields: Vec<DFField> = self
.fields
.into_iter()
.map(|f| DFField::from_qualified(qualifier.clone(), f.field))
.collect();
let fields_index = build_index(&fields);
DFSchema {
fields: self
.fields
.into_iter()
.map(|f| DFField::from_qualified(qualifier.clone(), f.field))
.collect(),
fields,
fields_index,
..self
}
}
Expand All @@ -496,6 +618,30 @@ impl DFSchema {
}
}

fn build_index(fields: &[DFField]) -> BTreeMap<OwnedFieldReference, Vec<usize>> {
let mut index = BTreeMap::new();
let iter = fields
.iter()
.map(|field| {
OwnedFieldReference::new(
field.name().clone(),
field.qualifier().map(|q| q.to_owned_reference()),
)
})
.enumerate();
for (idx, field) in iter {
match index.entry(field) {
Entry::Vacant(entry) => {
entry.insert(vec![idx]);
}
Entry::Occupied(mut entry) => {
entry.get_mut().push(idx);
}
}
}
index
}

impl From<DFSchema> for Schema {
/// Convert DFSchema into a Schema
fn from(df_schema: DFSchema) -> Self {
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2327,7 +2327,9 @@ mod tests {
dict_id: 0, \
dict_is_ordered: false, \
metadata: {} } }\
], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
], fields_index: {\
FieldReference { name: \"a\", qualifier: None }: [0]\
}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
ExecutionPlan schema: Schema { fields: [\
Field { \
name: \"b\", \
Expand Down
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,11 +499,15 @@ mod tests {
assert_eq!(
"Optimizer rule 'get table_scan rule' failed\ncaused by\nget table_scan rule\ncaused by\n\
Internal error: Failed due to generate a different schema, \
original schema: DFSchema { fields: [], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
original schema: DFSchema { fields: [], fields_index: {}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \
new schema: DFSchema { fields: [\
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }], \
fields_index: {\
FieldReference { name: \"a\", qualifier: Some(Bare { table: \"test\" }) }: [0], \
FieldReference { name: \"b\", qualifier: Some(Bare { table: \"test\" }) }: [1], \
FieldReference { name: \"c\", qualifier: Some(Bare { table: \"test\" }) }: [2]}, \
metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }.\
\nThis was likely caused by a bug in DataFusion's code \
and we would welcome that you file an bug report in our issue tracker",
Expand Down