diff --git a/.vscode/launch.json b/.vscode/launch.json index 708a474d64d..78cae732e3a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -51,6 +51,19 @@ "start", ] }, + { + "name": "Debug Server with SQLite", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/cmd/server", + "cwd": "${workspaceFolder}", + "args": [ + "--env", + "development_sqlite", + "start", + ] + }, { "name": "Debug CLI Namespace Describe", "type": "go", diff --git a/common/backoff/retry_test.go b/common/backoff/retry_test.go index 340332eef77..ac2f64d80cd 100644 --- a/common/backoff/retry_test.go +++ b/common/backoff/retry_test.go @@ -123,6 +123,7 @@ func (s *RetrySuite) TestIsRetryableSuccess() { func (s *RetrySuite) TestIsRetryableFailure() { i := 0 + theErr := someError{} op := func() error { i++ @@ -130,14 +131,14 @@ func (s *RetrySuite) TestIsRetryableFailure() { return nil } - return &someError{} + return &theErr } policy := NewExponentialRetryPolicy(1 * time.Millisecond) policy.SetMaximumInterval(5 * time.Millisecond) policy.SetMaximumAttempts(10) - err := Retry(op, policy, IgnoreErrors([]error{&someError{}})) + err := Retry(op, policy, IgnoreErrors([]error{&theErr})) s.Error(err) s.Equal(1, i) } diff --git a/common/persistence/persistence-tests/setup.go b/common/persistence/persistence-tests/setup.go index 02c479bc5ef..e085ea66b03 100644 --- a/common/persistence/persistence-tests/setup.go +++ b/common/persistence/persistence-tests/setup.go @@ -44,7 +44,8 @@ const ( testSQLiteUser = "" testSQLitePassword = "" testSQLiteMode = "memory" - testSQLiteSchemaDir = "" // specify if mode is not "memory" + testSQLiteCache = "private" + testSQLiteSchemaDir = "schema/sqlite/v3" // specify if mode is not "memory" ) // GetMySQLTestClusterOption return test options @@ -81,8 +82,8 @@ func GetSQLiteTestClusterOption() *TestBaseOptions { DBPassword: testSQLitePassword, DBHost: environment.Localhost, DBPort: 0, - SchemaDir: testSQLiteSchemaDir, + SchemaDir: "", StoreType: config.StoreTypeSQL, - ConnectAttributes: map[string]string{"mode": testSQLiteMode}, + ConnectAttributes: map[string]string{"mode": testSQLiteMode, "cache": testSQLiteCache}, } } diff --git a/common/persistence/persistence-tests/sqlite_test.go b/common/persistence/persistence-tests/sqlite_test.go new file mode 100644 index 00000000000..b40871039a9 --- /dev/null +++ b/common/persistence/persistence-tests/sqlite_test.go @@ -0,0 +1,80 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistencetests + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +func TestSQLiteHistoryV2PersistenceSuite(t *testing.T) { + s := new(HistoryV2PersistenceSuite) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteMetadataPersistenceSuiteV2(t *testing.T) { + s := new(MetadataPersistenceSuiteV2) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteShardPersistenceSuite(t *testing.T) { + s := new(ShardPersistenceSuite) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteExecutionManagerSuite(t *testing.T) { + s := new(ExecutionManagerSuite) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteExecutionManagerWithEventsV2(t *testing.T) { + s := new(ExecutionManagerSuiteForEventsV2) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteClusterMetadataPersistence(t *testing.T) { + s := new(ClusterMetadataManagerSuite) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} + +func TestSQLiteQueuePersistence(t *testing.T) { + s := new(QueuePersistenceSuite) + s.TestBase = NewTestBaseWithSQL(GetSQLiteTestClusterOption()) + s.TestBase.Setup(nil) + suite.Run(t, s) +} diff --git a/common/persistence/sql/cluster_metadata.go b/common/persistence/sql/cluster_metadata.go index 95a0c29be1c..1b669b5f24c 100644 --- a/common/persistence/sql/cluster_metadata.go +++ b/common/persistence/sql/cluster_metadata.go @@ -253,9 +253,11 @@ func (s *sqlClusterMetadataManager) PruneClusterMembership( ) error { ctx, cancel := newExecutionContext() defer cancel() - _, err := s.Db.PruneClusterMembership(ctx, &sqlplugin.PruneClusterMembershipFilter{ - PruneRecordsBefore: time.Now().UTC(), - MaxRecordsAffected: request.MaxRecordsPruned}) + _, err := s.Db.PruneClusterMembership( + ctx, + &sqlplugin.PruneClusterMembershipFilter{ + PruneRecordsBefore: time.Now().UTC(), + }) if err != nil { return convertCommonErrors("PruneClusterMembership", err) diff --git a/common/persistence/sql/sqlplugin/cluster_metadata.go b/common/persistence/sql/sqlplugin/cluster_metadata.go index d490b9b2e41..ce1b63f1bf9 100644 --- a/common/persistence/sql/sqlplugin/cluster_metadata.go +++ b/common/persistence/sql/sqlplugin/cluster_metadata.go @@ -73,7 +73,6 @@ type ( // PruneClusterMembershipFilter is used for PruneClusterMembership queries PruneClusterMembershipFilter struct { PruneRecordsBefore time.Time - MaxRecordsAffected int } // ClusterMetadata is the SQL persistence interface for cluster metadata diff --git a/common/persistence/sql/sqlplugin/mysql/cluster_metadata.go b/common/persistence/sql/sqlplugin/mysql/cluster_metadata.go index 7926733a582..d633c0209e9 100644 --- a/common/persistence/sql/sqlplugin/mysql/cluster_metadata.go +++ b/common/persistence/sql/sqlplugin/mysql/cluster_metadata.go @@ -59,7 +59,7 @@ session_start=VALUES(session_start), last_heartbeat=VALUES(last_heartbeat), reco templatePruneStaleClusterMembership = `DELETE FROM cluster_membership -WHERE membership_partition = ? AND record_expiry < ? LIMIT ?` +WHERE membership_partition = ? AND record_expiry < ?` templateGetClusterMembership = `SELECT host_id, rpc_address, rpc_port, role, session_start, last_heartbeat, record_expiry FROM cluster_membership WHERE membership_partition = ?` @@ -262,6 +262,5 @@ func (mdb *db) PruneClusterMembership( templatePruneStaleClusterMembership, constMembershipPartition, mdb.converter.ToMySQLDateTime(filter.PruneRecordsBefore), - filter.MaxRecordsAffected, ) } diff --git a/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go b/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go index ccb94b2e569..a0cc849cfe8 100644 --- a/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go +++ b/common/persistence/sql/sqlplugin/postgresql/cluster_metadata.go @@ -63,7 +63,7 @@ membership_partition = $1, host_id = $2, rpc_address = $3, rpc_port = $4, role = templatePruneStaleClusterMembership = `DELETE FROM cluster_membership WHERE host_id = ANY(ARRAY( -SELECT host_id FROM cluster_membership WHERE membership_partition = $1 AND record_expiry < $2 LIMIT $3))` +SELECT host_id FROM cluster_membership WHERE membership_partition = $1 AND record_expiry < $2))` templateGetClusterMembership = `SELECT host_id, rpc_address, rpc_port, role, session_start, last_heartbeat, record_expiry FROM cluster_membership WHERE membership_partition = $` @@ -281,6 +281,5 @@ func (pdb *db) PruneClusterMembership( templatePruneStaleClusterMembership, constMembershipPartition, filter.PruneRecordsBefore, - filter.MaxRecordsAffected, ) } diff --git a/common/persistence/sql/sqlplugin/sqlite/cluster_metadata.go b/common/persistence/sql/sqlplugin/sqlite/cluster_metadata.go index 3deccfb609d..b2dd9110132 100644 --- a/common/persistence/sql/sqlplugin/sqlite/cluster_metadata.go +++ b/common/persistence/sql/sqlplugin/sqlite/cluster_metadata.go @@ -61,7 +61,7 @@ VALUES(?, ?, ?, ?, ?, ?, ?, ?) ` templatePruneStaleClusterMembership = `DELETE FROM cluster_membership -WHERE membership_partition = ? AND record_expiry < ? LIMIT ?` +WHERE membership_partition = ? AND record_expiry < ?` templateGetClusterMembership = `SELECT host_id, rpc_address, rpc_port, role, session_start, last_heartbeat, record_expiry FROM cluster_membership WHERE membership_partition = ?` @@ -264,6 +264,5 @@ func (mdb *db) PruneClusterMembership( templatePruneStaleClusterMembership, constMembershipPartition, mdb.converter.ToSQLiteDateTime(filter.PruneRecordsBefore), - filter.MaxRecordsAffected, ) } diff --git a/common/persistence/tests/sqlite_test.go b/common/persistence/tests/sqlite_test.go new file mode 100644 index 00000000000..825a8580cde --- /dev/null +++ b/common/persistence/tests/sqlite_test.go @@ -0,0 +1,206 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tests + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/suite" + + "go.temporal.io/server/common/config" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/persistence/sql" + "go.temporal.io/server/common/persistence/sql/sqlplugin" + _ "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" + "go.temporal.io/server/common/resolver" + "go.temporal.io/server/environment" +) + +// TODO merge the initialization with existing persistence setup +const ( + testSQLiteClusterName = "temporal_sqlite_cluster" +) + +func TestSQLiteExecutionMutableStateStoreSuite(t *testing.T) { + cfg := NewSQLiteConfig() + SetupSQLiteDatabase(cfg) + logger := log.NewNoopLogger() + factory := sql.NewFactory( + *cfg, + resolver.NewNoopResolver(), + testSQLiteClusterName, + logger, + ) + shardStore, err := factory.NewShardStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + executionStore, err := factory.NewExecutionStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + defer func() { + factory.Close() + }() + + s := NewExecutionMutableStateSuite( + t, + shardStore, + executionStore, + serialization.NewSerializer(), + logger, + ) + suite.Run(t, s) +} + +func TestSQLiteExecutionMutableStateTaskStoreSuite(t *testing.T) { + cfg := NewSQLiteConfig() + SetupSQLiteDatabase(cfg) + logger := log.NewNoopLogger() + factory := sql.NewFactory( + *cfg, + resolver.NewNoopResolver(), + testSQLiteClusterName, + logger, + ) + shardStore, err := factory.NewShardStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + executionStore, err := factory.NewExecutionStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + defer func() { + factory.Close() + }() + + s := NewExecutionMutableStateTaskSuite( + t, + shardStore, + executionStore, + serialization.NewSerializer(), + logger, + ) + suite.Run(t, s) +} + +func TestSQLiteHistoryStoreSuite(t *testing.T) { + cfg := NewSQLiteConfig() + SetupSQLiteDatabase(cfg) + logger := log.NewNoopLogger() + factory := sql.NewFactory( + *cfg, + resolver.NewNoopResolver(), + testSQLiteClusterName, + logger, + ) + store, err := factory.NewExecutionStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + defer func() { + factory.Close() + }() + + s := NewHistoryEventsSuite(t, store, logger) + suite.Run(t, s) +} + +func TestSQLiteTaskQueueSuite(t *testing.T) { + cfg := NewSQLiteConfig() + SetupSQLiteDatabase(cfg) + logger := log.NewNoopLogger() + factory := sql.NewFactory( + *cfg, + resolver.NewNoopResolver(), + testSQLiteClusterName, + logger, + ) + taskQueueStore, err := factory.NewTaskStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + defer func() { + factory.Close() + }() + + s := NewTaskQueueSuite(t, taskQueueStore, logger) + suite.Run(t, s) +} + +func TestSQLiteTaskQueueTaskSuite(t *testing.T) { + cfg := NewSQLiteConfig() + SetupSQLiteDatabase(cfg) + logger := log.NewNoopLogger() + factory := sql.NewFactory( + *cfg, + resolver.NewNoopResolver(), + testSQLiteClusterName, + logger, + ) + taskQueueStore, err := factory.NewTaskStore() + if err != nil { + t.Fatalf("unable to create SQLite DB: %v", err) + } + defer func() { + factory.Close() + }() + + s := NewTaskQueueTaskSuite(t, taskQueueStore, logger) + suite.Run(t, s) +} + +// NewSQLiteConfig returns a new MySQL config for test +func NewSQLiteConfig() *config.SQL { + return &config.SQL{ + User: "", + Password: "", + ConnectAddr: environment.Localhost, + ConnectProtocol: "tcp", + PluginName: "sqlite", + DatabaseName: "default", + ConnectAttributes: map[string]string{"mode": "memory", "cache": "private"}, + } +} + +func SetupSQLiteDatabase(cfg *config.SQL) { + adminCfg := *cfg + // NOTE need to connect with empty name to create new database + adminCfg.DatabaseName = "" + + db, err := sql.NewSQLAdminDB(sqlplugin.DbKindUnknown, &adminCfg, resolver.NewNoopResolver()) + if err != nil { + panic(fmt.Sprintf("unable to create SQLite admin DB: %v", err)) + } + defer func() { _ = db.Close() }() + + err = db.CreateDatabase(cfg.DatabaseName) + if err != nil { + panic(fmt.Sprintf("unable to create SQLite database: %v", err)) + } +}