Skip to content

Commit

Permalink
Cache table and schema indexes on schema address
Browse files Browse the repository at this point in the history
  • Loading branch information
max-hoffman committed May 14, 2024
1 parent eb86db5 commit 51a2cae
Show file tree
Hide file tree
Showing 27 changed files with 346 additions and 166 deletions.
3 changes: 2 additions & 1 deletion go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"os"
"runtime"
"strconv"
Expand Down Expand Up @@ -396,7 +397,7 @@ func sqlContextFactory() contextFactory {
// doltSessionFactory returns a sessionFactory that creates a new DoltSession
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory {
return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) {
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro)
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession)
if err != nil {
return nil, err
}
Expand Down
19 changes: 11 additions & 8 deletions go/libraries/doltcore/doltdb/durable/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type IndexSet interface {
HashOf() (hash.Hash, error)

// GetIndex gets an index from the set.
GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error)
GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error)

// HasIndex returns true if an index with the specified name exists in the set.
HasIndex(ctx context.Context, name string) (bool, error)
Expand Down Expand Up @@ -160,7 +160,7 @@ func IterAllIndexes(
cb func(name string, idx Index) error,
) error {
for _, def := range sch.Indexes().AllIndexes() {
idx, err := set.GetIndex(ctx, sch, def.Name())
idx, err := set.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return err
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s nomsIndexSet) HasIndex(ctx context.Context, name string) (bool, error) {
}

// GetIndex implements IndexSet.
func (s nomsIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error) {
func (s nomsIndexSet) GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error) {
v, ok, err := s.indexes.MaybeGet(ctx, types.String(name))
if !ok {
err = fmt.Errorf("index %s not found in IndexSet", name)
Expand All @@ -414,12 +414,12 @@ func (s nomsIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name stri
return nil, err
}

idx := sch.Indexes().GetByName(name)
idx := tableSch.Indexes().GetByName(name)
if idx == nil {
return nil, fmt.Errorf("index not found: %s", name)
}

return indexFromRef(ctx, s.vrw, s.ns, idx.Schema(), v.(types.Ref))
return indexFromRef(ctx, s.vrw, s.ns, idxSch, v.(types.Ref))
}

// PutNomsIndex implements IndexSet.
Expand Down Expand Up @@ -497,19 +497,22 @@ func (is doltDevIndexSet) HasIndex(ctx context.Context, name string) (bool, erro
return true, nil
}

func (is doltDevIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error) {
func (is doltDevIndexSet) GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error) {
addr, err := is.am.Get(ctx, name)
if err != nil {
return nil, err
}
if addr.IsEmpty() {
return nil, fmt.Errorf("index %s not found in IndexSet", name)
}
idx := sch.Indexes().GetByName(name)
idx := tableSch.Indexes().GetByName(name)
if idx == nil {
return nil, fmt.Errorf("index schema not found: %s", name)
}
return indexFromAddr(ctx, is.vrw, is.ns, idx.Schema(), addr)
if idxSch == nil {
idxSch = idx.Schema()
}
return indexFromAddr(ctx, is.vrw, is.ns, idxSch, addr)
}

func (is doltDevIndexSet) PutIndex(ctx context.Context, name string, idx Index) (IndexSet, error) {
Expand Down
42 changes: 35 additions & 7 deletions go/libraries/doltcore/doltdb/root_val.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,15 @@ type RootValue interface {

// rootValue is Dolt's implementation of RootValue.
type rootValue struct {
vrw types.ValueReadWriter
ns tree.NodeStore
st rootValueStorage
fkc *ForeignKeyCollection // cache the first load
hash hash.Hash // cache first load
vrw types.ValueReadWriter
ns tree.NodeStore
st rootValueStorage
fkc *ForeignKeyCollection // cache the first load
hash hash.Hash
fragmentHash hash.Hash
schemaHash hash.Hash
// TODO cache schema fragment table hash
// TODO cache schema hash
}

var _ RootValue = (*rootValue)(nil)
Expand Down Expand Up @@ -161,7 +165,7 @@ var NewRootValue = func(ctx context.Context, vrw types.ValueReadWriter, ns tree.
}
}

return &rootValue{vrw, ns, storage, nil, hash.Hash{}}, nil
return &rootValue{vrw, ns, storage, nil, hash.Hash{}, hash.Hash{}, hash.Hash{}}, nil
}

// EmptyRootValue returns an empty RootValue. This is a variable as it's changed in Doltgres.
Expand Down Expand Up @@ -683,7 +687,7 @@ func (root *rootValue) IterTables(ctx context.Context, cb func(name string, tabl
}

func (root *rootValue) withStorage(st rootValueStorage) *rootValue {
return &rootValue{root.vrw, root.ns, st, nil, hash.Hash{}}
return &rootValue{root.vrw, root.ns, st, nil, hash.Hash{}, hash.Hash{}, hash.Hash{}}
}

func (root *rootValue) NomsValue() types.Value {
Expand Down Expand Up @@ -815,6 +819,30 @@ func (root *rootValue) HashOf() (hash.Hash, error) {
return root.hash, nil
}

// SchemaHash gets the hash of the schema value
func (root *rootValue) SchemaHash() (hash.Hash, error) {
if root.schemaHash.IsEmpty() {
var err error
root.hash, err = root.st.nomsValue().Hash(root.vrw.Format())
if err != nil {
return hash.Hash{}, nil
}
}
return root.schemaHash, nil
}

// FragmentHash gets the hash of the schema fragment table root
func (root *rootValue) FragmentHash() (hash.Hash, error) {
if root.fragmentHash.IsEmpty() {
var err error
root.hash, err = root.st.nomsValue().Hash(root.vrw.Format())
if err != nil {
return hash.Hash{}, nil
}
}
return root.fragmentHash, nil
}

// RenameTable renames a table by changing its string key in the RootValue's table map. In order to preserve
// column tag information, use this method instead of a table drop + add.
func (root *rootValue) RenameTable(ctx context.Context, oldName, newName string) (RootValue, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (t *Table) GetNomsIndexRowData(ctx context.Context, indexName string) (type
return types.EmptyMap, err
}

idx, err := indexes.GetIndex(ctx, sch, indexName)
idx, err := indexes.GetIndex(ctx, sch, nil, indexName)
if err != nil {
return types.EmptyMap, err
}
Expand All @@ -597,7 +597,7 @@ func (t *Table) GetIndexRowData(ctx context.Context, indexName string) (durable.
return nil, err
}

return indexes.GetIndex(ctx, sch, indexName)
return indexes.GetIndex(ctx, sch, nil, indexName)
}

// SetIndexRows replaces the current row data for the given index and returns an updated Table.
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/merge/merge_prolly_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func mergeProllySecondaryIndexes(
tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (prolly.Map, bool, error) {
ok := sch.Indexes().Contains(indexName)
if ok {
idx, err := iS.GetIndex(ctx, sch, indexName)
idx, err := iS.GetIndex(ctx, sch, nil, indexName)
if err != nil {
return prolly.Map{}, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/merge/merge_prolly_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func newUniqValidator(ctx *sql.Context, sch schema.Schema, tm *TableMerger, vm *
continue // todo: how do we validate in this case?
}

idx, err := indexes.GetIndex(ctx, sch, def.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return uniqValidator{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/mutable_secondary_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func GetMutableSecondaryIdxs(ctx *sql.Context, sch schema.Schema, tableName string, indexes durable.IndexSet) ([]MutableSecondaryIdx, error) {
mods := make([]MutableSecondaryIdx, sch.Indexes().Count())
for i, index := range sch.Indexes().AllIndexes() {
idx, err := indexes.GetIndex(ctx, sch, index.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, index.Name())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func GetMutableSecondaryIdxsWithPending(ctx *sql.Context, sch schema.Schema, tab
continue
}

idx, err := indexes.GetIndex(ctx, sch, index.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, index.Name())
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/schema_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,9 +1480,9 @@ func testSchemaMergeHelper(t *testing.T, tests []schemaMergeTest, flipSides bool
actIndexSet, err := actTbl.GetIndexSet(ctx)
require.NoError(t, err)
expSchema.Indexes().Iter(func(index schema.Index) (stop bool, err error) {
expIndex, err := expIndexSet.GetIndex(ctx, expSchema, index.Name())
expIndex, err := expIndexSet.GetIndex(ctx, expSchema, nil, index.Name())
require.NoError(t, err)
actIndex, err := actIndexSet.GetIndex(ctx, expSchema, index.Name())
actIndex, err := actIndexSet.GetIndex(ctx, expSchema, nil, index.Name())
require.NoError(t, err)
expIndexHash, err := expIndex.HashOf()
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/migrate/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,19 +565,19 @@ func migrateIndexSet(
return nil, err
}
for _, def := range sch.Indexes().AllIndexes() {
idx, err := oldParentSet.GetIndex(ctx, sch, def.Name())
idx, err := oldParentSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
oldParent := durable.NomsMapFromIndex(idx)

idx, err = oldSet.GetIndex(ctx, sch, def.Name())
idx, err = oldSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
old := durable.NomsMapFromIndex(idx)

idx, err = newParentSet.GetIndex(ctx, sch, def.Name())
idx, err = newParentSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
//

// closeWriteSession flushes and closes the specified |writeSession| and returns an error if anything failed.
func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string, writeSession writer.WriteSession) error {
func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string, writeSession dsess.WriteSession) error {
newWorkingSet, err := writeSession.Flush(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -727,7 +727,7 @@ func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseNam
}

// getTableWriter returns a WriteSession and a TableWriter for writing to the specified |table| in the specified |database|.
func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string, foreignKeyChecksDisabled bool) (writer.WriteSession, writer.TableWriter, error) {
func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string, foreignKeyChecksDisabled bool) (dsess.WriteSession, dsess.TableWriter, error) {
database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (a AutoIncrementTracker) deepSet(ctx *sql.Context, tableName string, table
return nil, err
}

indexData, err = indexes.GetIndex(ctx, sch, aiIndex.Name())
indexData, err = indexes.GetIndex(ctx, sch, nil, aiIndex.Name())
if err != nil {
return nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions go/libraries/doltcore/sqle/dsess/database_session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/concurrentmap"
)
Expand Down Expand Up @@ -96,7 +95,7 @@ func newEmptyDatabaseSessionState() *DatabaseSessionState {
type SessionState interface {
WorkingSet() *doltdb.WorkingSet
WorkingRoot() doltdb.RootValue
WriteSession() writer.WriteSession
WriteSession() WriteSession
EditOpts() editor.Options
SessionCache() *SessionCache
}
Expand All @@ -120,7 +119,7 @@ type branchState struct {
// dbData is an accessor for the underlying doltDb
dbData env.DbData
// writeSession is this head's write session
writeSession writer.WriteSession
writeSession WriteSession
// readOnly is true if this database is read only
readOnly bool
// dirty is true if this branch state has uncommitted changes
Expand Down Expand Up @@ -161,7 +160,7 @@ func (bs *branchState) WorkingSet() *doltdb.WorkingSet {
return bs.workingSet
}

func (bs *branchState) WriteSession() writer.WriteSession {
func (bs *branchState) WriteSession() WriteSession {
return bs.writeSession
}

Expand Down
44 changes: 42 additions & 2 deletions go/libraries/doltcore/sqle/dsess/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
Expand Down Expand Up @@ -62,6 +61,7 @@ type DoltSession struct {
statsProv sql.StatsProvider
mu *sync.Mutex
fs filesys.Filesys
writeSessProv writeSessFunc

// If non-nil, this will be returned from ValidateSession.
// Used by sqle/cluster to put a session into a terminal err state.
Expand Down Expand Up @@ -90,13 +90,16 @@ func DefaultSession(pro DoltDatabaseProvider) *DoltSession {
}
}

type writeSessFunc func(nbf *types.NomsBinFormat, ws *doltdb.WorkingSet, aiTracker globalstate.AutoIncrementTracker, opts editor.Options) WriteSession

// NewDoltSession creates a DoltSession object from a standard sql.Session and 0 or more Database objects.
func NewDoltSession(
sqlSess *sql.BaseSession,
pro DoltDatabaseProvider,
conf config.ReadWriteConfig,
branchController *branch_control.Controller,
statsProvider sql.StatsProvider,
writeSessProv writeSessFunc,
) (*DoltSession, error) {
username := conf.GetStringOrDefault(config.UserNameKey, "")
email := conf.GetStringOrDefault(config.UserEmailKey, "")
Expand All @@ -115,6 +118,7 @@ func NewDoltSession(
statsProv: statsProvider,
mu: &sync.Mutex{},
fs: pro.FileSystem(),
writeSessProv: writeSessProv,
}

return sess, nil
Expand Down Expand Up @@ -1253,7 +1257,7 @@ func (d *DoltSession) addDB(ctx *sql.Context, db SqlDatabase) error {
if err != nil {
return err
}
branchState.writeSession = writer.NewWriteSession(nbf, branchState.WorkingSet(), tracker, editOpts)
branchState.writeSession = d.writeSessProv(nbf, branchState.WorkingSet(), tracker, editOpts)
}

// WorkingSet is nil in the case of a read only, detached head DB
Expand Down Expand Up @@ -1742,3 +1746,39 @@ func DefaultHead(baseName string, db SqlDatabase) (string, error) {

return head, nil
}

// WriteSession encapsulates writes made within a SQL session.
// It's responsible for creating and managing the lifecycle of TableWriter's.
type WriteSession interface {
// GetTableWriter creates a TableWriter and adds it to the WriteSession.
GetTableWriter(ctx *sql.Context, tableName doltdb.TableName, db string, setter SessionRootSetter) (TableWriter, error)

// SetWorkingSet modifies the state of the WriteSession. The WorkingSetRef of |ws| must match the existing Ref.
SetWorkingSet(ctx *sql.Context, ws *doltdb.WorkingSet) error

// GetOptions returns the editor.Options for this session.
GetOptions() editor.Options

// SetOptions sets the editor.Options for this session.
SetOptions(opts editor.Options)

WriteSessionFlusher
}

type TableWriter interface {
sql.TableEditor
sql.ForeignKeyEditor
sql.AutoIncrementSetter
}

// SessionRootSetter sets the root value for the session.
type SessionRootSetter func(ctx *sql.Context, dbName string, root doltdb.RootValue) error

// WriteSessionFlusher is responsible for flushing any pending edits to the session
type WriteSessionFlusher interface {
// Flush flushes the pending writes in the session.
Flush(ctx *sql.Context) (*doltdb.WorkingSet, error)
// FlushWithAutoIncrementOverrides flushes the pending writes in the session, overriding the auto increment values
// for any tables provided in the map
FlushWithAutoIncrementOverrides(ctx *sql.Context, increment bool, autoIncrements map[string]uint64) (*doltdb.WorkingSet, error)
}
Loading

0 comments on commit 51a2cae

Please sign in to comment.