Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated schema tool to work with full semantic versions, i.e., MAJOR.MINOR.PATCH #2417

Merged
merged 6 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tools/common/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func validateSetupConfig(config *SetupConfig) error {
flag(CLIOptVersion) + " but not both must be specified")
}
if !config.DisableVersioning {
ver, err := parseValidateVersion(config.InitialVersion)
ver, err := normalizeVersionString(config.InitialVersion)
if err != nil {
return NewConfigError("invalid " + flag(CLIOptVersion) + " argument:" + err.Error())
}
Expand All @@ -96,7 +96,7 @@ func validateUpdateConfig(config *UpdateConfig) error {
return NewConfigError("missing " + flag(CLIOptSchemaDir) + " argument ")
}
if len(config.TargetVersion) > 0 {
ver, err := parseValidateVersion(config.TargetVersion)
ver, err := normalizeVersionString(config.TargetVersion)
if err != nil {
return NewConfigError("invalid " + flag(CLIOptTargetVersion) + " argument:" + err.Error())
}
Expand Down
173 changes: 97 additions & 76 deletions tools/common/schema/updatetask.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ import (
"encoding/json"
"fmt"
"os"
"regexp"
"sort"
"strings"

"github.com/blang/semver/v4"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)
Expand Down Expand Up @@ -67,11 +70,6 @@ type (
manifest *manifest
cqlStmts []string
}

// byVersion is a comparator type
// for sorting a set of version
// strings
byVersion []string
)

const (
Expand All @@ -80,6 +78,7 @@ const (

var (
whitelistedCQLPrefixes = [4]string{"CREATE", "ALTER", "INSERT", "DROP"}
versionDirectoryRegex = regexp.MustCompile(`^v[\d.]+`)
)

// NewUpdateSchemaTask returns a new instance of UpdateTask
Expand All @@ -98,7 +97,7 @@ func (task *UpdateTask) Run() error {
task.logger.Info("UpdateSchemeTask started", tag.NewAnyTag("config", config))

if config.IsDryRun {
if err := task.setupDryrunDatabase(); err != nil {
if err := task.setupDryRunDatabase(); err != nil {
return fmt.Errorf("error creating dryrun database:%v", err.Error())
}
}
Expand Down Expand Up @@ -179,11 +178,13 @@ func (task *UpdateTask) buildChangeSet(currVer string) ([]changeSet, error) {

config := task.config

verDirs, err := readSchemaDir(config.SchemaDir, currVer, config.TargetVersion)
verDirs, err := readSchemaDir(config.SchemaDir, currVer, config.TargetVersion, task.logger)
if err != nil {
return nil, fmt.Errorf("error listing schema dir:%v", err.Error())
}

task.logger.Debug(fmt.Sprintf("Schema Dirs: %s", verDirs))

var result []changeSet

for _, vd := range verDirs {
Expand All @@ -196,8 +197,10 @@ func (task *UpdateTask) buildChangeSet(currVer string) ([]changeSet, error) {
}

if m.CurrVersion != dirToVersion(vd) {
return nil, fmt.Errorf("manifest version doesn't match with dirname, dir=%v,manifest.version=%v",
vd, m.CurrVersion)
return nil, fmt.Errorf(
"manifest version doesn't match with dirname, dir=%v,manifest.version=%v",
vd, m.CurrVersion,
)
}

stmts, e := task.parseSQLStmts(dirPath, m)
Expand Down Expand Up @@ -226,6 +229,7 @@ func (task *UpdateTask) parseSQLStmts(dir string, manifest *manifest) ([]string,

for _, file := range manifest.SchemaUpdateCqlFiles {
path := dir + "/" + file
task.logger.Info("Processing schema file: " + path)
stmts, err := ParseFile(path)
if err != nil {
return nil, fmt.Errorf("error parsing file %v, err=%v", path, err)
Expand Down Expand Up @@ -270,13 +274,13 @@ func readManifest(dirPath string) (*manifest, error) {
return nil, err
}

currVer, err := parseValidateVersion(manifest.CurrVersion)
currVer, err := normalizeVersionString(manifest.CurrVersion)
if err != nil {
return nil, fmt.Errorf("invalid CurrVersion in manifest")
}
manifest.CurrVersion = currVer

minVer, err := parseValidateVersion(manifest.MinCompatibleVersion)
minVer, err := normalizeVersionString(manifest.MinCompatibleVersion)
if err != nil {
return nil, err
}
Expand All @@ -297,88 +301,119 @@ func readManifest(dirPath string) (*manifest, error) {
return &manifest, nil
}

// readSchemaDir returns a sorted list of subdir names that hold
// the schema changes for versions in the range startVer < ver <= endVer
// when endVer is empty this method returns all subdir names that are greater than startVer
// this method has an assumption that the subdirs containing the
// schema changes will be of the form vx.x, where x.x is the version
// returns error when
// - startVer < endVer
// - endVer is empty and no subdirs have version >= startVer
// - endVer is non-empty and subdir with version == endVer is not found
func readSchemaDir(dir string, startVer string, endVer string) ([]string, error) {

subdirs, err := os.ReadDir(dir)
// sortAndFilterVersions returns a sorted list of semantic versions the fall within the range
// (startVerExcl, endVerIncl]. If endVerIncl is not specified, returns all versions > startVerExcl.
// If endVerIncl is specified, it must be present in the list of versions.
func sortAndFilterVersions(versions []string, startVerExcl string, endVerIncl string, logger log.Logger) ([]string, error) {

startVersionExclusive, err := semver.ParseTolerant(startVerExcl)
if err != nil {
return nil, err
}

hasEndVer := len(endVer) > 0

if hasEndVer && cmpVersion(startVer, endVer) > 0 {
return nil, fmt.Errorf("startVer (%v) must be less than or equal to endVer (%v)", startVer, endVer)
var endVersionInclusive *semver.Version
if len(endVerIncl) > 0 {
evi, err := semver.ParseTolerant(endVerIncl)
if err != nil {
return nil, err
}
endVersionInclusive = &evi

cmp := startVersionExclusive.Compare(*endVersionInclusive)
if cmp > 0 {
return nil, fmt.Errorf("start version '%s' must be less than end version '%s'", startVerExcl, endVerIncl)
} else if cmp == 0 {
logger.Warn(
fmt.Sprintf(
"Start version '%s' is equal to end version '%s'. Returning empty version list",
startVerExcl,
endVerIncl,
),
)
return []string{}, nil
}
}

var endFound bool
var highestVer string
var result []string
var retVersions []string

for _, dir := range subdirs {
foundEndVer := false

if !dir.IsDir() {
for _, version := range versions {
semVer, err := semver.ParseTolerant(version)
if err != nil {
logger.Warn(fmt.Sprintf("Input '%s' is not a valid semver", version))
continue
}

dirname := dir.Name()

if !versionStrRegex.MatchString(dirname) {
if startVersionExclusive.Compare(semVer) >= 0 {
continue
}

ver := dirToVersion(dirname)

if len(highestVer) == 0 {
highestVer = ver
} else if cmpVersion(ver, highestVer) > 0 {
highestVer = ver
if endVersionInclusive != nil && endVersionInclusive.Compare(semVer) < 0 {
continue
}

highcmp := 0
lowcmp := cmpVersion(ver, startVer)
if hasEndVer {
highcmp = cmpVersion(ver, endVer)
endFound = endFound || (highcmp == 0)
if endVersionInclusive != nil && endVersionInclusive.Compare(semVer) == 0 {
foundEndVer = true
}

if lowcmp <= 0 || highcmp > 0 {
continue // out of range
retVersions = append(retVersions, version)
}

if endVersionInclusive != nil && !foundEndVer {
return nil, fmt.Errorf("end version '%s' specified but not found. existing versions: %s", endVerIncl, versions)
}

sort.Slice(retVersions, func(i, j int) bool {
verI, err := semver.ParseTolerant(retVersions[i])
if err != nil {
panic(err)
}
verJ, err := semver.ParseTolerant(retVersions[j])
if err != nil {
panic(err)
}
return verI.Compare(verJ) < 0
})

result = append(result, dirname)
return retVersions, nil
}

// readSchemaDir returns a sorted list of subdir names that hold
// the schema changes for versions in the range startVer < ver <= endVer
// when endVer is empty this method returns all subdir names that are greater than startVer
func readSchemaDir(dir string, startVer string, endVer string, logger log.Logger) ([]string, error) {

subDirs, err := os.ReadDir(dir)
if err != nil {
return nil, err
}

// when endVer is specified, atleast one result MUST be found
if hasEndVer && !endFound {
return nil, fmt.Errorf("version dir not found for target version %v", endVer)
if len(subDirs) == 0 {
return nil, fmt.Errorf("directory '%s' contains no subDirs", dir)
}

// when endVer is empty and no result is found, then the highest version
// found must be equal to startVer, else return error
if !hasEndVer && len(result) == 0 {
if len(highestVer) == 0 || cmpVersion(startVer, highestVer) != 0 {
return nil, fmt.Errorf("no subdirs found with version >= %v", startVer)
dirNames := make([]string, 0, len(subDirs))
for _, d := range subDirs {
if !d.IsDir() {
logger.Warn("not a directory: " + d.Name())
continue
}
return result, nil
}

sort.Sort(byVersion(result))
if !versionDirectoryRegex.MatchString(d.Name()) {
logger.Warn("invalid directory name: " + d.Name())
continue
}

return result, nil
dirNames = append(dirNames, d.Name())
}

return sortAndFilterVersions(dirNames, startVer, endVer, logger)
}

// sets up a temporary dryrun database for
// executing the cassandra schema update
func (task *UpdateTask) setupDryrunDatabase() error {
func (task *UpdateTask) setupDryRunDatabase() error {
setupConfig := &SetupConfig{
Overwrite: true,
InitialVersion: "0.0",
Expand All @@ -390,17 +425,3 @@ func (task *UpdateTask) setupDryrunDatabase() error {
func dirToVersion(dir string) string {
return dir[1:]
}

func (v byVersion) Len() int {
return len(v)
}

func (v byVersion) Less(i, j int) bool {
v1 := dirToVersion(v[i])
v2 := dirToVersion(v[j])
return cmpVersion(v1, v2) < 0
}

func (v byVersion) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
Loading