Skip to content

Commit

Permalink
fix file incremental delta/iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Sep 1, 2024
1 parent 1a5704e commit 3ee4116
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 43 deletions.
9 changes: 8 additions & 1 deletion cmd/sling/tests/replications/r.15.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ streams:
sling_test/*iceberg:
source_options:
format: iceberg
limit: {iceberg_limit}
object: public.sling_test_{stream_file_name}
update_key: l_commitdate
mode: {iceberg_mode} # if blank, defaults to full-refresh

sling_test/delta:
source_options:
format: delta
object: public.sling_test_{stream_file_name}
object: public.sling_test_{stream_file_name}

env:
iceberg_limit: $ICEBERG_LIMIT
iceberg_mode: $ICEBERG_MODE
13 changes: 7 additions & 6 deletions cmd/sling/tests/suite.cli.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ n test_name rows bytes streams fails output_contains command
41 Run sling with replication configuration 09 >1 sling run -r cmd/sling/tests/replications/r.09.yaml
42 Run sling with replication configuration 11 and year parameter 3 test1k/2005 YEAR=2005 sling run -r cmd/sling/tests/replications/r.11.yaml
43 Run sling with replication configuration 12 1 sling run -r cmd/sling/tests/replications/r.12.yaml
44 Run sling with replication configuration 15 2 sling run -r cmd/sling/tests/replications/r.15.yaml
45 Run sling with replication configuration 14 2 sling run -r cmd/sling/tests/replications/r.14.yaml
46 Run sling with replication configuration 14 0 sling run -r cmd/sling/tests/replications/r.14.yaml # file incremental. Second run should have no new rows
47 Run sling with task configuration 24 sling run -c cmd/sling/tests/task.yaml
48 Run sling with Parquet source 1018 sling run --src-stream 'file://cmd/sling/tests/files/parquet' --stdout > /dev/null
49 Run sling with empty input 0 execution succeeded echo '' | sling run --stdout
44 Run sling with replication configuration 15 2 100 rows ICEBERG_LIMIT=100 sling run -r cmd/sling/tests/replications/r.15.yaml # iceberg & delta
45 Run sling with replication configuration 15 incremental 1 incremental ICEBERG_MODE=incremental sling run -r cmd/sling/tests/replications/r.15.yaml --streams sling_test/lineitem_iceberg/
46 Run sling with replication configuration 14 2 sling run -r cmd/sling/tests/replications/r.14.yaml
47 Run sling with replication configuration 14 0 sling run -r cmd/sling/tests/replications/r.14.yaml # file incremental. Second run should have no new rows
48 Run sling with task configuration 24 sling run -c cmd/sling/tests/task.yaml
49 Run sling with Parquet source 1018 sling run --src-stream 'file://cmd/sling/tests/files/parquet' --stdout > /dev/null
50 Run sling with empty input 0 execution succeeded echo '' | sling run --stdout
19 changes: 9 additions & 10 deletions core/dbio/iop/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ type schemaChg struct {
}

type FileStreamConfig struct {
Limit int `json:"limit"`
Select []string `json:"select"`
Format dbio.FileType `json:"format"`
IncrementalKey string `json:"incremental_key"`
IncrementalVal any `json:"incremental_val"`
IncrementalValStr string `json:"incremental_val_str"`
Props map[string]string `json:"props"`
Limit int `json:"limit"`
Select []string `json:"select"`
Format dbio.FileType `json:"format"`
IncrementalKey string `json:"incremental_key"`
IncrementalValue string `json:"incremental_value"`
Props map[string]string `json:"props"`
}

type KeyValue struct {
Expand Down Expand Up @@ -1447,7 +1446,7 @@ func (ds *Datastream) ConsumeParquetReaderDuckDb(uri string, sc FileStreamConfig
return g.Error(err, "could not create ParquetDuckDb")
}

sql := p.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
sql := p.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValue)
ds, err = p.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read parquet rows")
Expand All @@ -1467,7 +1466,7 @@ func (ds *Datastream) ConsumeIcebergReader(uri string, sc FileStreamConfig) (err
return g.Error(err, "could not create IcebergDuckDb")
}

sql := i.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
sql := i.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValue)
ds, err = i.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read iceberg rows")
Expand All @@ -1487,7 +1486,7 @@ func (ds *Datastream) ConsumeDeltaReader(uri string, sc FileStreamConfig) (err e
return g.Error(err, "could not create DeltaReader")
}

sql := d.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValStr)
sql := d.MakeSelectQuery(sc.Select, cast.ToUint64(sc.Limit), sc.IncrementalKey, sc.IncrementalValue)
ds, err = d.Duck.Stream(sql, g.M("datastream", ds))
if err != nil {
return g.Error(err, "could not read delta rows")
Expand Down
4 changes: 2 additions & 2 deletions core/dbio/iop/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (d *DeltaReader) Columns() (Columns, error) {
}

var err error
d.columns, err = d.Duck.Describe(d.MakeSelectQuery(nil, 0, "", nil))
d.columns, err = d.Duck.Describe(d.MakeSelectQuery(nil, 0, "", ""))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
Expand All @@ -43,6 +43,6 @@ func (d *DeltaReader) Close() error {
return d.Duck.Close()
}

func (d *DeltaReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
// MakeSelectQuery builds a DuckDB SELECT over delta_scan('<URI>') for this
// reader. fields are quoted column names (nil or "*" selects all columns);
// a non-empty incrementalKey/incrementalValue pair adds a
// `where <key> > <value>` filter — note incrementalValue is spliced into the
// SQL verbatim, so callers must pre-quote/format string values; limit > 0
// appends a LIMIT clause.
func (d *DeltaReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey, incrementalValue string) string {
	return d.Duck.MakeScanSelectQuery("delta_scan", d.URI, fields, incrementalKey, incrementalValue, limit)
}
4 changes: 2 additions & 2 deletions core/dbio/iop/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func testDeltaReader(t *testing.T, d *DeltaReader) {
// Test MakeSelectQuery method with all fields
allFields := []string{"*"}
limit := uint64(0) // No limit
query := d.MakeSelectQuery(allFields, limit, "", nil)
query := d.MakeSelectQuery(allFields, limit, "", "")
expectedQuery := g.F("select * from delta_scan('%s')", d.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query for all fields")

Expand Down Expand Up @@ -111,7 +111,7 @@ func testDeltaReader(t *testing.T, d *DeltaReader) {
// Test MakeSelectQuery method
fields := []string{"first_name", "last_name", "country"}
limit := uint64(10)
query := d.MakeSelectQuery(fields, limit, "", nil)
query := d.MakeSelectQuery(fields, limit, "", "")
expectedQuery := g.F("select \"first_name\",\"last_name\",\"country\" from delta_scan('%s') limit 10", d.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query")

Expand Down
6 changes: 3 additions & 3 deletions core/dbio/iop/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,16 +798,16 @@ func (duck *DuckDb) Describe(query string) (columns Columns, err error) {
return
}

func (duck *DuckDb) MakeScanSelectQuery(scanFunc, uri string, fields []string, incrementalKey string, incrementalValue any, limit uint64) (sql string) {
func (duck *DuckDb) MakeScanSelectQuery(scanFunc, uri string, fields []string, incrementalKey, incrementalValue string, limit uint64) (sql string) {
if len(fields) == 0 || fields[0] == "*" {
fields = []string{"*"}
} else {
fields = dbio.TypeDbDuckDb.QuoteNames(fields...)
}

where := ""
if incrementalKey != "" && incrementalValue != nil {
where = fmt.Sprintf("where %s > %v", dbio.TypeDbDuckDb.Quote(incrementalKey), incrementalValue)
if incrementalKey != "" && incrementalValue != "" {
where = fmt.Sprintf("where %s > %s", dbio.TypeDbDuckDb.Quote(incrementalKey), incrementalValue)
}

templ := dbio.TypeDbDuckDb.GetTemplateValue("core." + scanFunc)
Expand Down
4 changes: 2 additions & 2 deletions core/dbio/iop/iceberg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (i *IcebergReader) Columns() (Columns, error) {
}

var err error
i.columns, err = i.Duck.Describe(i.MakeSelectQuery(nil, 0, "", nil))
i.columns, err = i.Duck.Describe(i.MakeSelectQuery(nil, 0, "", ""))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
Expand All @@ -43,6 +43,6 @@ func (i *IcebergReader) Close() error {
return i.Duck.Close()
}

func (i *IcebergReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
// MakeSelectQuery builds a DuckDB SELECT over iceberg_scan for this reader's
// URI (the scan template includes allow_moved_paths = true). fields are
// quoted column names (nil or "*" selects all); a non-empty
// incrementalKey/incrementalValue pair adds a `where <key> > <value>` filter
// — incrementalValue is spliced into the SQL verbatim, so callers must
// pre-quote string values; limit > 0 appends a LIMIT clause.
func (i *IcebergReader) MakeSelectQuery(fields []string, limit uint64, incrementalKey, incrementalValue string) string {
	return i.Duck.MakeScanSelectQuery("iceberg_scan", i.URI, fields, incrementalKey, incrementalValue, limit)
}
4 changes: 2 additions & 2 deletions core/dbio/iop/iceberg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func testIcebergReader(t *testing.T, i *IcebergReader) {
// Test MakeSelectQuery method with all fields
allFields := []string{"*"}
limit := uint64(0) // No limit
query := i.MakeSelectQuery(allFields, limit, "", nil)
query := i.MakeSelectQuery(allFields, limit, "", "")
expectedQuery := fmt.Sprintf("select * from iceberg_scan('%s', allow_moved_paths = true)", i.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query for all fields")

Expand All @@ -125,7 +125,7 @@ func testIcebergReader(t *testing.T, i *IcebergReader) {
// Test MakeSelectQuery method
fields := []string{"l_orderkey", "l_quantity", "l_shipdate"}
limit := uint64(10)
query := i.MakeSelectQuery(fields, limit, "", nil)
query := i.MakeSelectQuery(fields, limit, "", "")
expectedQuery := fmt.Sprintf("select \"l_orderkey\",\"l_quantity\",\"l_shipdate\" from iceberg_scan('%s', allow_moved_paths = true) limit 10", i.URI)
assert.Equal(t, expectedQuery, query, "Generated query should match expected query")

Expand Down
4 changes: 2 additions & 2 deletions core/dbio/iop/parquet_duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *ParquetDuckDb) Columns() (Columns, error) {
// query := fmt.Sprintf("SELECT path_in_schema as column_name, type as column_type, column_id, num_values, total_uncompressed_size FROM parquet_metadata('%s') order by column_id", p.URI)

var err error
p.columns, err = p.Duck.Describe(p.MakeSelectQuery(nil, 0, "", nil))
p.columns, err = p.Duck.Describe(p.MakeSelectQuery(nil, 0, "", ""))
if err != nil {
return nil, g.Error(err, "could not get columns")
}
Expand All @@ -42,6 +42,6 @@ func (p *ParquetDuckDb) Close() error {
return p.Duck.Close()
}

func (p *ParquetDuckDb) MakeSelectQuery(fields []string, limit uint64, incrementalKey string, incrementalValue any) string {
// MakeSelectQuery builds a DuckDB SELECT over parquet_scan('<URI>') for this
// reader. fields are quoted column names (nil or "*" selects all); a
// non-empty incrementalKey/incrementalValue pair adds a
// `where <key> > <value>` filter — incrementalValue is spliced into the SQL
// verbatim, so callers must pre-quote string values; limit > 0 appends a
// LIMIT clause.
func (p *ParquetDuckDb) MakeSelectQuery(fields []string, limit uint64, incrementalKey, incrementalValue string) string {
	return p.Duck.MakeScanSelectQuery("parquet_scan", p.URI, fields, incrementalKey, incrementalValue, limit)
}
11 changes: 3 additions & 8 deletions core/sling/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (t *TaskExecution) Execute() error {

g.DebugLow("Sling version: %s (%s %s)", core.Version, runtime.GOOS, runtime.GOARCH)
g.DebugLow("type is %s", t.Type)
g.Debug("using: %s", g.Marshal(g.M("columns", t.Config.Target.Columns, "transforms", t.Config.Transforms)))
g.Debug("using: %s", g.Marshal(g.M("mode", t.Config.Mode, "columns", t.Config.Target.Columns, "transforms", t.Config.Transforms)))
g.Debug("using source options: %s", g.Marshal(t.Config.Source.Options))
g.Debug("using target options: %s", g.Marshal(t.Config.Target.Options))

Expand Down Expand Up @@ -364,14 +364,9 @@ func (t *TaskExecution) runFileToDB() (err error) {
if t.Config.Source.UpdateKey == "." {
t.Config.Source.UpdateKey = slingLoadedAtColumn
}
varMap := map[string]string{
"date_layout": "2006-01-02",
"date_layout_str": "{value}",
"timestamp_layout": "2006-01-02 15:04:05.000 -07",
"timestamp_layout_str": "{value}",
}

if err = getIncrementalValue(t.Config, tgtConn, varMap); err != nil {
template, _ := dbio.TypeDbDuckDb.Template()
if err = getIncrementalValue(t.Config, tgtConn, template.Variable); err != nil {
err = g.Error(err, "Could not get incremental value")
return err
}
Expand Down
9 changes: 4 additions & 5 deletions core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,10 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
}

fsCfg := iop.FileStreamConfig{
Select: cfg.Source.Select,
Limit: cfg.Source.Limit(),
IncrementalKey: cfg.Source.UpdateKey,
IncrementalVal: cfg.IncrementalVal,
IncrementalValStr: cfg.IncrementalValStr,
Select: cfg.Source.Select,
Limit: cfg.Source.Limit(),
IncrementalKey: cfg.Source.UpdateKey,
IncrementalValue: cfg.IncrementalValStr,
}
if ffmt := cfg.Source.Options.Format; ffmt != nil {
fsCfg.Format = *ffmt
Expand Down

0 comments on commit 3ee4116

Please sign in to comment.