Skip to content

Commit

Permalink
Updated schema tool to work with full semantic versions, i.e., MAJOR.…
Browse files Browse the repository at this point in the history
…MINOR.PATCH (#2417)

* Added UnitTestLogger which sends log messages to the testing.T.Log API.

* Updated schema tool to work with full semantic versions, i.e., MAJOR.MINOR.PATCH

* Responded to PR comments
  • Loading branch information
Jason Roselander authored Jan 28, 2022
1 parent 6ebaa80 commit b80a54f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 173 deletions.
4 changes: 2 additions & 2 deletions tools/common/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func validateSetupConfig(config *SetupConfig) error {
flag(CLIOptVersion) + " but not both must be specified")
}
if !config.DisableVersioning {
ver, err := parseValidateVersion(config.InitialVersion)
ver, err := normalizeVersionString(config.InitialVersion)
if err != nil {
return NewConfigError("invalid " + flag(CLIOptVersion) + " argument:" + err.Error())
}
Expand All @@ -96,7 +96,7 @@ func validateUpdateConfig(config *UpdateConfig) error {
return NewConfigError("missing " + flag(CLIOptSchemaDir) + " argument ")
}
if len(config.TargetVersion) > 0 {
ver, err := parseValidateVersion(config.TargetVersion)
ver, err := normalizeVersionString(config.TargetVersion)
if err != nil {
return NewConfigError("invalid " + flag(CLIOptTargetVersion) + " argument:" + err.Error())
}
Expand Down
173 changes: 97 additions & 76 deletions tools/common/schema/updatetask.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ import (
"encoding/json"
"fmt"
"os"
"regexp"
"sort"
"strings"

"github.com/blang/semver/v4"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)
Expand Down Expand Up @@ -67,11 +70,6 @@ type (
manifest *manifest
cqlStmts []string
}

// byVersion is a comparator type
// for sorting a set of version
// strings
byVersion []string
)

const (
Expand All @@ -80,6 +78,7 @@ const (

var (
whitelistedCQLPrefixes = [4]string{"CREATE", "ALTER", "INSERT", "DROP"}
versionDirectoryRegex = regexp.MustCompile(`^v[\d.]+`)
)

// NewUpdateSchemaTask returns a new instance of UpdateTask
Expand All @@ -98,7 +97,7 @@ func (task *UpdateTask) Run() error {
task.logger.Info("UpdateSchemeTask started", tag.NewAnyTag("config", config))

if config.IsDryRun {
if err := task.setupDryrunDatabase(); err != nil {
if err := task.setupDryRunDatabase(); err != nil {
return fmt.Errorf("error creating dryrun database:%v", err.Error())
}
}
Expand Down Expand Up @@ -179,11 +178,13 @@ func (task *UpdateTask) buildChangeSet(currVer string) ([]changeSet, error) {

config := task.config

verDirs, err := readSchemaDir(config.SchemaDir, currVer, config.TargetVersion)
verDirs, err := readSchemaDir(config.SchemaDir, currVer, config.TargetVersion, task.logger)
if err != nil {
return nil, fmt.Errorf("error listing schema dir:%v", err.Error())
}

task.logger.Debug(fmt.Sprintf("Schema Dirs: %s", verDirs))

var result []changeSet

for _, vd := range verDirs {
Expand All @@ -196,8 +197,10 @@ func (task *UpdateTask) buildChangeSet(currVer string) ([]changeSet, error) {
}

if m.CurrVersion != dirToVersion(vd) {
return nil, fmt.Errorf("manifest version doesn't match with dirname, dir=%v,manifest.version=%v",
vd, m.CurrVersion)
return nil, fmt.Errorf(
"manifest version doesn't match with dirname, dir=%v,manifest.version=%v",
vd, m.CurrVersion,
)
}

stmts, e := task.parseSQLStmts(dirPath, m)
Expand Down Expand Up @@ -226,6 +229,7 @@ func (task *UpdateTask) parseSQLStmts(dir string, manifest *manifest) ([]string,

for _, file := range manifest.SchemaUpdateCqlFiles {
path := dir + "/" + file
task.logger.Info("Processing schema file: " + path)
stmts, err := ParseFile(path)
if err != nil {
return nil, fmt.Errorf("error parsing file %v, err=%v", path, err)
Expand Down Expand Up @@ -270,13 +274,13 @@ func readManifest(dirPath string) (*manifest, error) {
return nil, err
}

currVer, err := parseValidateVersion(manifest.CurrVersion)
currVer, err := normalizeVersionString(manifest.CurrVersion)
if err != nil {
return nil, fmt.Errorf("invalid CurrVersion in manifest")
}
manifest.CurrVersion = currVer

minVer, err := parseValidateVersion(manifest.MinCompatibleVersion)
minVer, err := normalizeVersionString(manifest.MinCompatibleVersion)
if err != nil {
return nil, err
}
Expand All @@ -297,88 +301,119 @@ func readManifest(dirPath string) (*manifest, error) {
return &manifest, nil
}

// readSchemaDir returns a sorted list of subdir names that hold
// the schema changes for versions in the range startVer < ver <= endVer
// when endVer is empty this method returns all subdir names that are greater than startVer
// this method has an assumption that the subdirs containing the
// schema changes will be of the form vx.x, where x.x is the version
// returns error when
// - startVer < endVer
// - endVer is empty and no subdirs have version >= startVer
// - endVer is non-empty and subdir with version == endVer is not found
func readSchemaDir(dir string, startVer string, endVer string) ([]string, error) {

subdirs, err := os.ReadDir(dir)
// sortAndFilterVersions returns a sorted list of semantic versions the fall within the range
// (startVerExcl, endVerIncl]. If endVerIncl is not specified, returns all versions > startVerExcl.
// If endVerIncl is specified, it must be present in the list of versions.
func sortAndFilterVersions(versions []string, startVerExcl string, endVerIncl string, logger log.Logger) ([]string, error) {

startVersionExclusive, err := semver.ParseTolerant(startVerExcl)
if err != nil {
return nil, err
}

hasEndVer := len(endVer) > 0

if hasEndVer && cmpVersion(startVer, endVer) > 0 {
return nil, fmt.Errorf("startVer (%v) must be less than or equal to endVer (%v)", startVer, endVer)
var endVersionInclusive *semver.Version
if len(endVerIncl) > 0 {
evi, err := semver.ParseTolerant(endVerIncl)
if err != nil {
return nil, err
}
endVersionInclusive = &evi

cmp := startVersionExclusive.Compare(*endVersionInclusive)
if cmp > 0 {
return nil, fmt.Errorf("start version '%s' must be less than end version '%s'", startVerExcl, endVerIncl)
} else if cmp == 0 {
logger.Warn(
fmt.Sprintf(
"Start version '%s' is equal to end version '%s'. Returning empty version list",
startVerExcl,
endVerIncl,
),
)
return []string{}, nil
}
}

var endFound bool
var highestVer string
var result []string
var retVersions []string

for _, dir := range subdirs {
foundEndVer := false

if !dir.IsDir() {
for _, version := range versions {
semVer, err := semver.ParseTolerant(version)
if err != nil {
logger.Warn(fmt.Sprintf("Input '%s' is not a valid semver", version))
continue
}

dirname := dir.Name()

if !versionStrRegex.MatchString(dirname) {
if startVersionExclusive.Compare(semVer) >= 0 {
continue
}

ver := dirToVersion(dirname)

if len(highestVer) == 0 {
highestVer = ver
} else if cmpVersion(ver, highestVer) > 0 {
highestVer = ver
if endVersionInclusive != nil && endVersionInclusive.Compare(semVer) < 0 {
continue
}

highcmp := 0
lowcmp := cmpVersion(ver, startVer)
if hasEndVer {
highcmp = cmpVersion(ver, endVer)
endFound = endFound || (highcmp == 0)
if endVersionInclusive != nil && endVersionInclusive.Compare(semVer) == 0 {
foundEndVer = true
}

if lowcmp <= 0 || highcmp > 0 {
continue // out of range
retVersions = append(retVersions, version)
}

if endVersionInclusive != nil && !foundEndVer {
return nil, fmt.Errorf("end version '%s' specified but not found. existing versions: %s", endVerIncl, versions)
}

sort.Slice(retVersions, func(i, j int) bool {
verI, err := semver.ParseTolerant(retVersions[i])
if err != nil {
panic(err)
}
verJ, err := semver.ParseTolerant(retVersions[j])
if err != nil {
panic(err)
}
return verI.Compare(verJ) < 0
})

result = append(result, dirname)
return retVersions, nil
}

// readSchemaDir returns a sorted list of subdir names that hold
// the schema changes for versions in the range startVer < ver <= endVer
// when endVer is empty this method returns all subdir names that are greater than startVer
func readSchemaDir(dir string, startVer string, endVer string, logger log.Logger) ([]string, error) {

subDirs, err := os.ReadDir(dir)
if err != nil {
return nil, err
}

// when endVer is specified, atleast one result MUST be found
if hasEndVer && !endFound {
return nil, fmt.Errorf("version dir not found for target version %v", endVer)
if len(subDirs) == 0 {
return nil, fmt.Errorf("directory '%s' contains no subDirs", dir)
}

// when endVer is empty and no result is found, then the highest version
// found must be equal to startVer, else return error
if !hasEndVer && len(result) == 0 {
if len(highestVer) == 0 || cmpVersion(startVer, highestVer) != 0 {
return nil, fmt.Errorf("no subdirs found with version >= %v", startVer)
dirNames := make([]string, 0, len(subDirs))
for _, d := range subDirs {
if !d.IsDir() {
logger.Warn("not a directory: " + d.Name())
continue
}
return result, nil
}

sort.Sort(byVersion(result))
if !versionDirectoryRegex.MatchString(d.Name()) {
logger.Warn("invalid directory name: " + d.Name())
continue
}

return result, nil
dirNames = append(dirNames, d.Name())
}

return sortAndFilterVersions(dirNames, startVer, endVer, logger)
}

// sets up a temporary dryrun database for
// executing the cassandra schema update
func (task *UpdateTask) setupDryrunDatabase() error {
func (task *UpdateTask) setupDryRunDatabase() error {
setupConfig := &SetupConfig{
Overwrite: true,
InitialVersion: "0.0",
Expand All @@ -390,17 +425,3 @@ func (task *UpdateTask) setupDryrunDatabase() error {
func dirToVersion(dir string) string {
return dir[1:]
}

func (v byVersion) Len() int {
return len(v)
}

func (v byVersion) Less(i, j int) bool {
v1 := dirToVersion(v[i])
v2 := dirToVersion(v[j])
return cmpVersion(v1, v2) < 0
}

func (v byVersion) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
Loading

0 comments on commit b80a54f

Please sign in to comment.