Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

sql: use checksums to determine if indexes are outdated #583

Merged
merged 1 commit into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions sql/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@ import (
"strings"
"sync"

"github.com/sirupsen/logrus"
"gopkg.in/src-d/go-errors.v1"
)

const (
	// IndexBatchSize caps how many rows are saved per batch while an index
	// is being created.
	IndexBatchSize uint64 = 10000

	// ChecksumKey names the index config entry under which the checksum is
	// stored.
	ChecksumKey = "checksum"
)

// Checksumable is implemented by values that can report a checksum of their
// data. In this file it is treated as an optional capability: callers detect
// it with a type assertion rather than requiring it.
type Checksumable interface {
// Checksum returns the checksum as a string. A non-nil error means the
// checksum could not be computed or obtained.
Checksum() (string, error)
}

// PartitionIndexKeyValueIter is an iterator of partitions that will return
// the partition and the IndexKeyValueIter of that partition.
type PartitionIndexKeyValueIter interface {
Expand Down Expand Up @@ -211,17 +222,43 @@ func (r *IndexRegistry) LoadIndexes(dbs Databases) error {

for _, driver := range r.drivers {
for _, db := range dbs {
for t := range db.Tables() {
indexes, err := driver.LoadAll(db.Name(), t)
for _, t := range db.Tables() {
indexes, err := driver.LoadAll(db.Name(), t.Name())
if err != nil {
return err
}

var checksum string
if c, ok := t.(Checksumable); ok {
checksum, err = c.Checksum()
if err != nil {
return err
}
}

for _, idx := range indexes {
k := indexKey{db.Name(), idx.ID()}
r.indexes[k] = idx
r.indexOrder = append(r.indexOrder, k)
r.statuses[k] = IndexReady

var idxChecksum string
if c, ok := idx.(Checksumable); ok {
idxChecksum, err = c.Checksum()
if err != nil {
return err
}
}

if checksum == "" || checksum == idxChecksum {
r.statuses[k] = IndexReady
} else {
logrus.Warnf(
"index %q is outdated and will not be used, you can remove it using `DROP INDEX %s`",
idx.ID(),
idx.ID(),
)
r.statuses[k] = IndexOutdated
}
}
}
}
Expand All @@ -244,6 +281,18 @@ func (r *IndexRegistry) CanUseIndex(idx Index) bool {
return r.canUseIndex(idx)
}

// CanRemoveIndex reports whether idx is currently in a state from which it may
// be removed, i.e. its registered status is IndexReady or IndexOutdated. A nil
// index is never removable.
func (r *IndexRegistry) CanRemoveIndex(idx Index) bool {
	if idx == nil {
		return false
	}

	r.mut.RLock()
	defer r.mut.RUnlock()

	key := indexKey{idx.Database(), idx.ID()}
	switch r.statuses[key] {
	case IndexReady, IndexOutdated:
		return true
	default:
		return false
	}
}

func (r *IndexRegistry) canUseIndex(idx Index) bool {
if idx == nil {
return false
Expand Down Expand Up @@ -400,8 +449,8 @@ var (
ErrIndexNotFound = errors.NewKind("index %q was not found")

// ErrIndexDeleteInvalidStatus is returned when the index trying to delete
// does not have a ready state.
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage")
// does not have a ready or outdated state.
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for removal")
)

func (r *IndexRegistry) validateIndexToAdd(idx Index) error {
Expand Down Expand Up @@ -507,7 +556,7 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{},
var key indexKey
for k, idx := range r.indexes {
if strings.ToLower(id) == idx.ID() {
if !force && !r.CanUseIndex(idx) {
if !force && !r.CanRemoveIndex(idx) {
r.mut.RUnlock()
return nil, ErrIndexDeleteInvalidStatus.New(id)
}
Expand Down Expand Up @@ -563,13 +612,16 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{},
}

// IndexStatus represents the current status in which the index is.
type IndexStatus bool
type IndexStatus byte

const (
// IndexNotReady means the index is not ready to be used.
IndexNotReady IndexStatus = false
IndexNotReady IndexStatus = iota
// IndexReady means the index can be used.
IndexReady IndexStatus = true
IndexReady
// IndexOutdated means the index is loaded but will not be used because the
// contents in it are outdated.
IndexOutdated
)

// IsUsable returns whether the index can be used or not based on the status.
Expand Down
14 changes: 14 additions & 0 deletions sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,33 @@ type pilosaIndex struct {
table string
id string
expressions []string
checksum string
}

// newPilosaIndex builds a pilosaIndex around the given pilosa index and
// mapping, copying the database, table, id and expressions from cfg.
//
// The index checksum is taken from the driver sections of cfg: the first
// section that carries a sql.ChecksumKey entry wins. If no section has one the
// checksum is left empty.
func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pilosaIndex {
	var checksum string
	for _, driverCfg := range cfg.Drivers {
		// Stop only once a checksum has actually been found. Breaking
		// unconditionally would inspect a single arbitrary section and miss
		// a checksum stored in any other one.
		// NOTE(review): cfg.Drivers is presumably a map, so with several
		// checksum-bearing sections the winner is still unspecified — confirm
		// that at most one section is expected here.
		if ch, ok := driverCfg[sql.ChecksumKey]; ok {
			checksum = ch
			break
		}
	}

	return &pilosaIndex{
		index:       newConcurrentPilosaIndex(idx),
		db:          cfg.DB,
		table:       cfg.Table,
		id:          cfg.ID,
		expressions: cfg.Expressions,
		mapping:     mapping,
		checksum:    checksum,
	}
}

// Checksum returns the value of the index's checksum field, which
// newPilosaIndex fills in from the index config. The returned error is
// always nil.
func (idx *pilosaIndex) Checksum() (string, error) {
return idx.checksum, nil
}

// Get returns an IndexLookup for the given key in the index.
// If key parameter is not present then the returned iterator
// will go through all the locations on the index.
Expand Down
52 changes: 52 additions & 0 deletions sql/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,40 @@ func TestLoadIndexes(t *testing.T) {
}
}

// TestLoadOutdatedIndexes checks how LoadIndexes classifies loaded indexes by
// checksum: an index whose checksum equals its table's checksum ends up
// IndexReady, while one whose checksum differs ends up IndexOutdated. Both
// indexes must still be registered.
func TestLoadOutdatedIndexes(t *testing.T) {
	require := require.New(t)

	// Both indexes carry checksum "2"; only table t1 agrees with it.
	d := &loadDriver{id: "d1", indexes: []Index{
		&checksumIndex{&dummyIdx{id: "idx1", database: "db1", table: "t1"}, "2"},
		&checksumIndex{&dummyIdx{id: "idx2", database: "db1", table: "t2"}, "2"},
	}}

	registry := NewIndexRegistry()
	registry.RegisterIndexDriver(d)

	dbs := Databases{
		dummyDB{
			name: "db1",
			tables: map[string]Table{
				"t1": &checksumTable{&dummyTable{name: "t1"}, "2"},
				"t2": &checksumTable{&dummyTable{name: "t2"}, "1"},
			},
		},
	}

	require.NoError(registry.LoadIndexes(dbs))

	var result []Index
	for _, idx := range registry.indexes {
		result = append(result, idx)
	}

	require.ElementsMatch(d.indexes, result)

	// Expected value goes first so that failure output labels the values
	// correctly.
	require.Equal(IndexReady, registry.statuses[indexKey{"db1", "idx1"}])
	require.Equal(IndexOutdated, registry.statuses[indexKey{"db1", "idx2"}])
}

type dummyDB struct {
name string
tables map[string]Table
Expand Down Expand Up @@ -377,3 +411,21 @@ func (dummyExpr) Type() Type { panic("not implemented") }
// WithIndex returns a pointer to a new dummyExpr built from idx and the
// receiver's colName. The receiver is passed by value and is not modified.
func (e dummyExpr) WithIndex(idx int) Expression {
return &dummyExpr{idx, e.colName}
}

// checksumTable is a test helper that embeds a Table and pairs it with a fixed
// checksum string, which it reports through Checksum.
type checksumTable struct {
Table
checksum string
}

// Checksum returns the checksum stored in the wrapper; the error is always nil.
func (t *checksumTable) Checksum() (string, error) {
return t.checksum, nil
}

// checksumIndex is a test helper that embeds an Index and pairs it with a
// fixed checksum string, which it reports through Checksum.
type checksumIndex struct {
Index
checksum string
}

// Checksum returns the checksum stored in the wrapper; the error is always nil.
func (idx *checksumIndex) Checksum() (string, error) {
return idx.checksum, nil
}
9 changes: 8 additions & 1 deletion sql/plan/create_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getIndexableTable(t sql.Table) (sql.IndexableTable, error) {
case sql.TableWrapper:
return getIndexableTable(t.Underlying())
default:
return nil, ErrInsertIntoNotSupported.New()
return nil, ErrNotIndexable.New()
}
}

Expand Down Expand Up @@ -119,6 +119,13 @@ func (c *CreateIndex) RowIter(ctx *sql.Context) (sql.RowIter, error) {
}
}

if ch, ok := table.Table.(sql.Checksumable); ok {
c.Config[sql.ChecksumKey], err = ch.Checksum()
if err != nil {
return nil, err
}
}

index, err := driver.Create(
c.CurrentDatabase,
table.Name(),
Expand Down
57 changes: 56 additions & 1 deletion sql/plan/create_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,44 @@ func TestCreateIndexSync(t *testing.T) {
require.True(found)
}

// TestCreateIndexChecksum verifies that running CreateIndex against a table
// that exposes a checksum hands that checksum to the driver under
// sql.ChecksumKey, and that the index is reported as saved.
func TestCreateIndexChecksum(t *testing.T) {
	req := require.New(t)

	schema := sql.Schema{
		{Name: "a", Source: "foo"},
		{Name: "b", Source: "foo"},
		{Name: "c", Source: "foo"},
	}
	tbl := &checksumTable{Table: mem.NewTable("foo", schema), checksum: "1"}

	drv := &mockDriver{}
	cat := sql.NewCatalog()
	cat.RegisterIndexDriver(drv)
	database := mem.NewDatabase("foo")
	database.AddTable("foo", tbl)
	cat.AddDatabase(database)

	// Index over columns c and a, in that order.
	columns := []sql.Expression{
		expression.NewGetFieldWithTable(2, sql.Int64, "foo", "c", true),
		expression.NewGetFieldWithTable(0, sql.Int64, "foo", "a", true),
	}

	// Build the index synchronously so the driver state can be inspected
	// right after RowIter returns.
	options := map[string]string{"async": "false"}
	node := NewCreateIndex("idx", NewResolvedTable(tbl), columns, "mock", options)
	node.Catalog = cat
	node.CurrentDatabase = "foo"

	ctx := sql.NewEmptyContext()
	_, err := node.RowIter(ctx)
	req.NoError(err)

	req.Equal([]string{"idx"}, drv.saved)
	req.Equal("1", drv.config["idx"][sql.ChecksumKey])
}

func TestCreateIndexWithIter(t *testing.T) {
require := require.New(t)
foo := mem.NewPartitionedTable("foo", sql.Schema{
Expand Down Expand Up @@ -283,14 +321,20 @@ func (i *mockIndex) Has(sql.Partition, ...interface{}) (bool, error) {
// Driver returns the fixed driver identifier "mock".
func (*mockIndex) Driver() string { return "mock" }

// mockDriver is a test implementation of sql.IndexDriver that records what was
// asked of it so tests can assert on it afterwards.
type mockDriver struct {
// config holds the config map passed to each Create call, keyed by index id.
config map[string]map[string]string
// deleted collects the IDs of the indexes passed to Delete.
deleted []string
// saved presumably collects the IDs of saved indexes; the code that
// appends to it is not visible in this excerpt — confirm.
saved []string
}

var _ sql.IndexDriver = (*mockDriver)(nil)

// ID returns the fixed driver identifier "mock".
func (*mockDriver) ID() string { return "mock" }
func (*mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) {
// Create remembers the config it was called with, keyed by index id, and
// returns a mockIndex built from the remaining arguments. It never fails.
func (d *mockDriver) Create(db, table, id string, exprs []sql.Expression, config map[string]string) (sql.Index, error) {
	// The config store is allocated lazily so a zero-value mockDriver is
	// usable.
	if d.config == nil {
		d.config = map[string]map[string]string{}
	}
	d.config[id] = config

	created := &mockIndex{db, table, id, exprs}
	return created, nil
}
func (*mockDriver) LoadAll(db, table string) ([]sql.Index, error) {
Expand All @@ -305,3 +349,14 @@ func (d *mockDriver) Delete(index sql.Index, _ sql.PartitionIter) error {
d.deleted = append(d.deleted, index.ID())
return nil
}

// checksumTable is a test helper that embeds a sql.Table and pairs it with a
// fixed checksum string, which it reports through Checksum.
type checksumTable struct {
sql.Table
checksum string
}

// Checksum returns the checksum stored in the wrapper; the error is always nil.
func (t *checksumTable) Checksum() (string, error) {
return t.checksum, nil
}

// Underlying returns the embedded sql.Table that this wrapper decorates.
func (t *checksumTable) Underlying() sql.Table { return t.Table }