Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #354 from kuba--/enh-343/cancel_by_drop
Browse files Browse the repository at this point in the history
Drop indexes in progress.
  • Loading branch information
ajnavarro authored Sep 7, 2018
2 parents ad0eefd + 459b8b8 commit 56886a2
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 34 deletions.
2 changes: 1 addition & 1 deletion sql/analyzer/index_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
)

// indexCatalog sets the catalog in the CreateIndexm, DropIndex and ShowIndexes nodes.
// indexCatalog sets the catalog in the CreateIndex, DropIndex and ShowIndexes nodes.
func indexCatalog(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error) {
if !n.Resolved() {
return n, nil
Expand Down
13 changes: 3 additions & 10 deletions sql/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,7 @@ func (r *IndexRegistry) Index(db, id string) Index {
defer r.mut.RUnlock()

r.retainIndex(db, id)
idx := r.indexes[indexKey{db, strings.ToLower(id)}]
if idx != nil && !r.canUseIndex(idx) {
return nil
}

return idx
return r.indexes[indexKey{db, strings.ToLower(id)}]
}

// IndexesByTable returns a slice of all the indexes existing on the given table.
Expand All @@ -295,9 +290,7 @@ func (r *IndexRegistry) IndexesByTable(db, table string) []Index {
indexes := []Index{}
for _, key := range r.indexOrder {
idx := r.indexes[key]
if idx.Database() == db &&
idx.Table() == table && r.statuses[key] == IndexReady {

if idx.Database() == db && idx.Table() == table {
indexes = append(indexes, idx)
r.retainIndex(db, idx.ID())
}
Expand Down Expand Up @@ -533,7 +526,7 @@ func (r *IndexRegistry) DeleteIndex(db, id string, force bool) (<-chan struct{},

r.rcmut.Lock()
// If no query is using this index just delete it right away
if r.refCounts[key] <= 0 {
if force || r.refCounts[key] <= 0 {
r.mut.Lock()
defer r.mut.Unlock()
defer r.rcmut.Unlock()
Expand Down
27 changes: 20 additions & 7 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pilosa

import (
"context"
"crypto/sha1"
"fmt"
"io"
Expand Down Expand Up @@ -232,20 +233,21 @@ func (d *Driver) savePartition(
for colID = offset; err == nil; colID++ {
// commit each batch of objects (pilosa and boltdb)
if colID%sql.IndexBatchSize == 0 && colID != 0 {
d.saveBatch(ctx, idx.mapping, colID)
if err = d.saveBatch(ctx, idx.mapping, colID); err != nil {
return 0, err
}
}

select {
case <-ctx.Done():
return 0, ctx.Err()
case <-ctx.Context.Done():
return 0, ctx.Context.Err()

default:
var (
values []interface{}
location []byte
)
values, location, err = kviter.Next()
if err != nil {
if values, location, err = kviter.Next(); err != nil {
break
}

Expand Down Expand Up @@ -291,7 +293,9 @@ func (d *Driver) Save(
if !ok {
return errInvalidIndexType.New(i)
}

idx.wg.Add(1)
defer idx.wg.Done()
ctx.Context, idx.cancel = context.WithCancel(ctx.Context)
processingFile := d.processingFilePath(idx.Database(), idx.Table(), idx.ID())
if err = index.CreateProcessingFile(processingFile); err != nil {
return err
Expand Down Expand Up @@ -337,7 +341,16 @@ func (d *Driver) Save(
}

// Delete the given index for all partitions in the iterator.
func (d *Driver) Delete(idx sql.Index, partitions sql.PartitionIter) error {
func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
idx, ok := i.(*pilosaIndex)
if !ok {
return errInvalidIndexType.New(i)
}
if idx.cancel != nil {
idx.cancel()
idx.wg.Wait()
}

if err := os.RemoveAll(filepath.Join(d.root, idx.Database(), idx.Table(), idx.ID())); err != nil {
return err
}
Expand Down
42 changes: 42 additions & 0 deletions sql/index/pilosa/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func withoutMapping(a sql.Index) sql.Index {
if i, ok := a.(*pilosaIndex); ok {
b := *i
b.mapping = nil
b.cancel = nil
return &b
}
return a
Expand Down Expand Up @@ -261,6 +262,41 @@ func TestDelete(t *testing.T) {
require.NoError(err)
}

func TestDeleteInProgress(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"

expressions := []sql.Expression{
expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true),
expression.NewGetFieldWithTable(1, sql.Int64, table, "hash", true),
}

d := NewIndexDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 1024,
expressions: sqlIdx.Expressions(),
location: slowRandLocation,
}

go func() {
if e := d.Save(sql.NewEmptyContext(), sqlIdx, it); e != nil {
t.Log(e)
}
}()

time.Sleep(time.Second)
err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)
}

func TestLoadAllDirectoryDoesNotExist(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -873,6 +909,12 @@ func randLocation(partition sql.Partition, offset int) string {
return string(partition.Key()) + "-" + string(b)
}

func slowRandLocation(partition sql.Partition, offset int) string {
defer time.Sleep(200 * time.Millisecond)

return randLocation(partition, offset)
}

func offsetLocation(partition sql.Partition, offset int) string {
return string(partition.Key()) + "-" + fmt.Sprint(offset)
}
Expand Down
5 changes: 5 additions & 0 deletions sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pilosa

import (
"context"
"sync"

errors "gopkg.in/src-d/go-errors.v1"

pilosa "github.com/pilosa/go-pilosa"
Expand All @@ -12,6 +15,8 @@ import (
type pilosaIndex struct {
client *pilosa.Client
mapping *mapping
cancel context.CancelFunc
wg sync.WaitGroup

db string
table string
Expand Down
31 changes: 20 additions & 11 deletions sql/index/pilosalib/driver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pilosalib

import (
"context"
"crypto/sha1"
"fmt"
"io"
Expand Down Expand Up @@ -272,20 +273,21 @@ func (d *Driver) savePartition(
for colID = offset; err == nil; colID++ {
// commit each batch of objects (pilosa and boltdb)
if colID%sql.IndexBatchSize == 0 && colID != 0 {
d.saveBatch(ctx, idx.mapping, colID)
if err = d.saveBatch(ctx, idx.mapping, colID); err != nil {
return 0, err
}
}

select {
case <-ctx.Done():
return 0, ctx.Err()
case <-ctx.Context.Done():
return 0, ctx.Context.Err()

default:
var (
values []interface{}
location []byte
)
values, location, err = kviter.Next()
if err != nil {
if values, location, err = kviter.Next(); err != nil {
break
}

Expand Down Expand Up @@ -332,6 +334,9 @@ func (d *Driver) Save(
return errInvalidIndexType.New(i)
}

idx.wg.Add(1)
defer idx.wg.Done()
ctx.Context, idx.cancel = context.WithCancel(ctx.Context)
processingFile := d.processingFilePath(i.Database(), i.Table(), i.ID())
if err := index.WriteProcessingFile(
processingFile,
Expand Down Expand Up @@ -377,14 +382,18 @@ func (d *Driver) Save(

// Delete the given index for all partitions in the iterator.
func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil {
return err
}

idx, ok := i.(*pilosaIndex)
if !ok {
return errInvalidIndexType.New(i)
}
if idx.cancel != nil {
idx.cancel()
idx.wg.Wait()
}

if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil {
return err
}

err := idx.index.Open()
if err != nil {
Expand Down Expand Up @@ -435,8 +444,8 @@ func (d *Driver) savePilosa(ctx *sql.Context, colID uint64) error {

start := time.Now()

for i, frm := range d.fields {
err := frm.Import(d.bitBatches[i].rows, d.bitBatches[i].cols, nil)
for i, fld := range d.fields {
err := fld.Import(d.bitBatches[i].rows, d.bitBatches[i].cols, nil)
if err != nil {
span.LogKV("error", err)
return err
Expand Down
42 changes: 42 additions & 0 deletions sql/index/pilosalib/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/pilosa/pilosa"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -236,6 +237,41 @@ func TestDelete(t *testing.T) {
require.NoError(err)
}

func TestDeleteInProgress(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"

expressions := []sql.Expression{
expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true),
expression.NewGetFieldWithTable(1, sql.Int64, table, "hash", true),
}

d := NewDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 1024,
expressions: sqlIdx.Expressions(),
location: slowRandLocation,
}

go func() {
if e := d.Save(sql.NewEmptyContext(), sqlIdx, it); e != nil {
t.Log(e)
}
}()

time.Sleep(time.Second)
err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)
}

func TestLoadAllDirectoryDoesNotExist(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -899,6 +935,12 @@ func randLocation(partition sql.Partition, offset int) string {
return string(partition.Key()) + "-" + string(b)
}

func slowRandLocation(partition sql.Partition, offset int) string {
defer time.Sleep(200 * time.Millisecond)

return randLocation(partition, offset)
}

func offsetLocation(partition sql.Partition, offset int) string {
return string(partition.Key()) + "-" + fmt.Sprint(offset)
}
Expand Down
5 changes: 5 additions & 0 deletions sql/index/pilosalib/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pilosalib

import (
"context"
"sync"

"github.com/pilosa/pilosa"
errors "gopkg.in/src-d/go-errors.v1"
"gopkg.in/src-d/go-mysql-server.v0/sql"
Expand All @@ -15,6 +18,8 @@ var (
type pilosaIndex struct {
index *pilosa.Index
mapping *mapping
cancel context.CancelFunc
wg sync.WaitGroup

db string
table string
Expand Down
2 changes: 1 addition & 1 deletion sql/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestIndexesByTable(t *testing.T) {
r.statuses[indexKey{"oof", "rab_idx_1"}] = IndexReady

indexes := r.IndexesByTable("foo", "bar")
require.Len(indexes, 2)
require.Len(indexes, 3)

for i, idx := range indexes {
expected := r.indexes[r.indexOrder[i]]
Expand Down
2 changes: 1 addition & 1 deletion sql/plan/drop_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (d *DropIndex) RowIter(ctx *sql.Context) (sql.RowIter, error) {
}
d.Catalog.ReleaseIndex(index)

done, err := d.Catalog.DeleteIndex(db.Name(), d.Name, false)
done, err := d.Catalog.DeleteIndex(db.Name(), d.Name, true)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 56886a2

Please sign in to comment.