Skip to content

Commit

Permalink
Rename cassandra files to nosqlStores
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jun 29, 2021
1 parent 3cc8c31 commit ff0046f
Show file tree
Hide file tree
Showing 17 changed files with 242 additions and 214 deletions.
2 changes: 0 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ const (
rowTypeExecutionTaskID = int64(-10)
rowTypeShardTaskID = int64(-11)
emptyInitiatedID = int64(-7)

stickyTaskListTTL = int64(24 * time.Hour / time.Second) // if sticky task_list stopped being updated, remove it in one day
)

const (
Expand Down
20 changes: 10 additions & 10 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/cassandra"
"github.com/uber/cadence/common/persistence/elasticsearch"
"github.com/uber/cadence/common/persistence/nosql"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/quotas"
Expand Down Expand Up @@ -72,10 +72,10 @@ type (
NewTaskStore() (p.TaskStore, error)
// NewShardStore returns a new shard store
NewShardStore() (p.ShardStore, error)
// NewHistoryV2Store returns a new historyV2 store
NewHistoryV2Store() (p.HistoryStore, error)
// NewMetadataStore returns a new metadata store
NewMetadataStore() (p.MetadataStore, error)
// NewHistoryStore returns a new history store
NewHistoryStore() (p.HistoryStore, error)
// NewDomainStore returns a new metadata store
NewDomainStore() (p.DomainStore, error)
// NewExecutionStore returns an execution store for given shardID
NewExecutionStore(shardID int) (p.ExecutionStore, error)
// NewVisibilityStore returns a new visibility store,
Expand Down Expand Up @@ -190,7 +190,7 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
// NewHistoryManager returns a new history manager
func (f *factoryImpl) NewHistoryManager() (p.HistoryManager, error) {
ds := f.datastores[storeTypeHistory]
store, err := ds.factory.NewHistoryV2Store()
store, err := ds.factory.NewHistoryStore()
if err != nil {
return nil, err
}
Expand All @@ -210,9 +210,9 @@ func (f *factoryImpl) NewHistoryManager() (p.HistoryManager, error) {
// NewDomainManager returns a new metadata manager
func (f *factoryImpl) NewDomainManager() (p.DomainManager, error) {
var err error
var store p.MetadataStore
var store p.DomainStore
ds := f.datastores[storeTypeMetadata]
store, err = ds.factory.NewMetadataStore()
store, err = ds.factory.NewDomainStore()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.NoSQL != nil:
defaultDataStore.factory = cassandra.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger)
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.NoSQL, clusterName, f.logger)
case defaultCfg.SQL != nil:
if defaultCfg.SQL.EncodingType == "" {
defaultCfg.SQL.EncodingType = string(common.EncodingTypeThriftRW)
Expand Down Expand Up @@ -430,7 +430,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case visibilityCfg.NoSQL != nil:
visibilityDataStore.factory = cassandra.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger)
visibilityDataStore.factory = nosql.NewFactory(*visibilityCfg.NoSQL, clusterName, f.logger)
case visibilityCfg.SQL != nil:
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ type (
GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
}

// MetadataStore is a lower level of DomainManager
MetadataStore interface {
// DomainStore is a lower level of DomainManager
DomainStore interface {
Closeable
GetName() string
CreateDomain(ctx context.Context, request *InternalCreateDomainRequest) (*CreateDomainResponse, error)
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/domainManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ import (

type (

// domainManagerImpl implements DomainManager based on MetadataStore and PayloadSerializer
// domainManagerImpl implements DomainManager based on DomainStore and PayloadSerializer
domainManagerImpl struct {
serializer PayloadSerializer
persistence MetadataStore
persistence DomainStore
logger log.Logger
}
)

var _ DomainManager = (*domainManagerImpl)(nil)

//NewDomainManagerImpl returns new DomainManager
func NewDomainManagerImpl(persistence MetadataStore, logger log.Logger) DomainManager {
func NewDomainManagerImpl(persistence DomainStore, logger log.Logger) DomainManager {
return &domainManagerImpl{
serializer: NewPayloadSerializer(),
persistence: persistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cassandra
package nosql

import (
"sync"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"

// NOTE: this package will be refactored and removed soon
cassandraOld "github.com/uber/cadence/common/persistence/cassandra"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
)
Expand Down Expand Up @@ -59,22 +62,22 @@ func NewFactory(cfg config.Cassandra, clusterName string, logger log.Logger) *Fa

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
return newTaskPersistence(f.cfg, f.logger)
return newNoSQLTaskStore(f.cfg, f.logger)
}

// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
return newShardPersistence(f.cfg, f.clusterName, f.logger)
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger)
}

// NewHistoryV2Store returns a new history store
func (f *Factory) NewHistoryV2Store() (p.HistoryStore, error) {
return newHistoryV2Persistence(f.cfg, f.logger)
// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
return newNoSQLHistoryStore(f.cfg, f.logger)
}

// NewMetadataStore returns a metadata store that understands only v2
func (f *Factory) NewMetadataStore() (p.MetadataStore, error) {
return newMetadataPersistenceV2(f.cfg, f.clusterName, f.logger)
// NewDomainStore returns a metadata store that understands only v2
func (f *Factory) NewDomainStore() (p.DomainStore, error) {
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger)
}

// NewExecutionStore returns an ExecutionStore for a given shardID
Expand All @@ -88,12 +91,12 @@ func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {

// NewVisibilityStore returns a visibility store
func (f *Factory) NewVisibilityStore(sortByCloseTime bool) (p.VisibilityStore, error) {
return newVisibilityPersistence(sortByCloseTime, f.cfg, f.logger)
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger)
}

// NewQueue returns a new queue backed by cassandra
func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
return newQueue(f.cfg, f.logger, queueType)
return newNoSQLQueueStore(f.cfg, f.logger, queueType)
}

// Close closes the factory
Expand Down Expand Up @@ -149,7 +152,7 @@ func (f *executionStoreFactory) close() {

// new implements ExecutionStoreFactory interface
func (f *executionStoreFactory) new(shardID int) (p.ExecutionStore, error) {
pmgr, err := NewWorkflowExecutionPersistence(shardID, f.client, f.session, f.logger)
pmgr, err := cassandraOld.NewWorkflowExecutionPersistence(shardID, f.client, f.session, f.logger)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cassandra
package nosql

import (
"context"
Expand All @@ -35,26 +35,26 @@ import (
)

type (
nosqlDomainManager struct {
nosqlManager
nosqlDomainStore struct {
nosqlStore
currentClusterName string
}
)

// newMetadataPersistenceV2 is used to create an instance of HistoryManager implementation
func newMetadataPersistenceV2(
// newNoSQLDomainStore is used to create an instance of DomainStore implementation
func newNoSQLDomainStore(
cfg config.Cassandra,
currentClusterName string,
logger log.Logger,
) (p.MetadataStore, error) {
) (p.DomainStore, error) {
// TODO hardcoding to Cassandra for now, will switch to dynamically loading later
db, err := cassandra.NewCassandraDB(cfg, logger)
if err != nil {
return nil, err
}

return &nosqlDomainManager{
nosqlManager: nosqlManager{
return &nosqlDomainStore{
nosqlStore: nosqlStore{
db: db,
logger: logger,
},
Expand All @@ -67,7 +67,7 @@ func newMetadataPersistenceV2(
// 'Domains' table and then do a conditional insert into domains_by_name table. If the conditional write fails we
// delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the
// orphaned entry from domains table. We might need a background job to delete those orphaned record.
func (m *nosqlDomainManager) CreateDomain(
func (m *nosqlDomainStore) CreateDomain(
ctx context.Context,
request *p.InternalCreateDomainRequest,
) (*p.CreateDomainResponse, error) {
Expand Down Expand Up @@ -100,7 +100,7 @@ func (m *nosqlDomainManager) CreateDomain(
return &p.CreateDomainResponse{ID: request.Info.ID}, nil
}

func (m *nosqlDomainManager) UpdateDomain(
func (m *nosqlDomainStore) UpdateDomain(
ctx context.Context,
request *p.InternalUpdateDomainRequest,
) error {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (m *nosqlDomainManager) UpdateDomain(
return nil
}

func (m *nosqlDomainManager) GetDomain(
func (m *nosqlDomainStore) GetDomain(
ctx context.Context,
request *p.GetDomainRequest,
) (*p.InternalGetDomainResponse, error) {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (m *nosqlDomainManager) GetDomain(
}, nil
}

func (m *nosqlDomainManager) ListDomains(
func (m *nosqlDomainStore) ListDomains(
ctx context.Context,
request *p.ListDomainsRequest,
) (*p.InternalListDomainsResponse, error) {
Expand Down Expand Up @@ -242,7 +242,7 @@ func (m *nosqlDomainManager) ListDomains(
}, nil
}

func (m *nosqlDomainManager) DeleteDomain(
func (m *nosqlDomainStore) DeleteDomain(
ctx context.Context,
request *p.DeleteDomainRequest,
) error {
Expand All @@ -253,7 +253,7 @@ func (m *nosqlDomainManager) DeleteDomain(
return nil
}

func (m *nosqlDomainManager) DeleteDomainByName(
func (m *nosqlDomainStore) DeleteDomainByName(
ctx context.Context,
request *p.DeleteDomainByNameRequest,
) error {
Expand All @@ -264,7 +264,7 @@ func (m *nosqlDomainManager) DeleteDomainByName(
return nil
}

func (m *nosqlDomainManager) GetMetadata(
func (m *nosqlDomainStore) GetMetadata(
ctx context.Context,
) (*p.GetMetadataResponse, error) {
notificationVersion, err := m.db.SelectDomainMetadata(ctx)
Expand All @@ -274,7 +274,7 @@ func (m *nosqlDomainManager) GetMetadata(
return &p.GetMetadataResponse{NotificationVersion: notificationVersion}, nil
}

func (m *nosqlDomainManager) toNoSQLInternalDomainConfig(
func (m *nosqlDomainStore) toNoSQLInternalDomainConfig(
domainConfig *p.InternalDomainConfig,
) (*nosqlplugin.NoSQLInternalDomainConfig, error) {
return &nosqlplugin.NoSQLInternalDomainConfig{
Expand All @@ -290,7 +290,7 @@ func (m *nosqlDomainManager) toNoSQLInternalDomainConfig(
}, nil
}

func (m *nosqlDomainManager) fromNoSQLInternalDomainConfig(
func (m *nosqlDomainStore) fromNoSQLInternalDomainConfig(
domainConfig *nosqlplugin.NoSQLInternalDomainConfig,
) (*p.InternalDomainConfig, error) {
return &p.InternalDomainConfig{
Expand Down
Loading

0 comments on commit ff0046f

Please sign in to comment.