Skip to content

Commit

Permalink
Add primary key support to stream table (#8467)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored Dec 8, 2023
1 parent cd02c40 commit 34b0445
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
14 changes: 13 additions & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use async_trait::async_trait;
use futures::StreamExt;
use tokio::task::spawn_blocking;

use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_plan::common::AbortOnDropSingle;
Expand Down Expand Up @@ -100,6 +100,7 @@ pub struct StreamConfig {
encoding: StreamEncoding,
header: bool,
order: Vec<Vec<Expr>>,
constraints: Constraints,
}

impl StreamConfig {
Expand All @@ -118,6 +119,7 @@ impl StreamConfig {
encoding: StreamEncoding::Csv,
order: vec![],
header: false,
constraints: Constraints::empty(),
}
}

Expand Down Expand Up @@ -145,6 +147,12 @@ impl StreamConfig {
self
}

/// Assign constraints
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}

fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
let file = File::open(&self.location)?;
let schema = self.schema.clone();
Expand Down Expand Up @@ -215,6 +223,10 @@ impl TableProvider for StreamTable {
self.0.schema.clone()
}

/// Returns a reference to the constraints stored in the wrapped
/// configuration. This implementation always yields `Some`.
fn constraints(&self) -> Option<&Constraints> {
    let config = &self.0;
    Some(&config.constraints)
}

/// Reports the type of this table, which is always [`TableType::Base`].
fn table_type(&self) -> TableType {
TableType::Base
}
Expand Down
31 changes: 31 additions & 0 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4248,3 +4248,34 @@ set datafusion.sql_parser.dialect = 'Generic';

statement ok
drop table aggregate_test_100;


# Create an unbounded external table with primary key
# column c
statement ok
CREATE EXTERNAL TABLE unbounded_multiple_ordered_table_with_pk (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER primary key,
d INTEGER
)
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';

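# The query below is valid even though `a` is neither grouped nor aggregated:
# c is the primary key, so `a` is functionally dependent on the GROUP BY column.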
query III rowsort
SELECT c, a, SUM(d)
FROM unbounded_multiple_ordered_table_with_pk
GROUP BY c
ORDER BY c
LIMIT 5
----
0 0 0
1 0 2
2 0 0
3 0 0
4 0 1

0 comments on commit 34b0445

Please sign in to comment.