Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 30, 2024
1 parent 0d56f22 commit a584128
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
5 changes: 2 additions & 3 deletions clients/databricks/dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ func (d DatabricksDialect) IsColumnAlreadyExistsErr(_ error) bool {
return false
}

func (d DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
// Implement the logic to check if the error is a "table does not exist" error
return strings.Contains(err.Error(), "does not exist")
func (DatabricksDialect) IsTableDoesNotExistErr(err error) bool {
return strings.Contains(err.Error(), "[TABLE_OR_VIEW_NOT_FOUND]")
}

func (d DatabricksDialect) BuildCreateTableQuery(tableID sql.TableIdentifier, temporary bool, colSQLParts []string) string {
Expand Down
39 changes: 22 additions & 17 deletions clients/databricks/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ import (

type Store struct {
db.Store
cfg config.Config
cfg config.Config
configMap *types.DwhToTablesConfigMap
}

func describeTableQuery(tableID TableIdentifier) (string, []any) {
return fmt.Sprintf("DESCRIBE TABLE %s.%s.%s", tableID.Database(), tableID.Schema(), tableID.Table()), nil
}

func (s Store) Merge(tableData *optimization.TableData) error {
Expand All @@ -41,20 +46,19 @@ func (s Store) Dedupe(tableID sql.TableIdentifier, primaryKeys []string, include
}

func (s Store) GetTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
panic("not implemented")
//tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
//query, args := describeTableQuery(tableID)
//return shared.GetTableCfgArgs{
// Dwh: s,
// TableID: tableID,
// ConfigMap: s.configMap,
// Query: query,
// Args: args,
// ColumnNameForName: "column_name",
// ColumnNameForDataType: "data_type",
// ColumnNameForComment: "description",
// DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
//}.GetTableConfig()
tableID := NewTableIdentifier(tableData.TopicConfig().Database, tableData.TopicConfig().Schema, tableData.Name())
query, args := describeTableQuery(tableID)
return shared.GetTableCfgArgs{
Dwh: s,
TableID: tableID,
ConfigMap: s.configMap,
Query: query,
Args: args,
ColumnNameForName: "column_name",
ColumnNameForDataType: "data_type",
ColumnNameForComment: "description",
DropDeletedColumns: tableData.TopicConfig().DropDeletedColumns,
}.GetTableConfig()
}

func (s Store) PrepareTemporaryTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableID sql.TableIdentifier, parentTableID sql.TableIdentifier, additionalSettings types.AdditionalSettings, createTempTable bool) error {
Expand All @@ -68,7 +72,8 @@ func LoadStore(cfg config.Config) (Store, error) {
return Store{}, err
}
return Store{
Store: store,
cfg: cfg,
Store: store,
cfg: cfg,
configMap: &types.DwhToTablesConfigMap{},
}, nil
}

0 comments on commit a584128

Please sign in to comment.