add SchemataLevel type
flarco committed Sep 3, 2024
1 parent a263858 commit 9048a22
Showing 32 changed files with 484 additions and 296 deletions.
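This commit replaces the boolean `column_level` discover flag with a string-backed `database.SchemataLevel`, letting discovery target schemas, tables, or columns. The type's definition is not among the files rendered below; the following is an inferred sketch, based on its call sites and on the `schema`/`table`/`column` values the test suites pass:

```go
// Inferred sketch only — the real definition lives in core/dbio/database,
// in a file not rendered on this page.
type SchemataLevel string

const (
	SchemataLevelSchema SchemataLevel = "schema" // list schemas only
	SchemataLevelTable  SchemataLevel = "table"  // list tables and views
	SchemataLevelColumn SchemataLevel = "column" // include column metadata
)
```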
70 changes: 48 additions & 22 deletions cmd/sling/sling_test.go
@@ -514,7 +514,7 @@ func TestSuiteDatabaseMariaDB(t *testing.T) {

func TestSuiteDatabaseOracle(t *testing.T) {
t.Parallel()
testSuite(t, dbio.TypeDbOracle, "1-21") // for some reason 22-discover hangs.
testSuite(t, dbio.TypeDbOracle) // for some reason 22-discover hangs.
}

// func TestSuiteDatabaseBigTable(t *testing.T) {
@@ -909,9 +909,9 @@ func testDiscover(t *testing.T, cfg *sling.Config, connType dbio.Type) {
conn := connMap[connType]

opt := connection.DiscoverOptions{
Pattern: cfg.Target.Object,
ColumnLevel: cast.ToBool(cfg.Env["column_level"]),
Recursive: cast.ToBool(cfg.Env["recursive"]),
Pattern: cfg.Target.Object,
Level: database.SchemataLevel(cast.ToString(cfg.Env["level"])),
Recursive: cast.ToBool(cfg.Env["recursive"]),
}

if g.In(connType, dbio.TypeFileLocal, dbio.TypeFileSftp) && opt.Pattern == "" {
@@ -947,46 +947,72 @@ func testDiscover(t *testing.T, cfg *sling.Config, connType dbio.Type) {
}

if connType.IsDb() {
schemas := lo.Values(schemata.Database().Schemas)
tables := lo.Values(schemata.Tables())
columns := iop.Columns(lo.Values(schemata.Columns()))
if valRowCount > 0 {
if opt.ColumnLevel {
assert.Equal(t, valRowCount, len(columns), columns.Names())
} else {
switch opt.Level {
case d.SchemataLevelSchema:
assert.Equal(t, valRowCount, len(schemas), lo.Keys(schemata.Database().Schemas))
case d.SchemataLevelTable:
assert.Equal(t, valRowCount, len(tables), lo.Keys(schemata.Tables()))
case d.SchemataLevelColumn:
assert.Equal(t, valRowCount, len(columns), columns.Names())
}
} else {
assert.Greater(t, len(tables), 0)
switch opt.Level {
case d.SchemataLevelSchema:
assert.Greater(t, len(schemas), 0)
case d.SchemataLevelTable:
assert.Greater(t, len(tables), 0)
case d.SchemataLevelColumn:
assert.Greater(t, len(columns), 0)
}
}

if valRowCountMin > -1 {
if opt.ColumnLevel {
assert.Greater(t, len(columns), valRowCountMin)
} else {
assert.Greater(t, len(tables), valRowCountMin)
switch opt.Level {
case d.SchemataLevelSchema:
assert.Greater(t, len(schemas), valRowCountMin, lo.Keys(schemata.Database().Schemas))
case d.SchemataLevelTable:
assert.Greater(t, len(tables), valRowCountMin, lo.Keys(schemata.Tables()))
case d.SchemataLevelColumn:
assert.Greater(t, len(columns), valRowCountMin, columns.Names())
}
}

if len(containsMap) > 0 {
resultType := "tables"

if opt.ColumnLevel {
resultType = "columns"
for _, col := range columns {
var resultType string
switch opt.Level {
case d.SchemataLevelSchema:
resultType = "schemas"
for _, schema := range schemas {
for word := range containsMap {
if strings.EqualFold(word, col.Name) {
if strings.EqualFold(word, schema.Name) {
containsMap[word] = true
}
}
}
} else {

case d.SchemataLevelTable:
resultType = "tables"
for _, table := range tables {
for word := range containsMap {
if strings.EqualFold(word, table.Name) {
containsMap[word] = true
}
}
}

case d.SchemataLevelColumn:
resultType = "columns"
for _, col := range columns {
for word := range containsMap {
if strings.EqualFold(word, col.Name) {
containsMap[word] = true
}
}
}
}

for _, word := range valContains {
@@ -1025,15 +1051,15 @@ func testDiscover(t *testing.T, cfg *sling.Config, connType dbio.Type) {
columns := iop.Columns(lo.Values(files.Columns()))

if valRowCount > 0 {
if opt.ColumnLevel {
if opt.Level == d.SchemataLevelColumn {
assert.Equal(t, valRowCount, len(files[0].Columns), g.Marshal(files[0].Columns.Names()))
} else {
assert.Equal(t, valRowCount, len(files), g.Marshal(files.Paths()))
}
}

if valRowCountMin > -1 {
if opt.ColumnLevel {
if opt.Level == d.SchemataLevelColumn {
assert.Greater(t, len(files[0].Columns), valRowCountMin)
} else {
assert.Greater(t, len(files), valRowCountMin)
@@ -1043,7 +1069,7 @@ func testDiscover(t *testing.T, cfg *sling.Config, connType dbio.Type) {
if len(containsMap) > 0 {
resultType := "files"

if opt.ColumnLevel {
if opt.Level == d.SchemataLevelColumn {
resultType = "columns"
for _, col := range columns {
for word := range containsMap {
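The net effect of the test changes above: each level asserts against a different collection of the returned schemata. A condensed sketch of that dispatch (assuming `opt` and `schemata` come from a `Discover` call, with the same `lo`, `iop`, and `g` helpers used in the test):

```go
schemas := lo.Values(schemata.Database().Schemas)
tables := lo.Values(schemata.Tables())
columns := iop.Columns(lo.Values(schemata.Columns()))

switch opt.Level {
case database.SchemataLevelSchema:
	g.Info("discovered %d schemas", len(schemas))
case database.SchemataLevelTable:
	g.Info("discovered %d tables", len(tables))
case database.SchemataLevelColumn:
	g.Info("discovered %d columns: %v", len(columns), columns.Names())
}
```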
12 changes: 7 additions & 5 deletions cmd/sling/tests/suite.db.template.tsv
@@ -1,13 +1,13 @@
n test_name source_conn source_stream source_options target_conn target_object mode source_primary_key source_update_key target_options options env
1 csv_full_refresh file://tests/files/test1.csv "{""columns"": {""first_name"": ""string(100)"", ""update_dt"": ""timestampz""}, ""transforms"": {""update_dt"":[""set_timezone(\""America/New_York\"")""]}}" [conn] [schema].[table] full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""pre_sql"" : ""{drop_view}"", ""table_keys"": { ""unique"": [ ""id"" ], ""index"": [ ""code"" ] }}" {} "{""validation_row_count"": ""1000""}"
1 csv_full_refresh file://tests/files/test1.csv "{""columns"": {""first_name"": ""string(100)"", ""update_dt"": ""timestampz""}, ""trasforms"": {""update_dt"":[""set_timezone(\""America/New_York\"")""]}}" [conn] [schema].[table] full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""pre_sql"" : ""{drop_view}"", ""table_keys"": { ""unique"": [ ""id"" ], ""index"": [ ""code"" ] }}" {} "{""validation_row_count"": ""1000""}"
2 csv_full_refresh_delimiter file://tests/files/test5.csv {} [conn] [schema].[table]_2 full-refresh "{""validation_row_count"": "">0""}"
3 discover_table [conn] [schema].[table] discover "{""validation_contains"": ""create_dt"", ""validation_row_count"": ""11"", ""column_level"": true}"
3 discover_table [conn] [schema].[table] discover "{""validation_contains"": ""create_dt"", ""validation_row_count"": ""11"", ""level"": ""column""}"
4 csv_incremental file://tests/files/test1.upsert.csv "{""columns"": {""first_name"": ""string(100)""}}" [conn] [schema].[table] incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""post_sql"" : ""{drop_view}"", ""pre_sql"" : ""drop table [schema].[table]_2"", ""table_keys"": { ""primary"": [ ""id"" ] }}" {} "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0"", ""validation_stream_row_count"": 14}"
5 parquet_snapshot file://tests/files/test1.parquet {} [conn] [schema].[table]_snapshot snapshot "{""pre_sql"" : ""create view [schema].[table]_vw as select * from [schema].[table]""}" "{""validation_row_count"": "">999""}"
6 parquet_truncate file://tests/files/test1.parquet "{""columns"": {""rating"": ""float""}}" [conn] [schema].[table]_truncate truncate "{""pre_sql"": ""drop table [schema].[table]_snapshot "", ""post_sql"" : ""drop table [schema].[table]_truncate""}"
7 csv_wide_full_refresh file://tests/files/test.wide.csv "{""limit"": 90}" [conn] [schema].[table]_wide full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""table_keys"": { ""unique"": [ ""id"" ] }}" {} "{""validation_row_count"": ""90""}"
8 discover_schema [conn] [schema].* discover "{""validation_contains"": ""[table],[table]_wide,[table]_vw""}"
9 discover_filter [conn] [schema].[table]_v* discover "{""validation_contains"": ""[table]_vw"", ""validation_row_count"": "">0""}"
8 discover_schema [conn] [schema].* discover "{""validation_contains"": ""[table],[table]_wide,[table]_vw"", ""level"": ""table""}"
9 discover_filter [conn] [schema].[table]_v* discover "{""validation_contains"": ""[table]_vw"", ""validation_row_count"": "">0"", ""level"": ""table""}"
10 table_full_refresh_into_postgres [conn] [schema].[table] {} postgres public.[table]_pg full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" {} "{""validation_row_count"": "">999""}"
11 table_incremental_into_postgres [conn] [schema].[table] "{""limit"": 10000}" postgres public.[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" {} "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_stream_row_count"": "">0""}"
12 view_full_refresh_into_postgres [conn] [schema].[table]_vw "{""limit"": 100, ""offset"": 50}" postgres public.[table]_pg_vw full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" {} "{""validation_row_count"": ""100""}"
@@ -20,4 +20,6 @@
19 sql_full_refresh_from_postgres postgres select * from public.[table]_pg where 1=1 {} [conn] [schema].[table]_pg full-refresh id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" {} "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0""}"
20 sql_incremental_from_postgres postgres select * from public.[table]_pg where {incremental_where_cond} {} [conn] [schema].[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""post_sql"": ""{drop_view}""}" {} "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0"", ""validation_stream_row_count"": "">0""}"
21 table_backfill_from_postgres postgres public.[table]_pg "{""range"":""2020-01-01,2021-01-01""}" [conn] [schema].[table]_pg backfill id create_dt "{""post_sql"": ""drop table [schema].[table] ; drop table [schema].[table]_pg; drop table [schema].[table]_vw_pg""}" {} "{""validation_stream_row_count"": "">0""}"
22 discover [conn] discover
22 discover_schemas [conn] discover "{""level"": ""schema"", ""validation_row_count"": "">0""}"
23 discover_tables [conn] [schema].* discover "{""level"": ""table"", ""validation_row_count"": "">0""}"
24 discover_columns [conn] [schema].* discover "{""level"": ""column"", ""validation_row_count"": "">0""}"
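The suite's env JSON now carries `level` (`schema`, `table`, or `column`) in place of the old `column_level` boolean. A sketch of how a row's env value feeds the discover option, mirroring the test code above (the empty-level default is implemented in connection_discover.go, below):

```go
// env is the parsed suite-row env, e.g. {"level": "table", "validation_row_count": ">0"}
level := database.SchemataLevel(cast.ToString(env["level"])) // "" when the row sets no level
opt := connection.DiscoverOptions{
	Pattern: "[schema].*", // placeholder pattern, as in the template rows
	Level:   level,        // empty Level is defaulted: schema without a pattern, table with one
}
```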
2 changes: 1 addition & 1 deletion cmd/sling/tests/suite.file.template.tsv
@@ -9,7 +9,7 @@
8 discover_folder_files [conn] [folder]/files/ discover "{""validation_contains"": ""[folder]/files/[table].csv,[folder]/files/[table].json,[folder]/files/[table].parquet"", ""validation_not_contains"": ""[folder]/files/[table].parquet/"", ""validation_row_count"": ""3""}"
9 discover_folder_files_csv_wildcard [conn] [folder]/files/*.csv discover "{""validation_contains"": ""[folder]/files/[table].csv"", ""validation_row_count"": ""1""}"
10 discover_filter [conn] [folder]/**/*[table].js* discover "{""recursive"": true, ""validation_contains"": ""[folder]/files/[table].json"", ""validation_row_count"": ""1""}"
11 discover_file [conn] [folder]/files/[table].parquet discover "{""validation_contains"": ""create_dt"", ""validation_row_count"": ""12"", ""column_level"": true}"
11 discover_file [conn] [folder]/files/[table].parquet discover "{""validation_contains"": ""create_dt"", ""validation_row_count"": ""12"", ""level"": ""column""}"
12 discover_csv_folder_files [conn] [folder]/csv/*.csv discover "{""validation_row_count"": "">5""}"
13 discover_recursive [conn] [folder] discover "{""recursive"": true, ""validation_row_count"": "">5""}"
14 csv_to_pg [conn] [folder]/files/[table].csv postgres [schema].[table] full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true, ""table_keys"": { ""index"": [ ""id"" ] }}" "{""validation_file"": ""file://tests/files/test1.csv"", ""validation_cols"": ""0,1,2,3,4,6""}"
34 changes: 23 additions & 11 deletions core/dbio/connection/connection_discover.go
@@ -4,6 +4,7 @@ import (
"context"
"sort"
"strings"
"time"

"github.com/flarco/g"
"github.com/gobwas/glob"
@@ -54,9 +55,9 @@ func (c *Connection) Test() (ok bool, err error) {
}

type DiscoverOptions struct {
Pattern string `json:"pattern,omitempty"`
ColumnLevel bool `json:"column_level,omitempty"` // get column level
Recursive bool `json:"recursive,omitempty"`
Pattern string `json:"pattern,omitempty"`
Level database.SchemataLevel `json:"level,omitempty"`
Recursive bool `json:"recursive,omitempty"`
}

func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.FileNodes, schemata database.Schemata, err error) {
@@ -92,13 +93,15 @@ func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.FileNodes, schemata database.Schemata, err error) {
if err != nil {
return ok, nodes, schemata, g.Error(err, "could not initiate %s", c.Name)
}
err = dbConn.Connect()
err = dbConn.Connect(10)
if err != nil {
return ok, nodes, schemata, g.Error(err, "could not connect to %s", c.Name)
}

var table database.Table
level := database.SchemataLevelSchema
if opt.Pattern != "" {
level = database.SchemataLevelTable
table, _ = database.ParseTableName(opt.Pattern, c.Type)
if strings.Contains(table.Schema, "*") || strings.Contains(table.Schema, "?") {
table.Schema = ""
@@ -107,14 +110,19 @@ func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.FileNodes, schemata database.Schemata, err error) {
table.Name = ""
}
}
g.Debug("database discover inputs: %s", g.Marshal(g.M("pattern", opt.Pattern, "schema", table.Schema, "table", table.Name, "column_level", opt.ColumnLevel)))

schemata, err = dbConn.GetSchemata(table.Schema, table.Name)
if string(opt.Level) == "" {
opt.Level = level
}

g.Debug("database discover inputs: %s", g.Marshal(g.M("pattern", opt.Pattern, "schema", table.Schema, "table", table.Name, "level", opt.Level)))

schemata, err = dbConn.GetSchemata(opt.Level, table.Schema, table.Name)
if err != nil {
return ok, nodes, schemata, g.Error(err, "could not discover %s", c.Name)
}

if opt.ColumnLevel {
if opt.Level == database.SchemataLevelColumn {
g.Debug("unfiltered nodes returned: %d", len(schemata.Columns()))
if len(schemata.Columns()) <= 10 {
g.Debug(g.Marshal(lo.Keys(schemata.Columns())))
@@ -128,15 +136,19 @@ func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.FileNodes, schemata database.Schemata, err error) {

// apply filter
if len(patterns) > 0 {
schemata = schemata.Filtered(opt.ColumnLevel, patterns...)
schemata = schemata.Filtered(opt.Level == database.SchemataLevelColumn, patterns...)
}

case c.Type.IsFile():
fileClient, err := c.AsFile()
if err != nil {
return ok, nodes, schemata, g.Error(err, "could not initiate %s", c.Name)
}
err = fileClient.Init(context.Background())

parent, cancel := context.WithTimeout(context.Background(), 25*time.Second)
defer cancel()

err = fileClient.Init(parent)
if err != nil {
return ok, nodes, schemata, g.Error(err, "could not connect to %s", c.Name)
}
Expand All @@ -152,7 +164,7 @@ func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.File
parsePattern()
}

g.Debug("file discover inputs: %s", g.Marshal(g.M("pattern", opt.Pattern, "url", url, "column_level", opt.ColumnLevel, "recursive", opt.Recursive)))
g.Debug("file discover inputs: %s", g.Marshal(g.M("pattern", opt.Pattern, "url", url, "column_level", opt.Level, "recursive", opt.Recursive)))
if opt.Recursive {
nodes, err = fileClient.ListRecursive(url)
} else {
@@ -182,7 +194,7 @@ func (c *Connection) Discover(opt *DiscoverOptions) (ok bool, nodes filesys.FileNodes, schemata database.Schemata, err error) {
})

// if single file, get columns of file content
if opt.ColumnLevel {
if opt.Level == database.SchemataLevelColumn {
ctx := g.NewContext(fileClient.Context().Ctx, 5)

getColumns := func(i int) {
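Putting it together, a minimal usage sketch of the updated entrypoint (assuming `conn` is an established `connection.Connection`; the pattern is hypothetical):

```go
opt := &connection.DiscoverOptions{
	Pattern: "public.*",                   // hypothetical pattern
	Level:   database.SchemataLevelColumn, // equivalent to env {"level": "column"}
}
ok, nodes, schemata, err := conn.Discover(opt)
// database connections populate schemata; file connections populate nodes,
// with per-file columns when the column level is requested
```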
2 changes: 1 addition & 1 deletion core/dbio/database/analyzer.go
@@ -84,7 +84,7 @@ func (da *DataAnalyzer) GetSchemata(force bool) (err error) {

for _, schema := range da.Options.SchemaNames {
g.Info("getting schemata for %s", schema)
schemata, err := da.Conn.GetSchemata(schema, "")
schemata, err := da.Conn.GetSchemata(SchemataLevelTable, schema, "")
if err != nil {
return g.Error(err, "could not get schemata")
}
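Both call sites in this commit show the new `GetSchemata` shape — the level now comes first (`dbConn.GetSchemata(opt.Level, table.Schema, table.Name)` above, `da.Conn.GetSchemata(SchemataLevelTable, schema, "")` here). A one-line usage sketch with hypothetical names:

```go
// table-level metadata for every table in schema "public" (no columns fetched)
schemata, err := conn.GetSchemata(database.SchemataLevelTable, "public", "")
```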
(Diffs for the remaining changed files are not rendered on this page.)