Skip to content

Commit

Permalink
Single binary server (SQLite) and concurrency bugfixes for in memory …
Browse files Browse the repository at this point in the history
…storages (#2114)
  • Loading branch information
wolfy-j authored Feb 1, 2022
1 parent 426621d commit 736fabf
Show file tree
Hide file tree
Showing 14 changed files with 492 additions and 154 deletions.
27 changes: 17 additions & 10 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
workflowpb "go.temporal.io/api/workflow/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
Expand All @@ -60,6 +61,7 @@ import (
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/common/searchattribute"
Expand All @@ -79,15 +81,16 @@ type (

// TestBaseOptions options to configure workflow test base.
TestBaseOptions struct {
SQLDBPluginName string
DBName string
DBUsername string
DBPassword string
DBHost string
DBPort int `yaml:"-"`
StoreType string `yaml:"-"`
SchemaDir string `yaml:"-"`
FaultInjection *config.FaultInjection `yaml:"faultinjection"`
SQLDBPluginName string
DBName string
DBUsername string
DBPassword string
DBHost string
DBPort int `yaml:"-"`
ConnectAttributes map[string]string
StoreType string `yaml:"-"`
SchemaDir string `yaml:"-"`
FaultInjection *config.FaultInjection `yaml:"faultinjection"`
}

// TestBase wraps the base setup needed to create workflows over persistence layer.
Expand Down Expand Up @@ -152,6 +155,8 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
options.DBPort = environment.GetMySQLPort()
case postgresql.PluginName:
options.DBPort = environment.GetPostgreSQLPort()
case sqlite.PluginName:
options.DBPort = 0
default:
panic(fmt.Sprintf("unknown sql store drier: %v", options.SQLDBPluginName))
}
Expand All @@ -162,11 +167,13 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
options.DBHost = environment.GetMySQLAddress()
case postgresql.PluginName:
options.DBHost = environment.GetPostgreSQLAddress()
case sqlite.PluginName:
options.DBHost = environment.Localhost
default:
panic(fmt.Sprintf("unknown sql store drier: %v", options.SQLDBPluginName))
}
}
testCluster := sql.NewTestCluster(options.SQLDBPluginName, options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.SchemaDir, options.FaultInjection, logger)
testCluster := sql.NewTestCluster(options.SQLDBPluginName, options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.ConnectAttributes, options.SchemaDir, options.FaultInjection, logger)
return NewTestBaseForCluster(testCluster, logger)
}

Expand Down
20 changes: 20 additions & 0 deletions common/persistence/persistence-tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/environment"
)

Expand All @@ -39,6 +40,11 @@ const (
testPostgreSQLUser = "temporal"
testPostgreSQLPassword = "temporal"
testPostgreSQLSchemaDir = "schema/postgresql/v96"

testSQLiteUser = ""
testSQLitePassword = ""
testSQLiteMode = "memory"
testSQLiteSchemaDir = "" // specify if mode is not "memory"
)

// GetMySQLTestClusterOption return test options
Expand Down Expand Up @@ -66,3 +72,17 @@ func GetPostgreSQLTestClusterOption() *TestBaseOptions {
StoreType: config.StoreTypeSQL,
}
}

// GetSQLiteTestClusterOption return test options
func GetSQLiteTestClusterOption() *TestBaseOptions {
return &TestBaseOptions{
SQLDBPluginName: sqlite.PluginName,
DBUsername: testSQLiteUser,
DBPassword: testSQLitePassword,
DBHost: environment.Localhost,
DBPort: 0,
SchemaDir: testSQLiteSchemaDir,
StoreType: config.StoreTypeSQL,
ConnectAttributes: map[string]string{"mode": testSQLiteMode},
}
}
14 changes: 9 additions & 5 deletions common/persistence/sql/sqlPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
"path"
"strings"

"go.temporal.io/server/common/config"

"go.temporal.io/server/common"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand All @@ -58,6 +57,7 @@ func NewTestCluster(
password string,
host string,
port int,
connectAttributes map[string]string,
schemaDir string,
faultInjection *config.FaultInjection,
logger log.Logger,
Expand All @@ -66,9 +66,6 @@ func NewTestCluster(
result.logger = logger
result.dbName = dbName

if schemaDir == "" {
panic("must provide schema dir")
}
result.schemaDir = schemaDir
result.cfg = config.SQL{
User: username,
Expand All @@ -78,7 +75,9 @@ func NewTestCluster(
PluginName: pluginName,
DatabaseName: dbName,
TaskScanPartitions: 4,
ConnectAttributes: connectAttributes,
}

result.faultInjection = faultInjection
return &result
}
Expand All @@ -92,6 +91,11 @@ func (s *TestCluster) DatabaseName() string {
func (s *TestCluster) SetupTestDatabase() {
s.CreateDatabase()

if s.schemaDir == "" {
s.logger.Info("No schema directory provided, skipping schema setup")
return
}

schemaDir := s.schemaDir + "/"
if !strings.HasPrefix(schemaDir, "/") && !strings.HasPrefix(schemaDir, "../") {
temporalPackageDir, err := getTemporalPackageDir()
Expand Down
111 changes: 111 additions & 0 deletions common/persistence/sql/sqlplugin/sqlite/conn_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:build cgo

package sqlite

import (
"sync"

"github.com/jmoiron/sqlx"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/resolver"
)

// This pool properly enabled the support for SQLite in the temporal server.
// Internal Temporal services are highly isolated, each will create at least a single connection to the database violating
// the SQLite concept of safety only within a single thread.
type connPool struct {
mu sync.Mutex
pool map[string]entry
}

type entry struct {
db *sqlx.DB
refCount int
}

func newConnPool() *connPool {
return &connPool{
pool: make(map[string]entry),
}
}

// Allocate allocates the shared database in the pool or returns already exists instance with the same DSN. If instance
// for such DSN already exists, it will be returned instead. Each request counts as reference until Close.
func (cp *connPool) Allocate(
cfg *config.SQL,
resolver resolver.ServiceResolver,
create func(cfg *config.SQL, resolver resolver.ServiceResolver) (*sqlx.DB, error),
) (db *sqlx.DB, err error) {
cp.mu.Lock()
defer cp.mu.Unlock()

dsn, err := buildDSN(cfg)
if err != nil {
return nil, err
}

if entry, ok := cp.pool[dsn]; ok {
entry.refCount++
return entry.db, nil
}

db, err = create(cfg, resolver)
if err != nil {
return nil, err
}

cp.pool[dsn] = entry{db: db, refCount: 1}

return db, nil
}

// Close virtual connection to database. Only closes for real once no references left.
func (cp *connPool) Close(cfg *config.SQL) {
cp.mu.Lock()
defer cp.mu.Unlock()

dsn, err := buildDSN(cfg)
if err != nil {
return
}

e, ok := cp.pool[dsn]
if !ok {
// no such database
return
}

e.refCount--
if e.refCount == 0 {
// todo: at the moment pool will persist a single connection to the DB for the whole duration of application
// temporal will start and stop DB connections multiple times, which will cause the loss of the cache
// and "db is closed" error
// e.db.Close()
// delete(cp.pool, dsn)
}
}
30 changes: 24 additions & 6 deletions common/persistence/sql/sqlplugin/sqlite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ package sqlite
import (
"context"
"fmt"
"sync"

"github.com/jmoiron/sqlx"

Expand All @@ -43,6 +44,9 @@ type db struct {
dbKind sqlplugin.DbKind
dbName string

mu sync.RWMutex
onClose []func()

db *sqlx.DB
tx *sqlx.Tx
conn sqlplugin.Conn
Expand All @@ -62,10 +66,11 @@ func newDB(
tx *sqlx.Tx,
) *db {
mdb := &db{
dbKind: dbKind,
dbName: dbName,
db: xdb,
tx: tx,
dbKind: dbKind,
dbName: dbName,
onClose: make([]func(), 0),
db: xdb,
tx: tx,
}
mdb.conn = xdb
if tx != nil {
Expand Down Expand Up @@ -94,11 +99,24 @@ func (mdb *db) Rollback() error {
return mdb.tx.Rollback()
}

func (mdb *db) OnClose(hook func()) {
mdb.mu.Lock()
mdb.onClose = append(mdb.onClose, hook)
mdb.mu.Unlock()
}

// Close closes the connection to the sqlite db
func (mdb *db) Close() error {
mdb.mu.RLock()
defer mdb.mu.RUnlock()

for _, hook := range mdb.onClose {
// de-registers the database from conn pool
hook()
}

// database connection will be automatically closed by the hook handler when all references are removed
return nil
// TODO: fix `Error: sql: database is closed` when Close() is enabled
// return mdb.db.Close()
}

// PluginName returns the name of the plugin
Expand Down
Loading

0 comments on commit 736fabf

Please sign in to comment.