Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

No holder! #478

Merged
merged 1 commit into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,15 @@ type (

// Driver implements sql.IndexDriver interface.
Driver struct {
root string
holder *pilosa.Holder
root string
}
)

// NewDriver returns a new instance of pilosa.Driver
// which satisfies sql.IndexDriver interface
func NewDriver(root string) *Driver {
h := pilosa.NewHolder()
h.Path = filepath.Join(root, "."+DriverID)
return &Driver{
root: root,
holder: h,
root: root,
}
}

Expand Down Expand Up @@ -116,16 +112,12 @@ func (d *Driver) Create(
return nil, err
}

idx, err := d.holder.CreateIndexIfNotExists(
indexName(db, table),
pilosa.IndexOptions{},
)
idx, err := d.newPilosaIndex(db, table)
if err != nil {
return nil, err
}

mapping := newMapping(d.mappingFilePath(db, table, id))

processingFile := d.processingFilePath(db, table, id)
if err := index.WriteProcessingFile(
processingFile,
Expand Down Expand Up @@ -173,11 +165,14 @@ func (d *Driver) LoadAll(db, table string) ([]sql.Index, error) {
}

func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
name := indexName(db, table)
idx := d.holder.Index(name)
if idx == nil {
return nil, errLoadingIndex.New(name)
idx, err := d.newPilosaIndex(db, table)
if err != nil {
return nil, err
}
if err := idx.Open(); err != nil {
return nil, err
}
defer idx.Close()

dir := filepath.Join(d.root, db, table, id)
config := d.configFilePath(db, table, id)
Expand Down Expand Up @@ -328,6 +323,12 @@ func (d *Driver) Save(
if !ok {
return errInvalidIndexType.New(i)
}

if err := idx.index.Open(); err != nil {
return err
}
defer idx.index.Close()

idx.wg.Add(1)
defer idx.wg.Done()

Expand Down Expand Up @@ -386,6 +387,11 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
idx.wg.Wait()
}

if err := idx.index.Open(); err != nil {
return err
}
defer idx.index.Close()

if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil {
return err
}
Expand Down Expand Up @@ -534,3 +540,13 @@ func (d *Driver) processingFilePath(db, table, id string) string {
func (d *Driver) mappingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, MappingFileName)
}

func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
name := indexName(db, table)
path := filepath.Join(d.root, "."+DriverID, name)
idx, err := pilosa.NewIndex(path, name)
if err != nil {
return nil, err
}
return idx, nil
}
221 changes: 221 additions & 0 deletions sql/index/pilosa/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,66 @@ func TestLoadAll(t *testing.T) {
require.NoError(err)
}

func TestLoadAllWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

d1 := NewDriver(tmpDir)
idx1, err := d1.Create("db", "table", "id1", makeExpressions("table", "hash1"), nil)
require.NoError(err)
it1 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx1.Expressions(),
location: randLocation,
}
require.NoError(d1.Save(sql.NewEmptyContext(), idx1, it1))

d2 := NewDriver(tmpDir)
idx2, err := d2.Create("db", "table", "id2", makeExpressions("table", "hash1"), nil)
require.NoError(err)
it2 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx2.Expressions(),
location: randLocation,
}
require.NoError(d2.Save(sql.NewEmptyContext(), idx2, it2))

d := NewDriver(tmpDir)
indexes, err := d.LoadAll("db", "table")
require.NoError(err)

require.Equal(2, len(indexes))
i1, ok := idx1.(*pilosaIndex)
require.True(ok)
i2, ok := idx2.(*pilosaIndex)
require.True(ok)

require.Equal(i1.index.Name(), i2.index.Name())

// Load index from another table. Previously this panicked as the same
// pilosa.Holder was used for all indexes.

d3 := NewDriver(tmpDir)
idx3, err := d3.Create("db", "table2", "id1", makeExpressions("table2", "hash1"), nil)
require.NoError(err)
it3 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx3.Expressions(),
location: randLocation,
}
require.NoError(d3.Save(sql.NewEmptyContext(), idx3, it3))

indexes, err = d.LoadAll("db", "table2")
require.NoError(err)
}

type logLoc struct {
loc []byte
err error
Expand Down Expand Up @@ -214,6 +274,39 @@ func TestSaveAndGetAll(t *testing.T) {
require.True(errInvalidKeys.Is(err))
}

func TestSaveAndGetAllWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"
expressions := makeExpressions(table, "lang", "hash")

d1 := NewDriver(tmpDir)
sqlIdx, err := d1.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: sqlIdx.Expressions(),
location: randLocation,
}

err = d1.Save(sql.NewEmptyContext(), sqlIdx, it)
require.NoError(err)

d2 := NewDriver(tmpDir)
indexes, err := d2.LoadAll(db, table)
require.NoError(err)
require.Equal(1, len(indexes))

_, err = sqlIdx.Get()
require.Error(err)
require.True(errInvalidKeys.Is(err))
}

func TestLoadCorruptedIndex(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -254,6 +347,58 @@ func TestDelete(t *testing.T) {
require.NoError(err)
}

func TestDeleteWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"

expressions := []sql.Expression{
expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true),
expression.NewGetFieldWithTable(1, sql.Int64, table, "field", true),
}

d := NewDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

d = NewDriver(tmpDir)
err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)
}

func TestDeleteAndLoadAll(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"
expressions := makeExpressions(table, "lang", "hash")

d := NewDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: sqlIdx.Expressions(),
location: randLocation,
}

err = d.Save(sql.NewEmptyContext(), sqlIdx, it)
require.NoError(err)

err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)

indexes, err := d.LoadAll(db, table)
require.NoError(err)
require.Equal(0, len(indexes))
}

func TestDeleteInProgress(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -449,6 +594,82 @@ func TestIntersection(t *testing.T) {
require.NoError(interIt.Close())
}

func TestIntersectionWithMultipleDrivers(t *testing.T) {
ctx := sql.NewContext(context.Background())
require := require.New(t)
setup(t)
defer cleanup(t)

db, table := "db_name", "table_name"
idxLang, expLang := "idx_lang", makeExpressions(table, "lang")
idxPath, expPath := "idx_path", makeExpressions(table, "path")

d1 := NewDriver(tmpDir)
sqlIdxLang, err := d1.Create(db, table, idxLang, expLang, nil)
require.NoError(err)

d2 := NewDriver(tmpDir)
sqlIdxPath, err := d2.Create(db, table, idxPath, expPath, nil)
require.NoError(err)

itLang := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 10,
expressions: sqlIdxLang.Expressions(),
location: offsetLocation,
}

itPath := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 10,
expressions: sqlIdxPath.Expressions(),
location: offsetLocation,
}

err = d1.Save(ctx, sqlIdxLang, itLang)
require.NoError(err)

err = d2.Save(ctx, sqlIdxPath, itPath)
require.NoError(err)

lookupLang, err := sqlIdxLang.Get(itLang.records[0][0].values...)
require.NoError(err)
lookupPath, err := sqlIdxPath.Get(itPath.records[0][itPath.total-1].values...)
require.NoError(err)

m, ok := lookupLang.(sql.Mergeable)
require.True(ok)
require.True(m.IsMergeable(lookupPath))

interLookup, ok := lookupLang.(sql.SetOperations)
require.True(ok)
interIt, err := interLookup.Intersection(lookupPath).Values(testPartition(0))
require.NoError(err)
loc, err := interIt.Next()

require.True(err == io.EOF)
require.NoError(interIt.Close())

lookupLang, err = sqlIdxLang.Get(itLang.records[0][0].values...)
require.NoError(err)
lookupPath, err = sqlIdxPath.Get(itPath.records[0][0].values...)
require.NoError(err)

interLookup, ok = lookupPath.(sql.SetOperations)
require.True(ok)
interIt, err = interLookup.Intersection(lookupLang).Values(testPartition(0))
require.NoError(err)
loc, err = interIt.Next()
require.NoError(err)
require.Equal(loc, itPath.records[0][0].location)
_, err = interIt.Next()
require.True(err == io.EOF)

require.NoError(interIt.Close())
}

func TestUnion(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down
10 changes: 6 additions & 4 deletions sql/index/pilosa/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}

defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
Expand All @@ -105,6 +107,9 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {

row = intersect(row, r)
}
if err := l.index.Close(); err != nil {
return nil, err
}

// evaluate composition of operations
for _, op := range l.operations {
Expand All @@ -131,9 +136,6 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {

// Values implements sql.IndexLookup.Values
func (l *indexLookup) Values(p sql.Partition) (sql.IndexValueIter, error) {
l.index.Open()
defer l.index.Close()

row, err := l.values(p)
if err != nil {
return nil, err
Expand Down