Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache table and schema indexes on schema address #7859

Merged
merged 21 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statsnoms"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statspro"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/store/types"
)
Expand Down Expand Up @@ -397,7 +398,7 @@ func sqlContextFactory() contextFactory {
// doltSessionFactory returns a sessionFactory that creates a new DoltSession
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, autocommit bool) sessionFactory {
return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) {
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro)
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/cmd/dolt/commands/filter-branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/rebase"
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
"github.com/dolthub/dolt/go/store/hash"
Expand Down Expand Up @@ -292,7 +293,7 @@ func rebaseSqlEngine(ctx context.Context, dEnv *env.DoltEnv, cm *doltdb.Commit)
return nil, nil, err
}

sess := dsess.DefaultSession(pro)
sess := dsess.DefaultSession(pro, writer.NewWriteSession)

sqlCtx := sql.NewContext(ctx, sql.WithSession(sess))
err = sqlCtx.SetSessionVariable(sqlCtx, sql.AutoCommitSessionVar, false)
Expand Down
19 changes: 11 additions & 8 deletions go/libraries/doltcore/doltdb/durable/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type IndexSet interface {
HashOf() (hash.Hash, error)

// GetIndex gets an index from the set.
GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error)
GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error)

// HasIndex returns true if an index with the specified name exists in the set.
HasIndex(ctx context.Context, name string) (bool, error)
Expand Down Expand Up @@ -160,7 +160,7 @@ func IterAllIndexes(
cb func(name string, idx Index) error,
) error {
for _, def := range sch.Indexes().AllIndexes() {
idx, err := set.GetIndex(ctx, sch, def.Name())
idx, err := set.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return err
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s nomsIndexSet) HasIndex(ctx context.Context, name string) (bool, error) {
}

// GetIndex implements IndexSet.
func (s nomsIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error) {
func (s nomsIndexSet) GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error) {
v, ok, err := s.indexes.MaybeGet(ctx, types.String(name))
if !ok {
err = fmt.Errorf("index %s not found in IndexSet", name)
Expand All @@ -414,12 +414,12 @@ func (s nomsIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name stri
return nil, err
}

idx := sch.Indexes().GetByName(name)
idx := tableSch.Indexes().GetByName(name)
if idx == nil {
return nil, fmt.Errorf("index not found: %s", name)
}

return indexFromRef(ctx, s.vrw, s.ns, idx.Schema(), v.(types.Ref))
return indexFromRef(ctx, s.vrw, s.ns, idxSch, v.(types.Ref))
}

// PutNomsIndex implements IndexSet.
Expand Down Expand Up @@ -497,19 +497,22 @@ func (is doltDevIndexSet) HasIndex(ctx context.Context, name string) (bool, erro
return true, nil
}

func (is doltDevIndexSet) GetIndex(ctx context.Context, sch schema.Schema, name string) (Index, error) {
func (is doltDevIndexSet) GetIndex(ctx context.Context, tableSch schema.Schema, idxSch schema.Schema, name string) (Index, error) {
addr, err := is.am.Get(ctx, name)
if err != nil {
return nil, err
}
if addr.IsEmpty() {
return nil, fmt.Errorf("index %s not found in IndexSet", name)
}
idx := sch.Indexes().GetByName(name)
idx := tableSch.Indexes().GetByName(name)
if idx == nil {
return nil, fmt.Errorf("index schema not found: %s", name)
}
return indexFromAddr(ctx, is.vrw, is.ns, idx.Schema(), addr)
if idxSch == nil {
idxSch = idx.Schema()
}
return indexFromAddr(ctx, is.vrw, is.ns, idxSch, addr)
}

func (is doltDevIndexSet) PutIndex(ctx context.Context, name string, idx Index) (IndexSet, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/doltdb/root_val.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type rootValue struct {
ns tree.NodeStore
st rootValueStorage
fkc *ForeignKeyCollection // cache the first load
hash hash.Hash // cache first load
hash hash.Hash // cache the first load
}

var _ RootValue = (*rootValue)(nil)
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (t *Table) GetNomsIndexRowData(ctx context.Context, indexName string) (type
return types.EmptyMap, err
}

idx, err := indexes.GetIndex(ctx, sch, indexName)
idx, err := indexes.GetIndex(ctx, sch, nil, indexName)
if err != nil {
return types.EmptyMap, err
}
Expand All @@ -597,7 +597,7 @@ func (t *Table) GetIndexRowData(ctx context.Context, indexName string) (durable.
return nil, err
}

return indexes.GetIndex(ctx, sch, indexName)
return indexes.GetIndex(ctx, sch, nil, indexName)
}

// SetIndexRows replaces the current row data for the given index and returns an updated Table.
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/merge/merge_prolly_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func mergeProllySecondaryIndexes(
tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (prolly.Map, bool, error) {
ok := sch.Indexes().Contains(indexName)
if ok {
idx, err := iS.GetIndex(ctx, sch, indexName)
idx, err := iS.GetIndex(ctx, sch, nil, indexName)
if err != nil {
return prolly.Map{}, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/merge/merge_prolly_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func newUniqValidator(ctx *sql.Context, sch schema.Schema, tm *TableMerger, vm *
continue // todo: how do we validate in this case?
}

idx, err := indexes.GetIndex(ctx, sch, def.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return uniqValidator{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/mutable_secondary_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func GetMutableSecondaryIdxs(ctx *sql.Context, sch schema.Schema, tableName string, indexes durable.IndexSet) ([]MutableSecondaryIdx, error) {
mods := make([]MutableSecondaryIdx, sch.Indexes().Count())
for i, index := range sch.Indexes().AllIndexes() {
idx, err := indexes.GetIndex(ctx, sch, index.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, index.Name())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func GetMutableSecondaryIdxsWithPending(ctx *sql.Context, sch schema.Schema, tab
continue
}

idx, err := indexes.GetIndex(ctx, sch, index.Name())
idx, err := indexes.GetIndex(ctx, sch, nil, index.Name())
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/schema_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,9 +1480,9 @@ func testSchemaMergeHelper(t *testing.T, tests []schemaMergeTest, flipSides bool
actIndexSet, err := actTbl.GetIndexSet(ctx)
require.NoError(t, err)
expSchema.Indexes().Iter(func(index schema.Index) (stop bool, err error) {
expIndex, err := expIndexSet.GetIndex(ctx, expSchema, index.Name())
expIndex, err := expIndexSet.GetIndex(ctx, expSchema, nil, index.Name())
require.NoError(t, err)
actIndex, err := actIndexSet.GetIndex(ctx, expSchema, index.Name())
actIndex, err := actIndexSet.GetIndex(ctx, expSchema, nil, index.Name())
require.NoError(t, err)
expIndexHash, err := expIndex.HashOf()
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/migrate/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,19 +565,19 @@ func migrateIndexSet(
return nil, err
}
for _, def := range sch.Indexes().AllIndexes() {
idx, err := oldParentSet.GetIndex(ctx, sch, def.Name())
idx, err := oldParentSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
oldParent := durable.NomsMapFromIndex(idx)

idx, err = oldSet.GetIndex(ctx, sch, def.Name())
idx, err = oldSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
old := durable.NomsMapFromIndex(idx)

idx, err = newParentSet.GetIndex(ctx, sch, def.Name())
idx, err = newParentSet.GetIndex(ctx, sch, nil, def.Name())
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
//

// closeWriteSession flushes and closes the specified |writeSession| and returns an error if anything failed.
func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string, writeSession writer.WriteSession) error {
func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string, writeSession dsess.WriteSession) error {
newWorkingSet, err := writeSession.Flush(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -727,7 +727,7 @@ func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseNam
}

// getTableWriter returns a WriteSession and a TableWriter for writing to the specified |table| in the specified |database|.
func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string, foreignKeyChecksDisabled bool) (writer.WriteSession, writer.TableWriter, error) {
func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string, foreignKeyChecksDisabled bool) (dsess.WriteSession, dsess.TableWriter, error) {
database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -758,6 +758,7 @@ func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseNam

ds := dsess.DSessFromSess(ctx.Session)
setter := ds.SetWorkingRoot

tableWriter, err := writeSession.GetTableWriter(ctx, doltdb.TableName{Name: tableName}, databaseName, setter)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ func (db Database) GetTriggers(ctx *sql.Context) ([]sql.TriggerDefinition, error

key, err := doltdb.NewDataCacheKey(root)
if err != nil {
max-hoffman marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
return nil, err
}

ds := dsess.DSessFromSess(ctx.Session)
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (a AutoIncrementTracker) deepSet(ctx *sql.Context, tableName string, table
return nil, err
}

indexData, err = indexes.GetIndex(ctx, sch, aiIndex.Name())
indexData, err = indexes.GetIndex(ctx, sch, nil, aiIndex.Name())
if err != nil {
return nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions go/libraries/doltcore/sqle/dsess/database_session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/concurrentmap"
)
Expand Down Expand Up @@ -96,7 +95,7 @@ func newEmptyDatabaseSessionState() *DatabaseSessionState {
type SessionState interface {
WorkingSet() *doltdb.WorkingSet
WorkingRoot() doltdb.RootValue
WriteSession() writer.WriteSession
WriteSession() WriteSession
EditOpts() editor.Options
SessionCache() *SessionCache
}
Expand All @@ -120,7 +119,7 @@ type branchState struct {
// dbData is an accessor for the underlying doltDb
dbData env.DbData
// writeSession is this head's write session
writeSession writer.WriteSession
writeSession WriteSession
// readOnly is true if this database is read only
readOnly bool
// dirty is true if this branch state has uncommitted changes
Expand Down Expand Up @@ -161,7 +160,7 @@ func (bs *branchState) WorkingSet() *doltdb.WorkingSet {
return bs.workingSet
}

func (bs *branchState) WriteSession() writer.WriteSession {
func (bs *branchState) WriteSession() WriteSession {
return bs.writeSession
}

Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/sqle/dsess/dolt_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ import (
)

func TestDoltSessionInit(t *testing.T) {
dsess := DefaultSession(emptyDatabaseProvider())
dsess := DefaultSession(emptyDatabaseProvider(), nil)
conf := config.NewMapConfig(make(map[string]string))
assert.Equal(t, conf, dsess.globalsConf)
}

func TestNewPersistedSystemVariables(t *testing.T) {
dsess := DefaultSession(emptyDatabaseProvider())
dsess := DefaultSession(emptyDatabaseProvider(), nil)
conf := config.NewMapConfig(map[string]string{"max_connections": "1000"})
dsess = dsess.WithGlobals(conf)

Expand Down
14 changes: 11 additions & 3 deletions go/libraries/doltcore/sqle/dsess/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
Expand Down Expand Up @@ -62,6 +61,7 @@ type DoltSession struct {
statsProv sql.StatsProvider
mu *sync.Mutex
fs filesys.Filesys
writeSessProv WriteSessFunc

// If non-nil, this will be returned from ValidateSession.
// Used by sqle/cluster to put a session into a terminal err state.
Expand All @@ -74,7 +74,7 @@ var _ sql.TransactionSession = (*DoltSession)(nil)
var _ branch_control.Context = (*DoltSession)(nil)

// DefaultSession creates a DoltSession with default values
func DefaultSession(pro DoltDatabaseProvider) *DoltSession {
func DefaultSession(pro DoltDatabaseProvider, sessFunc WriteSessFunc) *DoltSession {
return &DoltSession{
Session: sql.NewBaseSession(),
username: "",
Expand All @@ -87,6 +87,7 @@ func DefaultSession(pro DoltDatabaseProvider) *DoltSession {
branchController: branch_control.CreateDefaultController(context.TODO()), // Default sessions are fine with the default controller
mu: &sync.Mutex{},
fs: pro.FileSystem(),
writeSessProv: sessFunc,
}
}

Expand All @@ -97,6 +98,7 @@ func NewDoltSession(
conf config.ReadWriteConfig,
branchController *branch_control.Controller,
statsProvider sql.StatsProvider,
writeSessProv WriteSessFunc,
) (*DoltSession, error) {
username := conf.GetStringOrDefault(config.UserNameKey, "")
email := conf.GetStringOrDefault(config.UserEmailKey, "")
Expand All @@ -115,6 +117,7 @@ func NewDoltSession(
statsProv: statsProvider,
mu: &sync.Mutex{},
fs: pro.FileSystem(),
writeSessProv: writeSessProv,
}

return sess, nil
Expand Down Expand Up @@ -1253,7 +1256,7 @@ func (d *DoltSession) addDB(ctx *sql.Context, db SqlDatabase) error {
if err != nil {
return err
}
branchState.writeSession = writer.NewWriteSession(nbf, branchState.WorkingSet(), tracker, editOpts)
branchState.writeSession = d.writeSessProv(nbf, branchState.WorkingSet(), tracker, editOpts)
}

// WorkingSet is nil in the case of a read only, detached head DB
Expand Down Expand Up @@ -1742,3 +1745,8 @@ func DefaultHead(baseName string, db SqlDatabase) (string, error) {

return head, nil
}

// WriteSessFunc is a constructor that session builders use to
// create fresh table editors.
// The indirection avoids a writer/dsess package import cycle.
type WriteSessFunc func(nbf *types.NomsBinFormat, ws *doltdb.WorkingSet, aiTracker globalstate.AutoIncrementTracker, opts editor.Options) WriteSession
Loading
Loading