update FileStreamConfig with incremental props
flarco committed Aug 31, 2024
1 parent 3686304 commit 20f250e
Showing 12 changed files with 108 additions and 41 deletions.
7 changes: 5 additions & 2 deletions core/dbio/filesys/fs.go
@@ -445,11 +445,14 @@ func (fs *BaseFileSysClient) GetDatastream(uri string, cfg ...iop.FileStreamConf

// no reader needed for iceberg, delta, duckdb will handle it
if g.In(fileFormat, dbio.FileTypeIceberg, dbio.FileTypeDelta) {
Cfg.Props = map[string]string{"fs_props": g.Marshal(fs.Props())}
switch fileFormat {
case dbio.FileTypeIceberg:
err = ds.ConsumeIcebergReader(uri, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
err = ds.ConsumeIcebergReader(uri, Cfg)
case dbio.FileTypeDelta:
err = ds.ConsumeDeltaReader(uri, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
err = ds.ConsumeDeltaReader(uri, Cfg)
case dbio.FileTypeParquet:
err = ds.ConsumeParquetReaderDuckDb(uri, Cfg)
}

if err != nil {
8 changes: 5 additions & 3 deletions core/dbio/filesys/fs_local.go
@@ -147,12 +147,14 @@ func (fs *LocalFileSysClient) GetDatastream(uri string, cfg ...iop.FileStreamCon
// no reader for iceberg, delta, duckdb will handle it
if g.In(fileFormat, dbio.FileTypeIceberg, dbio.FileTypeDelta) {
file.Close() // no need to keep the file open

Cfg.Props = map[string]string{"fs_props": g.Marshal(fs.Props())}
switch fileFormat {
case dbio.FileTypeIceberg:
err = ds.ConsumeIcebergReader("file://"+path, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
err = ds.ConsumeIcebergReader("file://"+path, Cfg)
case dbio.FileTypeDelta:
err = ds.ConsumeDeltaReader("file://"+path, Cfg.Select, cast.ToUint64(Cfg.Limit), fs.Props())
err = ds.ConsumeDeltaReader("file://"+path, Cfg)
case dbio.FileTypeParquet:
err = ds.ConsumeParquetReaderDuckDb("file://"+path, Cfg)
}
if err != nil {
ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", path))
32 changes: 19 additions & 13 deletions core/dbio/iop/datastream.go
@@ -75,9 +75,13 @@ type schemaChg struct {
}

type FileStreamConfig struct {
Limit int
Select []string
Format dbio.FileType
Limit int `json:"limit"`
Select []string `json:"select"`
Format dbio.FileType `json:"format"`
IncrementalKey string `json:"incremental_key"`
IncrementalVal any `json:"incremental_val"`
IncrementalValStr string `json:"incremental_val_str"`
Props map[string]string `json:"props"`
}

type KeyValue struct {
@@ -1435,15 +1439,16 @@ func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error) {
}

// ConsumeParquetReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, fields []string, limit uint64, fsProps map[string]string) (err error) {
func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig) (err error) {

props := g.MapToKVArr(map[string]string{"fs_props": g.Marshal(fsProps)})
props := g.MapToKVArr(sc.Props)
p, err := NewParquetReaderDuckDb(uri, props...)
if err != nil {
return g.Error(err, "could not create ParquetDuckDb")
}

ds, err = p.Duck.Stream(p.MakeSelectQuery(fields, limit), g.M("datastream", ds))
sql := p.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
ds, err = p.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read parquet rows")
}
@@ -1455,15 +1460,15 @@ func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, fields []string, li
}

// ConsumeIcebergReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeIcebergReader(uri string, fields []string, limit uint64, fsProps map[string]string) (err error) {
props := g.MapToKVArr(map[string]string{"fs_props": g.Marshal(fsProps)})

func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err error) {
props := g.MapToKVArr(sc.Props)
i, err := NewIcebergReader(uri, props...)
if err != nil {
return g.Error(err, "could not create IcebergDuckDb")
}

ds, err = i.Duck.Stream(i.MakeSelectQuery(fields, limit), g.M("datastream", ds))
sql := i.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
ds, err = i.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read iceberg rows")
}
@@ -1474,15 +1479,16 @@ func (ds *Datastream) ConsumeIcebergReader(uri string, fields []string, limit ui
}

// ConsumeDeltaReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeDeltaReader(uri string, fields []string, limit uint64, fsProps map[string]string) (err error) {
func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err error) {

props := g.MapToKVArr(map[string]string{"fs_props": g.Marshal(fsProps)})
props := g.MapToKVArr(sc.Props)
d, err := NewDeltaReader(uri, props...)
if err != nil {
return g.Error(err, "could not create DeltaReader")
}

ds, err = d.Duck.Stream(d.MakeSelectQuery(fields, limit), g.M("datastream", ds))
sql := d.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
ds, err = d.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read delta rows")
}
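The Consume*Reader entry points above now receive the full FileStreamConfig instead of separate select/limit/props arguments. A minimal sketch of how a caller might fill in the new incremental fields before handing the config to one of these readers; the field values, URI handling, and import paths are illustrative assumptions, not part of this commit:

package example

import (
	"github.com/flarco/g"
	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

// buildConfig sketches the new incremental fields. IncrementalValStr appears to be
// interpolated directly into the generated where clause (see the iceberg test below),
// so string/date values carry their own quotes.
func buildConfig(fsProps map[string]string) iop.FileStreamConfig {
	return iop.FileStreamConfig{
		Select:            []string{"id", "updated_at"},
		Limit:             1000,
		IncrementalKey:    "updated_at",
		IncrementalValStr: "'2024-08-01 00:00:00'",
		// fs.go and fs_local.go marshal the filesystem client props under
		// "fs_props" so the DuckDB-backed readers can rebuild the client config.
		Props: map[string]string{"fs_props": g.Marshal(fsProps)},
	}
}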
6 changes: 3 additions & 3 deletions core/dbio/iop/delta.go
@@ -32,7 +32,7 @@ func (d *DeltaReader) Columns() (Columns, error) {
}

var err error
d.columns, err = d.Duck.Describe(d.MakeSelectQuery(nil, 0))
d.columns, err = d.Duck.Describe(d.MakeSelectQuery(nil, 0, "", nil))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
@@ -43,6 +43,6 @@ func (d *DeltaReader) Close() error {
return d.Duck.Close()
}

func (d *DeltaReader) MakeSelectQuery(fields []string, limit uint64) string {
return d.Duck.MakeScanSelectQuery("delta_scan", d.URI, fields, limit)
func (d *DeltaReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
return d.Duck.MakeScanSelectQuery("delta_scan", d.URI, fields, incrementalKey, incrementalValue, limit)
}
4 changes: 2 additions & 2 deletions core/dbio/iop/delta_test.go
@@ -83,7 +83,7 @@ func testDeltaReader(t *testing.T, d *DeltaReader) {
// Test MakeSelectQuery method with all fields
allFields := []string{"*"}
limit := uint64(0) // No limit
query := d.MakeSelectQuery(allFields, limit)
query := d.MakeSelectQuery(allFields, limit, "", nil)
expectedQuery := g.F("select * from delta_scan('%s')", d.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query for all fields")

@@ -111,7 +111,7 @@ func testDeltaReader(t *testing.T, d *DeltaReader) {
// Test MakeSelectQuery method
fields := []string{"first_name", "last_name", "country"}
limit := uint64(10)
query := d.MakeSelectQuery(fields, limit)
query := d.MakeSelectQuery(fields, limit, "", nil)
expectedQuery := g.F("select \"first_name\",\"last_name\",\"country\" from delta_scan('%s') limit 10", d.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query")

14 changes: 10 additions & 4 deletions core/dbio/iop/duckdb.go
@@ -798,24 +798,30 @@ func (duck *DuckDb) Describe(query string) (columns Columns, err error) {
return
}

func (duck *DuckDb) MakeScanSelectQuery(scanFunc, uri string, fields []string, limit uint64) (sql string) {
func (duck *DuckDb) MakeScanSelectQuery(scanFunc, uri string, fields []string, incrementalKey string, incrementalValue any, limit uint64) (sql string) {
if len(fields) == 0 || fields[0] == "*" {
fields = []string{"*"}
} else {
fields = dbio.TypeDbDuckDb.QuoteNames(fields...)
}

where := ""
if incrementalKey != "" && incrementalValue != nil {
where = fmt.Sprintf("where %s > %v", dbio.TypeDbDuckDb.Quote(incrementalKey), incrementalValue)
}

templ := dbio.TypeDbDuckDb.GetTemplateValue("core." + scanFunc)
if templ == "" {
templ = "select {fields} from {scanFunc}('{uri}')"
templ = "select {fields} from {scanFunc}('{uri}') {where}"
}

sql = g.R(
sql = strings.TrimSpace(g.R(
templ,
"fields", strings.Join(fields, ","),
"scanFunc", scanFunc,
"uri", uri,
)
"where", where,
))

if limit > 0 {
sql += fmt.Sprintf(" limit %d", limit)
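With the two extra parameters, MakeScanSelectQuery only emits a where clause when both the incremental key and value are provided. A rough usage sketch under assumed values (the scan path and column names are placeholders; the import path is assumed):

package example

import "github.com/slingdata-io/sling-cli/core/dbio/iop"

// buildIncrementalScan shows the updated argument order:
// scanFunc, uri, fields, incrementalKey, incrementalValue, limit.
func buildIncrementalScan(duck *iop.DuckDb) string {
	sql := duck.MakeScanSelectQuery(
		"parquet_scan",
		"s3://my-bucket/events/*.parquet",
		[]string{"id", "updated_at"},
		"updated_at",
		"'2024-08-01 00:00:00'", // value is already SQL-quoted
		1000,
	)
	// Roughly: select "id","updated_at" from parquet_scan('s3://my-bucket/events/*.parquet')
	//          where "updated_at" > '2024-08-01 00:00:00' limit 1000
	return sql
}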
6 changes: 3 additions & 3 deletions core/dbio/iop/iceberg.go
@@ -32,7 +32,7 @@ func (i *IcebergReader) Columns() (Columns, error) {
}

var err error
i.columns, err = i.Duck.Describe(i.MakeSelectQuery(nil, 0))
i.columns, err = i.Duck.Describe(i.MakeSelectQuery(nil, 0, "", nil))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
@@ -43,6 +43,6 @@ func (i *IcebergReader) Close() error {
return i.Duck.Close()
}

func (i *IcebergReader) MakeSelectQuery(fields []string, limit uint64) string {
return i.Duck.MakeScanSelectQuery("iceberg_scan", i.URI, fields, limit)
func (i *IcebergReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
return i.Duck.MakeScanSelectQuery("iceberg_scan", i.URI, fields, incrementalKey, incrementalValue, limit)
}
48 changes: 44 additions & 4 deletions core/dbio/iop/iceberg_test.go
@@ -4,8 +4,10 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/flarco/g"
"github.com/spf13/cast"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)
@@ -21,8 +23,8 @@ func TestIcebergReaderLocal(t *testing.T) {
testIcebergReader(t, i)

// Test Close method
// err = i.Close()
// assert.NoError(t, err)
err = i.Close()
assert.NoError(t, err)
}

func TestIcebergReaderS3(t *testing.T) {
@@ -96,7 +98,7 @@ func testIcebergReader(t *testing.T, i *IcebergReader) {
// Test MakeSelectQuery method with all fields
allFields := []string{"*"}
limit := uint64(0) // No limit
query := i.MakeSelectQuery(allFields, limit)
query := i.MakeSelectQuery(allFields, limit, "", nil)
expectedQuery := fmt.Sprintf("select * from iceberg_scan('%s', allow_moved_paths = true)", i.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query for all fields")

@@ -123,7 +125,7 @@ func testIcebergReader(t *testing.T, i *IcebergReader) {
// Test MakeSelectQuery method
fields := []string{"l_orderkey", "l_quantity", "l_shipdate"}
limit := uint64(10)
query := i.MakeSelectQuery(fields, limit)
query := i.MakeSelectQuery(fields, limit, "", nil)
expectedQuery := fmt.Sprintf("select \"l_orderkey\",\"l_quantity\",\"l_shipdate\" from iceberg_scan('%s', allow_moved_paths = true) limit 10", i.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query")

@@ -141,4 +143,42 @@ func testIcebergReader(t *testing.T, i *IcebergReader) {
}
})

t.Run("Test MakeSelectQuery with incremental key and value", func(t *testing.T) {
// Test MakeSelectQuery method with incremental key and value
fields := []string{"l_orderkey", "l_quantity", "l_shipdate", "l_commitdate"}
limit := uint64(0) // No limit
incrementalKey := "l_commitdate"
incrementalValue := "'1996-01-01'"
query := i.MakeSelectQuery(fields, limit, incrementalKey, incrementalValue)
expectedQuery := fmt.Sprintf("select \"l_orderkey\",\"l_quantity\",\"l_shipdate\",\"l_commitdate\" from iceberg_scan('%s', allow_moved_paths = true) where \"l_commitdate\" > '1996-01-01'", i.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query with incremental key and value")

// Test actual query execution
ds, err := i.Duck.Stream(query)
assert.NoError(t, err, "Streaming query should not produce an error")
assert.NotNil(t, ds, "Datastream should not be nil")

if ds != nil {
data, err := ds.Collect(0)
assert.NoError(t, err, "Collecting data should not produce an error")
assert.NotEmpty(t, data.Rows, "Query should return non-empty result")
assert.Equal(t, 4, len(data.Columns), "Result should have 4 columns")

// Verify column names
expectedColumnNames := []string{"l_orderkey", "l_quantity", "l_shipdate", "l_commitdate"}
for i, expectedName := range expectedColumnNames {
assert.Equal(t, expectedName, data.Columns[i].Name, "Column name should match")
}

// Verify that all returned rows have l_commitdate > '1996-01-01'
for _, row := range data.Rows {
commitDate := cast.ToTime(row[3])
assert.NoError(t, err, "Parsing commit date should not produce an error")
if !assert.True(t, commitDate.After(time.Date(1996, 1, 1, 0, 0, 0, 0, time.UTC)), "All returned rows should have l_commitdate > '1996-01-01'") {
break
}
}
}
})

}
6 changes: 3 additions & 3 deletions core/dbio/iop/parquet_duckdb.go
@@ -31,7 +31,7 @@ func (p *ParquetDuckDb) Columns() (Columns, error) {
// query := fmt.Sprintf("SELECT path_in_schema as column_name, type as column_type, column_id, num_values, total_uncompressed_size FROM parquet_metadata('%s') order by column_id", p.URI)

var err error
p.columns, err = p.Duck.Describe(p.MakeSelectQuery(nil, 0))
p.columns, err = p.Duck.Describe(p.MakeSelectQuery(nil, 0, "", nil))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
@@ -42,6 +42,6 @@ func (p *ParquetDuckDb) Close() error {
return p.Duck.Close()
}

func (p *ParquetDuckDb) MakeSelectQuery(fields []string, limit uint64) string {
return p.Duck.MakeScanSelectQuery("parquet_scan", p.URI, fields, limit)
func (p *ParquetDuckDb) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
return p.Duck.MakeScanSelectQuery("parquet_scan", p.URI, fields, incrementalKey, incrementalValue, limit)
}
6 changes: 3 additions & 3 deletions core/dbio/templates/duckdb.yaml
@@ -10,9 +10,9 @@ core:
insert_option: ""
modify_column: 'alter {column} type {type}'

iceberg_scan: select {fields} from iceberg_scan('{uri}', allow_moved_paths = true)
delta_scan: select {fields} from delta_scan('{uri}')
parquet_scan: select {fields} from parquet_scan('{uri}')
iceberg_scan: select {fields} from iceberg_scan('{uri}', allow_moved_paths = true) {where}
delta_scan: select {fields} from delta_scan('{uri}') {where}
parquet_scan: select {fields} from parquet_scan('{uri}') {where}


metadata:
4 changes: 4 additions & 0 deletions core/dbio/templates/motherduck.yaml
@@ -7,6 +7,10 @@ core:
insert_option: ""
modify_column: 'alter {column} type {type}'

iceberg_scan: select {fields} from iceberg_scan('{uri}', allow_moved_paths = true) {where}
delta_scan: select {fields} from delta_scan('{uri}') {where}
parquet_scan: select {fields} from parquet_scan('{uri}') {where}


metadata:
databases: PRAGMA database_list
8 changes: 7 additions & 1 deletion core/sling/task_run_read.go
@@ -232,7 +232,13 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
return t.df, err
}

fsCfg := iop.FileStreamConfig{Select: cfg.Source.Select, Limit: cfg.Source.Limit()}
fsCfg := iop.FileStreamConfig{
Select: cfg.Source.Select,
Limit: cfg.Source.Limit(),
IncrementalKey: cfg.Source.UpdateKey,
IncrementalVal: cfg.IncrementalVal,
IncrementalValStr: cfg.IncrementalValStr,
}
if ffmt := cfg.Source.Options.Format; ffmt != nil {
fsCfg.Format = *ffmt
}
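End to end, the incremental fields set in ReadFromFile flow through GetDatastream into the DuckDB-backed readers, which turn them into the where clause shown above. A hedged sketch, assuming a filesys client has already been created and that the FileSysClient interface exposes the same GetDatastream signature shown in the diffs (the URI, column, and import paths are placeholders):

package example

import (
	"github.com/flarco/g"
	"github.com/slingdata-io/sling-cli/core/dbio"
	"github.com/slingdata-io/sling-cli/core/dbio/filesys"
	"github.com/slingdata-io/sling-cli/core/dbio/iop"
)

// readIncremental sketches reading only rows past the last incremental value
// from a Delta table on object storage.
func readIncremental(fsClient filesys.FileSysClient) (*iop.Datastream, error) {
	fsCfg := iop.FileStreamConfig{
		Format:            dbio.FileTypeDelta,
		IncrementalKey:    "updated_at",
		IncrementalValStr: "'2024-08-01'",
	}
	ds, err := fsClient.GetDatastream("s3://my-bucket/my_delta_table", fsCfg)
	if err != nil {
		return nil, g.Error(err, "could not get datastream")
	}
	return ds, nil
}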
