From 7d82c68c39249cc386fe910958fbc751f138f321 Mon Sep 17 00:00:00 2001 From: Miguel Molina Date: Thu, 10 Jan 2019 15:46:10 +0100 Subject: [PATCH] sql: use checksums to determine if indexes are outdated Signed-off-by: Miguel Molina --- sql/index.go | 70 ++++++++++++++++++++++++++++++----- sql/index/pilosa/index.go | 14 +++++++ sql/index_test.go | 52 ++++++++++++++++++++++++++ sql/plan/create_index.go | 9 ++++- sql/plan/create_index_test.go | 57 +++++++++++++++++++++++++++- 5 files changed, 191 insertions(+), 11 deletions(-) diff --git a/sql/index.go b/sql/index.go index 3965b0b5b..d726362c9 100644 --- a/sql/index.go +++ b/sql/index.go @@ -5,12 +5,23 @@ import ( "strings" "sync" + "github.com/sirupsen/logrus" "gopkg.in/src-d/go-errors.v1" ) // IndexBatchSize is the number of rows to save at a time when creating indexes. const IndexBatchSize = uint64(10000) +// ChecksumKey is the key in an index config to store the checksum. +const ChecksumKey = "checksum" + +// Checksumable provides the checksum of some data. +type Checksumable interface { + // Checksum returns a checksum and an error if there was any problem + // computing or obtaining the checksum. + Checksum() (string, error) +} + // PartitionIndexKeyValueIter is an iterator of partitions that will return // the partition and the IndexKeyValueIter of that partition. type PartitionIndexKeyValueIter interface { @@ -211,17 +222,43 @@ func (r *IndexRegistry) LoadIndexes(dbs Databases) error { for _, driver := range r.drivers { for _, db := range dbs { - for t := range db.Tables() { - indexes, err := driver.LoadAll(db.Name(), t) + for _, t := range db.Tables() { + indexes, err := driver.LoadAll(db.Name(), t.Name()) if err != nil { return err } + var checksum string + if c, ok := t.(Checksumable); ok { + checksum, err = c.Checksum() + if err != nil { + return err + } + } + for _, idx := range indexes { k := indexKey{db.Name(), idx.ID()} r.indexes[k] = idx r.indexOrder = append(r.indexOrder, k) - r.statuses[k] = IndexReady + + var idxChecksum string + if c, ok := idx.(Checksumable); ok { + idxChecksum, err = c.Checksum() + if err != nil { + return err + } + } + + if checksum == "" || checksum == idxChecksum { + r.statuses[k] = IndexReady + } else { + logrus.Warnf( + "index %q is outdated and will not be used, you can remove it using `DROP INDEX %s`", + idx.ID(), + idx.ID(), + ) + r.statuses[k] = IndexOutdated + } } } } @@ -244,6 +281,18 @@ func (r *IndexRegistry) CanUseIndex(idx Index) bool { return r.canUseIndex(idx) } +// CanRemoveIndex returns whether the given index is ready to be removed. +func (r *IndexRegistry) CanRemoveIndex(idx Index) bool { + if idx == nil { + return false + } + + r.mut.RLock() + defer r.mut.RUnlock() + status := r.statuses[indexKey{idx.Database(), idx.ID()}] + return status == IndexReady || status == IndexOutdated +} + func (r *IndexRegistry) canUseIndex(idx Index) bool { if idx == nil { return false @@ -400,8 +449,8 @@ var ( ErrIndexNotFound = errors.NewKind("index %q was not found") // ErrIndexDeleteInvalidStatus is returned when the index trying to delete - // does not have a ready state. - ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage") + // does not have a ready or outdated state. + ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for removal") ) func (r *IndexRegistry) validateIndexToAdd(idx Index) error { @@ -507,7 +556,7 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{}, var key indexKey for k, idx := range r.indexes { if strings.ToLower(id) == idx.ID() { - if !force && !r.CanUseIndex(idx) { + if !force && !r.CanRemoveIndex(idx) { r.mut.RUnlock() return nil, ErrIndexDeleteInvalidStatus.New(id) } @@ -563,13 +612,16 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{}, } // IndexStatus represents the current status in which the index is. -type IndexStatus bool +type IndexStatus byte const ( // IndexNotReady means the index is not ready to be used. - IndexNotReady IndexStatus = false + IndexNotReady IndexStatus = iota // IndexReady means the index can be used. - IndexReady IndexStatus = true + IndexReady + // IndexOutdated means the index is loaded but will not be used because the + // contents in it are outdated. + IndexOutdated ) // IsUsable returns whether the index can be used or not based on the status. diff --git a/sql/index/pilosa/index.go b/sql/index/pilosa/index.go index 00fd4db53..bd9b6d88e 100644 --- a/sql/index/pilosa/index.go +++ b/sql/index/pilosa/index.go @@ -67,9 +67,18 @@ type pilosaIndex struct { table string id string expressions []string + checksum string } func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pilosaIndex { + var checksum string + for _, c := range cfg.Drivers { + if ch, ok := c[sql.ChecksumKey]; ok { + checksum = ch + } + break + } + return &pilosaIndex{ index: newConcurrentPilosaIndex(idx), db: cfg.DB, @@ -77,9 +86,14 @@ func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pil id: cfg.ID, expressions: cfg.Expressions, mapping: mapping, + checksum: checksum, } } +func (idx *pilosaIndex) Checksum() (string, error) { + return idx.checksum, nil +} + // Get returns an IndexLookup for the given key in the index. // If key parameter is not present then the returned iterator // will go through all the locations on the index. diff --git a/sql/index_test.go b/sql/index_test.go index 9c9889fd6..da42c2464 100644 --- a/sql/index_test.go +++ b/sql/index_test.go @@ -299,6 +299,40 @@ func TestLoadIndexes(t *testing.T) { } } +func TestLoadOutdatedIndexes(t *testing.T) { + require := require.New(t) + + d := &loadDriver{id: "d1", indexes: []Index{ + &checksumIndex{&dummyIdx{id: "idx1", database: "db1", table: "t1"}, "2"}, + &checksumIndex{&dummyIdx{id: "idx2", database: "db1", table: "t2"}, "2"}, + }} + + registry := NewIndexRegistry() + registry.RegisterIndexDriver(d) + + dbs := Databases{ + dummyDB{ + name: "db1", + tables: map[string]Table{ + "t1": &checksumTable{&dummyTable{name: "t1"}, "2"}, + "t2": &checksumTable{&dummyTable{name: "t2"}, "1"}, + }, + }, + } + + require.NoError(registry.LoadIndexes(dbs)) + + var result []Index + for _, idx := range registry.indexes { + result = append(result, idx) + } + + require.ElementsMatch(d.indexes, result) + + require.Equal(registry.statuses[indexKey{"db1", "idx1"}], IndexReady) + require.Equal(registry.statuses[indexKey{"db1", "idx2"}], IndexOutdated) +} + type dummyDB struct { name string tables map[string]Table @@ -377,3 +411,21 @@ func (dummyExpr) Type() Type { panic("not implemented") } func (e dummyExpr) WithIndex(idx int) Expression { return &dummyExpr{idx, e.colName} } + +type checksumTable struct { + Table + checksum string +} + +func (t *checksumTable) Checksum() (string, error) { + return t.checksum, nil +} + +type checksumIndex struct { + Index + checksum string +} + +func (idx *checksumIndex) Checksum() (string, error) { + return idx.checksum, nil +} diff --git a/sql/plan/create_index.go b/sql/plan/create_index.go index 1ecd7e43e..4a1569b3f 100644 --- a/sql/plan/create_index.go +++ b/sql/plan/create_index.go @@ -81,7 +81,7 @@ func getIndexableTable(t sql.Table) (sql.IndexableTable, error) { case sql.TableWrapper: return getIndexableTable(t.Underlying()) default: - return nil, ErrInsertIntoNotSupported.New() + return nil, ErrNotIndexable.New() } } @@ -119,6 +119,13 @@ func (c *CreateIndex) RowIter(ctx *sql.Context) (sql.RowIter, error) { } } + if ch, ok := table.Table.(sql.Checksumable); ok { + c.Config[sql.ChecksumKey], err = ch.Checksum() + if err != nil { + return nil, err + } + } + index, err := driver.Create( c.CurrentDatabase, table.Name(), diff --git a/sql/plan/create_index_test.go b/sql/plan/create_index_test.go index 00ef852cd..c200a4030 100644 --- a/sql/plan/create_index_test.go +++ b/sql/plan/create_index_test.go @@ -171,6 +171,44 @@ func TestCreateIndexSync(t *testing.T) { require.True(found) } +func TestCreateIndexChecksum(t *testing.T) { + require := require.New(t) + + table := &checksumTable{ + mem.NewTable("foo", sql.Schema{ + {Name: "a", Source: "foo"}, + {Name: "b", Source: "foo"}, + {Name: "c", Source: "foo"}, + }), + "1", + } + + driver := new(mockDriver) + catalog := sql.NewCatalog() + catalog.RegisterIndexDriver(driver) + db := mem.NewDatabase("foo") + db.AddTable("foo", table) + catalog.AddDatabase(db) + + exprs := []sql.Expression{ + expression.NewGetFieldWithTable(2, sql.Int64, "foo", "c", true), + expression.NewGetFieldWithTable(0, sql.Int64, "foo", "a", true), + } + + ci := NewCreateIndex( + "idx", NewResolvedTable(table), exprs, "mock", + map[string]string{"async": "false"}, + ) + ci.Catalog = catalog + ci.CurrentDatabase = "foo" + + _, err := ci.RowIter(sql.NewEmptyContext()) + require.NoError(err) + + require.Equal([]string{"idx"}, driver.saved) + require.Equal("1", driver.config["idx"][sql.ChecksumKey]) +} + func TestCreateIndexWithIter(t *testing.T) { require := require.New(t) foo := mem.NewPartitionedTable("foo", sql.Schema{ @@ -283,6 +321,7 @@ func (i *mockIndex) Has(sql.Partition, ...interface{}) (bool, error) { func (*mockIndex) Driver() string { return "mock" } type mockDriver struct { + config map[string]map[string]string deleted []string saved []string } @@ -290,7 +329,12 @@ type mockDriver struct { var _ sql.IndexDriver = (*mockDriver)(nil) func (*mockDriver) ID() string { return "mock" } -func (*mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) { +func (d *mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) { + if d.config == nil { + d.config = make(map[string]map[string]string) + } + d.config[id] = config + return &mockIndex{db, table, id, exprs}, nil } func (*mockDriver) LoadAll(db, table string) ([]sql.Index, error) { @@ -305,3 +349,14 @@ func (d *mockDriver) Delete(index sql.Index, _ sql.PartitionIter) error { d.deleted = append(d.deleted, index.ID()) return nil } + +type checksumTable struct { + sql.Table + checksum string +} + +func (t *checksumTable) Checksum() (string, error) { + return t.checksum, nil +} + +func (t *checksumTable) Underlying() sql.Table { return t.Table }