-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use btree to search fields in DFSchema #7870
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,7 +18,10 @@ | |||||||||||||||||||||||
//! DFSchema is an extended schema struct that DataFusion uses to provide support for | ||||||||||||||||||||||||
//! fields with optional relation names. | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
use std::collections::{HashMap, HashSet}; | ||||||||||||||||||||||||
use std::borrow::Cow; | ||||||||||||||||||||||||
use std::cmp::Ordering; | ||||||||||||||||||||||||
use std::collections::btree_map::Entry; | ||||||||||||||||||||||||
use std::collections::{BTreeMap, HashMap, HashSet}; | ||||||||||||||||||||||||
use std::convert::TryFrom; | ||||||||||||||||||||||||
use std::fmt::{Display, Formatter}; | ||||||||||||||||||||||||
use std::hash::Hash; | ||||||||||||||||||||||||
|
@@ -35,11 +38,122 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; | |||||||||||||||||||||||
/// A reference-counted reference to a `DFSchema`. | ||||||||||||||||||||||||
pub type DFSchemaRef = Arc<DFSchema>; | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
/// [`FieldReference`]s represent a multi part identifier (path) to a | ||||||||||||||||||||||||
/// field that may require further resolution. | ||||||||||||||||||||||||
#[derive(Debug, Clone, PartialEq, Eq)] | ||||||||||||||||||||||||
struct FieldReference<'a> { | ||||||||||||||||||||||||
/// The field name | ||||||||||||||||||||||||
name: Cow<'a, str>, | ||||||||||||||||||||||||
/// Optional qualifier (usually a table or relation name) | ||||||||||||||||||||||||
qualifier: Option<TableReference<'a>>, | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
impl<'a> PartialOrd for FieldReference<'a> { | ||||||||||||||||||||||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||||||||||||||||||||||||
Some(self.cmp(other)) | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
impl<'a> Ord for FieldReference<'a> { | ||||||||||||||||||||||||
fn cmp(&self, other: &Self) -> Ordering { | ||||||||||||||||||||||||
if self == other { | ||||||||||||||||||||||||
return Ordering::Equal; | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
match self.field().cmp(other.field()) { | ||||||||||||||||||||||||
Ordering::Less => return Ordering::Less, | ||||||||||||||||||||||||
Ordering::Greater => return Ordering::Greater, | ||||||||||||||||||||||||
Ordering::Equal => {} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
match (self.table(), other.table()) { | ||||||||||||||||||||||||
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { | ||||||||||||||||||||||||
Ordering::Less => return Ordering::Less, | ||||||||||||||||||||||||
Ordering::Greater => return Ordering::Greater, | ||||||||||||||||||||||||
Ordering::Equal => {} | ||||||||||||||||||||||||
}, | ||||||||||||||||||||||||
Comment on lines
+70
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||
(Some(_), None) => return Ordering::Greater, | ||||||||||||||||||||||||
(None, Some(_)) => return Ordering::Less, | ||||||||||||||||||||||||
_ => {} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
match (self.schema(), other.schema()) { | ||||||||||||||||||||||||
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { | ||||||||||||||||||||||||
Ordering::Less => return Ordering::Less, | ||||||||||||||||||||||||
Ordering::Greater => return Ordering::Greater, | ||||||||||||||||||||||||
Ordering::Equal => {} | ||||||||||||||||||||||||
}, | ||||||||||||||||||||||||
Comment on lines
+81
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||
(Some(_), None) => return Ordering::Greater, | ||||||||||||||||||||||||
(None, Some(_)) => return Ordering::Less, | ||||||||||||||||||||||||
_ => {} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
match (self.catalog(), other.catalog()) { | ||||||||||||||||||||||||
(Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { | ||||||||||||||||||||||||
Ordering::Less => return Ordering::Less, | ||||||||||||||||||||||||
Ordering::Greater => return Ordering::Greater, | ||||||||||||||||||||||||
Ordering::Equal => {} | ||||||||||||||||||||||||
}, | ||||||||||||||||||||||||
Comment on lines
+92
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||
(Some(_), None) => return Ordering::Greater, | ||||||||||||||||||||||||
(None, Some(_)) => return Ordering::Less, | ||||||||||||||||||||||||
_ => {} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
Ordering::Equal | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
/// This is a [`FieldReference`] that has 'static lifetime (aka it | ||||||||||||||||||||||||
/// owns the underlying strings) | ||||||||||||||||||||||||
type OwnedFieldReference = FieldReference<'static>; | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
impl<'a> FieldReference<'a> { | ||||||||||||||||||||||||
/// Convenience method for creating a [`FieldReference`]. | ||||||||||||||||||||||||
pub fn new( | ||||||||||||||||||||||||
name: impl Into<Cow<'a, str>>, | ||||||||||||||||||||||||
qualifier: Option<TableReference<'a>>, | ||||||||||||||||||||||||
) -> Self { | ||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||
name: name.into(), | ||||||||||||||||||||||||
qualifier, | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
/// Compare with another [`FieldReference`] as if both are resolved. | ||||||||||||||||||||||||
/// This allows comparing across variants, where if a field is not present | ||||||||||||||||||||||||
/// in both variants being compared then it is ignored in the comparison. | ||||||||||||||||||||||||
pub fn resolved_eq(&self, other: &Self) -> bool { | ||||||||||||||||||||||||
self.name == other.name | ||||||||||||||||||||||||
&& match (&self.qualifier, &other.qualifier) { | ||||||||||||||||||||||||
(Some(lhs), Some(rhs)) => lhs.resolved_eq(rhs), | ||||||||||||||||||||||||
_ => true, | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
fn field(&self) -> &str { | ||||||||||||||||||||||||
&self.name | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
fn table(&self) -> Option<&str> { | ||||||||||||||||||||||||
self.qualifier.as_ref().map(|q| q.table()) | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
fn schema(&self) -> Option<&str> { | ||||||||||||||||||||||||
self.qualifier.as_ref().and_then(|q| q.schema()) | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
fn catalog(&self) -> Option<&str> { | ||||||||||||||||||||||||
self.qualifier.as_ref().and_then(|q| q.catalog()) | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
/// DFSchema wraps an Arrow schema and adds relation names | ||||||||||||||||||||||||
#[derive(Debug, Clone, PartialEq, Eq)] | ||||||||||||||||||||||||
pub struct DFSchema { | ||||||||||||||||||||||||
/// Fields | ||||||||||||||||||||||||
fields: Vec<DFField>, | ||||||||||||||||||||||||
/// Fields index | ||||||||||||||||||||||||
fields_index: BTreeMap<OwnedFieldReference, Vec<usize>>, | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we use BTree here because it ensures the order of the index 🤔. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do you care about the index order? You either iterate over the fields in order (use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough |
||||||||||||||||||||||||
/// Additional metadata in form of key value pairs | ||||||||||||||||||||||||
metadata: HashMap<String, String>, | ||||||||||||||||||||||||
/// Stores functional dependencies in the schema. | ||||||||||||||||||||||||
|
@@ -51,6 +165,7 @@ impl DFSchema { | |||||||||||||||||||||||
pub fn empty() -> Self { | ||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||
fields: vec![], | ||||||||||||||||||||||||
fields_index: BTreeMap::new(), | ||||||||||||||||||||||||
metadata: HashMap::new(), | ||||||||||||||||||||||||
functional_dependencies: FunctionalDependencies::empty(), | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
@@ -102,8 +217,12 @@ impl DFSchema { | |||||||||||||||||||||||
)); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
let fields_index = build_index(&fields); | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the index is built for all DFSchema that are created, I wonder if that will be too much overhead. Maybe we could consider creating it on first use 🤔 or finding some way to canonicalize / cache the map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this use case might indeed be a good call of interior mutability, i.e. use an RWLock and init the lookup table on the first use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking about this more: instead of |
||||||||||||||||||||||||
|
||||||||||||||||||||||||
Ok(Self { | ||||||||||||||||||||||||
fields, | ||||||||||||||||||||||||
fields_index, | ||||||||||||||||||||||||
metadata, | ||||||||||||||||||||||||
functional_dependencies: FunctionalDependencies::empty(), | ||||||||||||||||||||||||
}) | ||||||||||||||||||||||||
|
@@ -159,6 +278,20 @@ impl DFSchema { | |||||||||||||||||||||||
}; | ||||||||||||||||||||||||
if !duplicated_field { | ||||||||||||||||||||||||
self.fields.push(field.clone()); | ||||||||||||||||||||||||
let idx = self.fields.len() - 1; | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
let field_ref = OwnedFieldReference::new( | ||||||||||||||||||||||||
field.name().clone(), | ||||||||||||||||||||||||
field.qualifier().map(|q| q.to_owned_reference()), | ||||||||||||||||||||||||
); | ||||||||||||||||||||||||
match self.fields_index.entry(field_ref) { | ||||||||||||||||||||||||
Entry::Vacant(entry) => { | ||||||||||||||||||||||||
entry.insert(vec![idx]); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Entry::Occupied(mut entry) => { | ||||||||||||||||||||||||
entry.get_mut().push(idx); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
self.metadata.extend(other_schema.metadata.clone()) | ||||||||||||||||||||||||
|
@@ -206,34 +339,15 @@ impl DFSchema { | |||||||||||||||||||||||
qualifier: Option<&TableReference>, | ||||||||||||||||||||||||
name: &str, | ||||||||||||||||||||||||
) -> Result<Option<usize>> { | ||||||||||||||||||||||||
let field_ref = FieldReference::new(name, qualifier.cloned()); | ||||||||||||||||||||||||
let mut matches = self | ||||||||||||||||||||||||
.fields | ||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||
.enumerate() | ||||||||||||||||||||||||
.filter(|(_, field)| match (qualifier, &field.qualifier) { | ||||||||||||||||||||||||
// field to lookup is qualified. | ||||||||||||||||||||||||
// current field is qualified and not shared between relations, compare both | ||||||||||||||||||||||||
// qualifier and name. | ||||||||||||||||||||||||
(Some(q), Some(field_q)) => { | ||||||||||||||||||||||||
q.resolved_eq(field_q) && field.name() == name | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
// field to lookup is qualified but current field is unqualified. | ||||||||||||||||||||||||
(Some(qq), None) => { | ||||||||||||||||||||||||
// the original field may now be aliased with a name that matches the | ||||||||||||||||||||||||
// original qualified name | ||||||||||||||||||||||||
let column = Column::from_qualified_name(field.name()); | ||||||||||||||||||||||||
match column { | ||||||||||||||||||||||||
Column { | ||||||||||||||||||||||||
relation: Some(r), | ||||||||||||||||||||||||
name: column_name, | ||||||||||||||||||||||||
} => &r == qq && column_name == name, | ||||||||||||||||||||||||
_ => false, | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
// field to lookup is unqualified, no need to compare qualifier | ||||||||||||||||||||||||
(None, Some(_)) | (None, None) => field.name() == name, | ||||||||||||||||||||||||
.fields_index | ||||||||||||||||||||||||
.range(field_ref..) | ||||||||||||||||||||||||
.take_while(|(q, _indices)| { | ||||||||||||||||||||||||
q.resolved_eq(&FieldReference::new(name, qualifier.cloned())) | ||||||||||||||||||||||||
}) | ||||||||||||||||||||||||
.map(|(idx, _)| idx); | ||||||||||||||||||||||||
.flat_map(|(_q, indices)| indices) | ||||||||||||||||||||||||
.copied(); | ||||||||||||||||||||||||
Ok(matches.next()) | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
@@ -272,9 +386,11 @@ impl DFSchema { | |||||||||||||||||||||||
|
||||||||||||||||||||||||
/// Find all fields match the given name | ||||||||||||||||||||||||
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { | ||||||||||||||||||||||||
self.fields | ||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||
.filter(|field| field.name() == name) | ||||||||||||||||||||||||
self.fields_index | ||||||||||||||||||||||||
.range(FieldReference::new(name, None)..) | ||||||||||||||||||||||||
.take_while(|(q, _indices)| q.field() == name) | ||||||||||||||||||||||||
.flat_map(|(_q, indices)| indices) | ||||||||||||||||||||||||
.map(|idx| self.field(*idx)) | ||||||||||||||||||||||||
.collect() | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
@@ -454,25 +570,31 @@ impl DFSchema { | |||||||||||||||||||||||
|
||||||||||||||||||||||||
/// Strip all field qualifier in schema | ||||||||||||||||||||||||
pub fn strip_qualifiers(self) -> Self { | ||||||||||||||||||||||||
let fields: Vec<DFField> = self | ||||||||||||||||||||||||
.fields | ||||||||||||||||||||||||
.into_iter() | ||||||||||||||||||||||||
.map(|f| f.strip_qualifier()) | ||||||||||||||||||||||||
.collect(); | ||||||||||||||||||||||||
let fields_index = build_index(&fields); | ||||||||||||||||||||||||
DFSchema { | ||||||||||||||||||||||||
fields: self | ||||||||||||||||||||||||
.fields | ||||||||||||||||||||||||
.into_iter() | ||||||||||||||||||||||||
.map(|f| f.strip_qualifier()) | ||||||||||||||||||||||||
.collect(), | ||||||||||||||||||||||||
fields, | ||||||||||||||||||||||||
fields_index, | ||||||||||||||||||||||||
..self | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
/// Replace all field qualifier with new value in schema | ||||||||||||||||||||||||
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self { | ||||||||||||||||||||||||
let qualifier = qualifier.into(); | ||||||||||||||||||||||||
let fields: Vec<DFField> = self | ||||||||||||||||||||||||
.fields | ||||||||||||||||||||||||
.into_iter() | ||||||||||||||||||||||||
.map(|f| DFField::from_qualified(qualifier.clone(), f.field)) | ||||||||||||||||||||||||
.collect(); | ||||||||||||||||||||||||
let fields_index = build_index(&fields); | ||||||||||||||||||||||||
DFSchema { | ||||||||||||||||||||||||
fields: self | ||||||||||||||||||||||||
.fields | ||||||||||||||||||||||||
.into_iter() | ||||||||||||||||||||||||
.map(|f| DFField::from_qualified(qualifier.clone(), f.field)) | ||||||||||||||||||||||||
.collect(), | ||||||||||||||||||||||||
fields, | ||||||||||||||||||||||||
fields_index, | ||||||||||||||||||||||||
..self | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
@@ -496,6 +618,30 @@ impl DFSchema { | |||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
fn build_index(fields: &[DFField]) -> BTreeMap<OwnedFieldReference, Vec<usize>> { | ||||||||||||||||||||||||
let mut index = BTreeMap::new(); | ||||||||||||||||||||||||
let iter = fields | ||||||||||||||||||||||||
.iter() | ||||||||||||||||||||||||
.map(|field| { | ||||||||||||||||||||||||
OwnedFieldReference::new( | ||||||||||||||||||||||||
field.name().clone(), | ||||||||||||||||||||||||
field.qualifier().map(|q| q.to_owned_reference()), | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
}) | ||||||||||||||||||||||||
.enumerate(); | ||||||||||||||||||||||||
for (idx, field) in iter { | ||||||||||||||||||||||||
match index.entry(field) { | ||||||||||||||||||||||||
Entry::Vacant(entry) => { | ||||||||||||||||||||||||
entry.insert(vec![idx]); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
Entry::Occupied(mut entry) => { | ||||||||||||||||||||||||
entry.get_mut().push(idx); | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
index | ||||||||||||||||||||||||
} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
impl From<DFSchema> for Schema { | ||||||||||||||||||||||||
/// Convert DFSchema into a Schema | ||||||||||||||||||||||||
fn from(df_schema: DFSchema) -> Self { | ||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.