Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
sql: use checksums to determine if indexes are outdated
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Molina <miguel@erizocosmi.co>
  • Loading branch information
erizocosmico committed Jan 10, 2019
1 parent 09a67ca commit ce9e3a5
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 11 deletions.
68 changes: 59 additions & 9 deletions sql/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@ import (
"strings"
"sync"

"github.com/sirupsen/logrus"
"gopkg.in/src-d/go-errors.v1"
)

// IndexBatchSize sets how many rows are saved per batch while an index is
// being created.
const IndexBatchSize uint64 = 10000

// ChecksumKey is the key under which the checksum is stored in an index
// config.
const ChecksumKey = "checksum"

// Checksumable is implemented by values that can return a checksum of their
// data. LoadIndexes probes both tables and indexes for this interface and
// compares the two checksums to decide whether an index is outdated.
type Checksumable interface {
// Checksum returns the checksum as a string, or an error if it could not
// be obtained.
Checksum() (string, error)
}

// PartitionIndexKeyValueIter is an iterator of partitions that will return
// the partition and the IndexKeyValueIter of that partition.
type PartitionIndexKeyValueIter interface {
Expand Down Expand Up @@ -211,17 +220,43 @@ func (r *IndexRegistry) LoadIndexes(dbs Databases) error {

for _, driver := range r.drivers {
for _, db := range dbs {
for t := range db.Tables() {
indexes, err := driver.LoadAll(db.Name(), t)
for _, t := range db.Tables() {
indexes, err := driver.LoadAll(db.Name(), t.Name())
if err != nil {
return err
}

var checksum string
if c, ok := t.(Checksumable); ok {
checksum, err = c.Checksum()
if err != nil {
return err
}
}

for _, idx := range indexes {
k := indexKey{db.Name(), idx.ID()}
r.indexes[k] = idx
r.indexOrder = append(r.indexOrder, k)
r.statuses[k] = IndexReady

var idxChecksum string
if c, ok := idx.(Checksumable); ok {
idxChecksum, err = c.Checksum()
if err != nil {
return err
}
}

if checksum == "" || checksum == idxChecksum {
r.statuses[k] = IndexReady
} else {
logrus.Warnf(
"index %q is outdated and will not be used, you can remove it using `DROP INDEX %s`",
idx.ID(),
idx.ID(),
)
r.statuses[k] = IndexOutdated
}
}
}
}
Expand All @@ -244,6 +279,18 @@ func (r *IndexRegistry) CanUseIndex(idx Index) bool {
return r.canUseIndex(idx)
}

// CanRemoveIndex reports whether idx is currently in a state that allows it
// to be removed, that is, its registered status is either IndexReady or
// IndexOutdated. A nil idx is never removable.
func (r *IndexRegistry) CanRemoveIndex(idx Index) bool {
	if idx == nil {
		return false
	}

	// The status map is shared state; hold the read lock while consulting it.
	r.mut.RLock()
	defer r.mut.RUnlock()

	switch r.statuses[indexKey{idx.Database(), idx.ID()}] {
	case IndexReady, IndexOutdated:
		return true
	default:
		return false
	}
}

func (r *IndexRegistry) canUseIndex(idx Index) bool {
if idx == nil {
return false
Expand Down Expand Up @@ -400,8 +447,8 @@ var (
ErrIndexNotFound = errors.NewKind("index %q was not found")

// ErrIndexDeleteInvalidStatus is returned when the index being deleted
// does not have a ready state.
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage")
// does not have a ready or outdated state.
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for removal")
)

func (r *IndexRegistry) validateIndexToAdd(idx Index) error {
Expand Down Expand Up @@ -507,7 +554,7 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{},
var key indexKey
for k, idx := range r.indexes {
if strings.ToLower(id) == idx.ID() {
if !force && !r.CanUseIndex(idx) {
if !force && !r.CanRemoveIndex(idx) {
r.mut.RUnlock()
return nil, ErrIndexDeleteInvalidStatus.New(id)
}
Expand Down Expand Up @@ -563,13 +610,16 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{},
}

// IndexStatus represents the current status in which the index is.
type IndexStatus bool
type IndexStatus byte

const (
// IndexNotReady means the index is not ready to be used.
IndexNotReady IndexStatus = false
IndexNotReady IndexStatus = iota
// IndexReady means the index can be used.
IndexReady IndexStatus = true
IndexReady
// IndexOutdated means the index is loaded but will not be used because the
// contents in it are outdated.
IndexOutdated
)

// IsUsable returns whether the index can be used or not based on the status.
Expand Down
14 changes: 14 additions & 0 deletions sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,33 @@ type pilosaIndex struct {
table string
id string
expressions []string
checksum string
}

// newPilosaIndex builds a pilosaIndex around the given pilosa index and
// mapping, copying the database, table, id and expressions from cfg.
//
// The index checksum is taken from the per-driver settings in cfg.Drivers:
// the first entry found that carries a sql.ChecksumKey value wins. If no
// entry has one, the checksum is left empty.
func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pilosaIndex {
	var checksum string
	// Map iteration order is unspecified, so stop only once a checksum has
	// actually been found. Breaking unconditionally after the first entry
	// could miss the checksum whenever more than one driver entry is present.
	for _, driverCfg := range cfg.Drivers {
		if ch, ok := driverCfg[sql.ChecksumKey]; ok {
			checksum = ch
			break
		}
	}

	return &pilosaIndex{
		index:       newConcurrentPilosaIndex(idx),
		db:          cfg.DB,
		table:       cfg.Table,
		id:          cfg.ID,
		expressions: cfg.Expressions,
		mapping:     mapping,
		checksum:    checksum,
	}
}

// Checksum returns the checksum held by the index. The returned error is
// always nil.
func (idx *pilosaIndex) Checksum() (string, error) {
return idx.checksum, nil
}

// Get returns an IndexLookup for the given key in the index.
// If key parameter is not present then the returned iterator
// will go through all the locations on the index.
Expand Down
52 changes: 52 additions & 0 deletions sql/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,40 @@ func TestLoadIndexes(t *testing.T) {
}
}

// TestLoadOutdatedIndexes checks that LoadIndexes registers every index
// returned by the driver and assigns each one a status based on checksums:
// idx1 is expected to end up IndexReady and idx2 IndexOutdated.
func TestLoadOutdatedIndexes(t *testing.T) {
	require := require.New(t)

	// Both indexes report checksum "2".
	d := &loadDriver{id: "d1", indexes: []Index{
		&checksumIndex{&dummyIdx{id: "idx1", database: "db1", table: "t1"}, "2"},
		&checksumIndex{&dummyIdx{id: "idx2", database: "db1", table: "t2"}, "2"},
	}}

	registry := NewIndexRegistry()
	registry.RegisterIndexDriver(d)

	// Table t1 reports the same checksum as the indexes ("2"); t2 reports a
	// different one ("1").
	dbs := Databases{
		dummyDB{
			name: "db1",
			tables: map[string]Table{
				"t1": &checksumTable{&dummyTable{name: "t1"}, "2"},
				"t2": &checksumTable{&dummyTable{name: "t2"}, "1"},
			},
		},
	}

	require.NoError(registry.LoadIndexes(dbs))

	var result []Index
	for _, idx := range registry.indexes {
		result = append(result, idx)
	}

	// Outdated indexes are still loaded into the registry; only their status
	// differs.
	require.ElementsMatch(d.indexes, result)

	// Expected value first, actual value second, so that failure messages
	// label the two sides correctly.
	require.Equal(IndexReady, registry.statuses[indexKey{"db1", "idx1"}])
	require.Equal(IndexOutdated, registry.statuses[indexKey{"db1", "idx2"}])
}

type dummyDB struct {
name string
tables map[string]Table
Expand Down Expand Up @@ -377,3 +411,21 @@ func (dummyExpr) Type() Type { panic("not implemented") }
func (e dummyExpr) WithIndex(idx int) Expression {
return &dummyExpr{idx, e.colName}
}

// checksumTable is a test helper that embeds a Table and adds a Checksum
// method returning a fixed value.
type checksumTable struct {
Table
// checksum is the value returned by Checksum.
checksum string
}

// Checksum returns the checksum the helper was constructed with and a nil
// error.
func (t *checksumTable) Checksum() (string, error) {
return t.checksum, nil
}

// checksumIndex is a test helper that embeds an Index and adds a Checksum
// method returning a fixed value.
type checksumIndex struct {
Index
// checksum is the value returned by Checksum.
checksum string
}

// Checksum returns the checksum the helper was constructed with and a nil
// error.
func (idx *checksumIndex) Checksum() (string, error) {
return idx.checksum, nil
}
9 changes: 8 additions & 1 deletion sql/plan/create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getIndexableTable(t sql.Table) (sql.IndexableTable, error) {
case sql.TableWrapper:
return getIndexableTable(t.Underlying())
default:
return nil, ErrInsertIntoNotSupported.New()
return nil, ErrNotIndexable.New()
}
}

Expand Down Expand Up @@ -119,6 +119,13 @@ func (c *CreateIndex) RowIter(ctx *sql.Context) (sql.RowIter, error) {
}
}

if ch, ok := table.Table.(sql.Checksumable); ok {
c.Config[sql.ChecksumKey], err = ch.Checksum()
if err != nil {
return nil, err
}
}

index, err := driver.Create(
c.CurrentDatabase,
table.Name(),
Expand Down
57 changes: 56 additions & 1 deletion sql/plan/create_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,44 @@ func TestCreateIndexSync(t *testing.T) {
require.True(found)
}

func TestCreateIndexChecksum(t *testing.T) {
require := require.New(t)

table := &checksumTable{
mem.NewTable("foo", sql.Schema{
{Name: "a", Source: "foo"},
{Name: "b", Source: "foo"},
{Name: "c", Source: "foo"},
}),
"1",
}

driver := new(mockDriver)
catalog := sql.NewCatalog()
catalog.RegisterIndexDriver(driver)
db := mem.NewDatabase("foo")
db.AddTable("foo", table)
catalog.AddDatabase(db)

exprs := []sql.Expression{
expression.NewGetFieldWithTable(2, sql.Int64, "foo", "c", true),
expression.NewGetFieldWithTable(0, sql.Int64, "foo", "a", true),
}

ci := NewCreateIndex(
"idx", NewResolvedTable(table), exprs, "mock",
map[string]string{"async": "false"},
)
ci.Catalog = catalog
ci.CurrentDatabase = "foo"

_, err := ci.RowIter(sql.NewEmptyContext())
require.NoError(err)

require.Equal([]string{"idx"}, driver.saved)
require.Equal("1", driver.config["idx"][sql.ChecksumKey])
}

func TestCreateIndexWithIter(t *testing.T) {
require := require.New(t)
foo := mem.NewPartitionedTable("foo", sql.Schema{
Expand Down Expand Up @@ -283,14 +321,20 @@ func (i *mockIndex) Has(sql.Partition, ...interface{}) (bool, error) {
func (*mockIndex) Driver() string { return "mock" }

// mockDriver is a test index driver that records the calls made to it so
// tests can assert on them.
type mockDriver struct {
// config holds the config map passed to Create, keyed by index id.
config map[string]map[string]string
// deleted collects the IDs of the indexes passed to Delete.
deleted []string
// saved is presumably the list of IDs of saved indexes; it is populated
// outside this excerpt — TODO confirm.
saved []string
}

var _ sql.IndexDriver = (*mockDriver)(nil)

// ID returns the fixed identifier "mock" for this driver.
func (*mockDriver) ID() string { return "mock" }
func (*mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) {
func (d *mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) {
if d.config == nil {
d.config = make(map[string]map[string]string)
}
d.config[id] = config

return &mockIndex{db, table, id, exprs}, nil
}
func (*mockDriver) LoadAll(db, table string) ([]sql.Index, error) {
Expand All @@ -305,3 +349,14 @@ func (d *mockDriver) Delete(index sql.Index, _ sql.PartitionIter) error {
d.deleted = append(d.deleted, index.ID())
return nil
}

// checksumTable is a test helper that embeds a sql.Table, reports a fixed
// checksum, and exposes the embedded table through Underlying.
type checksumTable struct {
sql.Table
// checksum is the value returned by Checksum.
checksum string
}

// Checksum returns the checksum the helper was constructed with and a nil
// error.
func (t *checksumTable) Checksum() (string, error) {
return t.checksum, nil
}

// Underlying returns the embedded table.
func (t *checksumTable) Underlying() sql.Table { return t.Table }

0 comments on commit ce9e3a5

Please sign in to comment.