Skip to content

Commit

Permalink
Upgrade domain_config type in cassandra schema to add async wf config (
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Feb 5, 2024
1 parent ca68fbf commit 5175867
Show file tree
Hide file tree
Showing 19 changed files with 197 additions and 33 deletions.
132 changes: 128 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/startreedata/pinot-client-go v0.0.0-20230303070132-3b84c28a9e95 // latest release doesn't support pinot v0.12, so use master branch
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07 h1:VxvBaCqyXmHIxPHYcQrz3oBsCeiO7diPyFau6E1CgaE=
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f h1:+Z2D/fV23Sl2ZBP4ADMeKH9uXNJO9sJlV9SlR3/QFzw=
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 2 additions & 0 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@ func (d *handlerImpl) UpdateAsyncWorkflowConfiguraton(
currentDomainConfig.Config.AsyncWorkflowConfig = *updateRequest.Configuration
}

d.logger.Debug("async workflow queue config update", tag.Dynamic("config", currentDomainConfig))

updateReq := createUpdateRequest(
currentDomainConfig.Info,
currentDomainConfig.Config,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/nosql/nosql_domain_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (m *nosqlDomainStore) toNoSQLInternalDomainConfig(
VisibilityArchivalURI: domainConfig.VisibilityArchivalURI,
BadBinaries: domainConfig.BadBinaries,
IsolationGroups: domainConfig.IsolationGroups,
AsyncWorkflowsConfig: domainConfig.AsyncWorkflowsConfig,
}, nil
}

Expand Down
28 changes: 27 additions & 1 deletion common/persistence/nosql/nosqlplugin/cassandra/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (db *cdb) InsertDomain(
failoverEndTime = row.FailoverEndTime.UnixNano()
}
isolationGroupData, isolationGroupEncoding := getIsolationGroupFields(row)
asyncWFConfigData, asyncWFConfigEncoding := getAsyncWFConfigFields(row)

batch.Query(templateCreateDomainByNameQueryWithinBatchV2,
constDomainPartition,
Expand All @@ -88,6 +89,8 @@ func (db *cdb) InsertDomain(
string(row.Config.BadBinaries.Encoding),
isolationGroupData,
isolationGroupEncoding,
asyncWFConfigData,
asyncWFConfigEncoding,
row.ReplicationConfig.ActiveClusterName,
persistence.SerializeClusterConfigs(row.ReplicationConfig.Clusters),
row.IsGlobalDomain,
Expand Down Expand Up @@ -171,6 +174,7 @@ func (db *cdb) UpdateDomain(
}

isolationGroupData, isolationGroupEncoding := getIsolationGroupFields(row)
asyncWFConfigData, asyncWFConfigEncoding := getAsyncWFConfigFields(row)

batch.Query(templateUpdateDomainByNameQueryWithinBatchV2,
row.Info.ID,
Expand All @@ -191,6 +195,8 @@ func (db *cdb) UpdateDomain(
string(row.Config.BadBinaries.Encoding),
isolationGroupData,
isolationGroupEncoding,
asyncWFConfigData,
asyncWFConfigEncoding,
row.ReplicationConfig.ActiveClusterName,
persistence.SerializeClusterConfigs(row.ReplicationConfig.Clusters),
row.ConfigVersion,
Expand Down Expand Up @@ -264,6 +270,8 @@ func (db *cdb) SelectDomain(
var retentionDays int32
var isolationGroupData []byte
var isolationGroupEncoding string
var asyncWFConfigData []byte
var asyncWFConfigEncoding string

query = db.session.Query(templateGetDomainByNameQueryV2, constDomainPartition, domainName).WithContext(ctx)
err = query.Scan(
Expand All @@ -287,6 +295,8 @@ func (db *cdb) SelectDomain(
&replicationClusters,
&isolationGroupData,
&isolationGroupEncoding,
&asyncWFConfigData,
&asyncWFConfigEncoding,
&isGlobalDomain,
&configVersion,
&failoverVersion,
Expand All @@ -302,6 +312,7 @@ func (db *cdb) SelectDomain(
}

config.IsolationGroups = persistence.NewDataBlob(isolationGroupData, common.EncodingType(isolationGroupEncoding))
config.AsyncWorkflowsConfig = persistence.NewDataBlob(asyncWFConfigData, common.EncodingType(asyncWFConfigEncoding))
config.BadBinaries = persistence.NewDataBlob(badBinariesData, common.EncodingType(badBinariesDataEncoding))
config.Retention = common.DaysToDuration(retentionDays)
replicationConfig.Clusters = persistence.DeserializeClusterConfigs(replicationClusters)
Expand Down Expand Up @@ -350,6 +361,8 @@ func (db *cdb) SelectAllDomains(
var badBinariesDataEncoding string
var isolationGroups []byte
var isolationGroupsEncoding string
var asyncWFConfigData []byte
var asyncWFConfigEncoding string
var retentionDays int32
var failoverEndTime int64
var lastUpdateTime int64
Expand All @@ -374,6 +387,8 @@ func (db *cdb) SelectAllDomains(
&badBinariesDataEncoding,
&isolationGroups,
&isolationGroupsEncoding,
&asyncWFConfigData,
&asyncWFConfigEncoding,
&domain.ReplicationConfig.ActiveClusterName,
&replicationClusters,
&domain.IsGlobalDomain,
Expand All @@ -391,6 +406,7 @@ func (db *cdb) SelectAllDomains(
domain.ReplicationConfig.Clusters = persistence.DeserializeClusterConfigs(replicationClusters)
domain.Config.Retention = common.DaysToDuration(retentionDays)
domain.Config.IsolationGroups = persistence.NewDataBlob(isolationGroups, common.EncodingType(isolationGroupsEncoding))
domain.Config.AsyncWorkflowsConfig = persistence.NewDataBlob(asyncWFConfigData, common.EncodingType(asyncWFConfigEncoding))
domain.LastUpdatedTime = time.Unix(0, lastUpdateTime)
if failoverEndTime > emptyFailoverEndTime {
domain.FailoverEndTime = common.TimePtr(time.Unix(0, failoverEndTime))
Expand Down Expand Up @@ -441,7 +457,7 @@ func (db *cdb) DeleteDomain(
} else {
var ID string
query := db.session.Query(templateGetDomainByNameQueryV2, constDomainPartition, *domainName).WithContext(ctx)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if db.client.IsNotFoundError(err) {
return nil
Expand Down Expand Up @@ -494,3 +510,13 @@ func getIsolationGroupFields(row *nosqlplugin.DomainRow) ([]byte, string) {
}
return d, e
}

func getAsyncWFConfigFields(row *nosqlplugin.DomainRow) ([]byte, string) {
var d []byte
var e string
if row != nil && row.Config != nil && row.Config.AsyncWorkflowsConfig != nil {
d = row.Config.AsyncWorkflowsConfig.GetData()
e = row.Config.AsyncWorkflowsConfig.GetEncodingString()
}
return d, e
}
7 changes: 6 additions & 1 deletion common/persistence/nosql/nosqlplugin/cassandra/domain_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ const (
`bad_binaries: ?,` +
`bad_binaries_encoding: ?,` +
`isolation_groups: ?,` +
`isolation_groups_encoding: ?` +
`isolation_groups_encoding: ?,` +
`async_workflow_config: ?,` +
`async_workflow_config_encoding: ?` +
`}`

templateDomainReplicationConfigType = `{` +
Expand Down Expand Up @@ -75,6 +77,8 @@ const (
`replication_config.active_cluster_name, replication_config.clusters, ` +
`config.isolation_groups,` +
`config.isolation_groups_encoding,` +
`config.async_workflow_config,` +
`config.async_workflow_config_encoding,` +
`is_global_domain, ` +
`config_version, ` +
`failover_version, ` +
Expand Down Expand Up @@ -123,6 +127,7 @@ const (
`config.visibility_archival_status, config.visibility_archival_uri, ` +
`config.bad_binaries, config.bad_binaries_encoding, ` +
`config.isolation_groups, config.isolation_groups_encoding, ` +
`config.async_workflow_config, config.async_workflow_config_encoding, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down
19 changes: 4 additions & 15 deletions common/persistence/persistence-tests/metadataPersistenceV2Test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,16 +717,10 @@ func (m *MetadataPersistenceSuiteV2) TestUpdateDomain() {
}

asyncWFCfg1 := types.AsyncWorkflowConfiguration{
QueueType: types.AsyncWorkflowQueueTypeKafka,
KafkaConfig: &types.AsyncWorkflowKafkaQueueConfiguration{
Topic: "topic1",
},
PredefinedQueueName: "queue1",
}
asyncWFCfg2 := types.AsyncWorkflowConfiguration{
QueueType: types.AsyncWorkflowQueueTypeKafka,
KafkaConfig: &types.AsyncWorkflowKafkaQueueConfiguration{
Topic: "topic2",
},
PredefinedQueueName: "queue2",
}

id := uuid.New()
Expand Down Expand Up @@ -899,9 +893,7 @@ func (m *MetadataPersistenceSuiteV2) TestUpdateDomain() {
m.Equal(&failoverEndTime, resp4.FailoverEndTime)
m.Equal(lastUpdateTime, resp4.LastUpdatedTime)
m.Equal(isolationGroups2, resp4.Config.IsolationGroups)
// TODO(taylan): uncomment when persistent layer schema is updated
// m.Equal(asyncWFCfg2, resp4.Config.AsyncWorkflowConfig)
m.Equal(types.AsyncWorkflowConfiguration{}, resp4.Config.AsyncWorkflowConfig)
m.Equal(asyncWFCfg2, resp4.Config.AsyncWorkflowConfig)

resp5, err5 := m.GetDomain(ctx, id, "")
m.NoError(err5)
Expand Down Expand Up @@ -999,10 +991,7 @@ func (m *MetadataPersistenceSuiteV2) TestUpdateDomain() {
m.Nil(resp6.FailoverEndTime)
m.Equal(lastUpdateTime, resp6.LastUpdatedTime)
m.Equal(isolationGroups1, resp6.Config.IsolationGroups)

// TODO(taylan): uncomment when persistent layer schema is updated
// m.Equal(asyncWFCfg1, resp6.Config.AsyncWorkflowConfig)
m.Equal(types.AsyncWorkflowConfiguration{}, resp6.Config.AsyncWorkflowConfig)
m.Equal(asyncWFCfg1, resp6.Config.AsyncWorkflowConfig)
}

// TestDeleteDomain test
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/serialization/thrift_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func domainInfoToThrift(info *DomainInfo) *sqlblobs.DomainInfo {
LastUpdatedTime: timeToUnixNanoPtr(info.LastUpdatedTimestamp),
IsolationGroupsConfiguration: info.IsolationGroups,
IsolationGroupsConfigurationEncoding: &info.IsolationGroupsEncoding,
AsyncWorkflowConfiguration: info.AsyncWorkflowConfig,
AsyncWorkflowConfigurationEncoding: &info.AsyncWorkflowConfigEncoding,
}
}

Expand Down Expand Up @@ -162,6 +164,8 @@ func domainInfoFromThrift(info *sqlblobs.DomainInfo) *DomainInfo {
LastUpdatedTimestamp: timeFromUnixNano(info.GetLastUpdatedTime()),
IsolationGroups: info.GetIsolationGroupsConfiguration(),
IsolationGroupsEncoding: info.GetIsolationGroupsConfigurationEncoding(),
AsyncWorkflowConfig: info.AsyncWorkflowConfiguration,
AsyncWorkflowConfigEncoding: info.GetAsyncWorkflowConfigurationEncoding(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/startreedata/pinot-client-go v0.0.0-20230303070132-3b84c28a9e95 // latest release doesn't support pinot v0.12, so use master branch
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.2
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07 h1:VxvBaCqyXmHIxPHYcQrz3oBsCeiO7diPyFau6E1CgaE=
github.com/uber/cadence-idl v0.0.0-20240122233329-e98679fb0e07/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f h1:+Z2D/fV23Sl2ZBP4ADMeKH9uXNJO9sJlV9SlR3/QFzw=
github.com/uber/cadence-idl v0.0.0-20240127005514-29d89d50745f/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from e98679 to 29d89d
2 changes: 2 additions & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ CREATE TYPE domain_config (
bad_binaries_encoding blob,
isolation_groups blob,
isolation_groups_encoding text,
async_workflow_config blob,
async_workflow_config_encoding text,
);

CREATE TYPE cluster_replication_config (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TYPE domain_config ADD async_workflow_config blob;
ALTER TYPE domain_config ADD async_workflow_config_encoding text;
8 changes: 8 additions & 0 deletions schema/cassandra/cadence/versioned/v0.37/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.37",
"MinCompatibleVersion": "0.37",
"Description": "Adding the domain configuration for async workflow config",
"SchemaUpdateCqlFiles": [
"async_workflow_config.cql"
]
}
2 changes: 1 addition & 1 deletion schema/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package cassandra
// NOTE: whenever there is a new data base schema update, plz update the following versions

// Version is the Cassandra database release version
const Version = "0.36"
const Version = "0.37"

// VisibilityVersion is the Cassandra visibility database release version
const VisibilityVersion = "0.9"
5 changes: 3 additions & 2 deletions scripts/buildkite/golint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ set -ex

make tidy
make go-generate
make fmt
make lint
make fmt
make lint
make copyright

# intentionally capture stderr, so status-errors are also PR-failing.
Expand All @@ -15,5 +15,6 @@ if [ -n "$(git status --porcelain 2>&1)" ]; then
echo "There are changes after make go-generate && make fmt && make lint && make copyright"
echo "Please rerun the command and commit the changes"
git status --porcelain
git --no-pager diff
exit 1
fi
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
previousFailoverVersion := nextDomain.GetPreviousFailoverVersion()
previousClusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion)
if err != nil {
e.logger.Error("Failed to handle graceful failover", tag.WorkflowDomainID(nextDomain.GetInfo().ID))
e.logger.Error("Failed to handle graceful failover", tag.WorkflowDomainID(nextDomain.GetInfo().ID), tag.Error(err))
continue
}

Expand Down
2 changes: 1 addition & 1 deletion tools/common/schema/updatetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() {
s.NoError(err)
ans, err := readSchemaDir(fsys, "0.30", "")
s.NoError(err)
s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36"}, ans)
s.Equal([]string{"v0.31", "v0.32", "v0.33", "v0.34", "v0.35", "v0.36", "v0.37"}, ans)

fsys, err = fs.Sub(cassandra.SchemaFS, "visibility/versioned")
s.NoError(err)
Expand Down

0 comments on commit 5175867

Please sign in to comment.