From 3d2c375625f3ee8244888543e92b9b15af724ef4 Mon Sep 17 00:00:00 2001 From: kuba-- Date: Fri, 19 Oct 2018 00:26:10 +0200 Subject: [PATCH] No holder. Signed-off-by: kuba-- --- sql/index/pilosa/driver.go | 46 ++++--- sql/index/pilosa/driver_test.go | 221 ++++++++++++++++++++++++++++++++ sql/index/pilosa/lookup.go | 10 +- 3 files changed, 258 insertions(+), 19 deletions(-) diff --git a/sql/index/pilosa/driver.go b/sql/index/pilosa/driver.go index 179b9a329..9e91a11a4 100644 --- a/sql/index/pilosa/driver.go +++ b/sql/index/pilosa/driver.go @@ -69,19 +69,15 @@ type ( // Driver implements sql.IndexDriver interface. Driver struct { - root string - holder *pilosa.Holder + root string } ) // NewDriver returns a new instance of pilosa.Driver // which satisfies sql.IndexDriver interface func NewDriver(root string) *Driver { - h := pilosa.NewHolder() - h.Path = filepath.Join(root, "."+DriverID) return &Driver{ - root: root, - holder: h, + root: root, } } @@ -116,16 +112,12 @@ func (d *Driver) Create( return nil, err } - idx, err := d.holder.CreateIndexIfNotExists( - indexName(db, table), - pilosa.IndexOptions{}, - ) + idx, err := d.newPilosaIndex(db, table) if err != nil { return nil, err } mapping := newMapping(d.mappingFilePath(db, table, id)) - processingFile := d.processingFilePath(db, table, id) if err := index.WriteProcessingFile( processingFile, @@ -173,11 +165,14 @@ func (d *Driver) LoadAll(db, table string) ([]sql.Index, error) { } func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) { - name := indexName(db, table) - idx := d.holder.Index(name) - if idx == nil { - return nil, errLoadingIndex.New(name) + idx, err := d.newPilosaIndex(db, table) + if err != nil { + return nil, err } + if err := idx.Open(); err != nil { + return nil, err + } + defer idx.Close() dir := filepath.Join(d.root, db, table, id) config := d.configFilePath(db, table, id) @@ -328,6 +323,12 @@ func (d *Driver) Save( if !ok { return errInvalidIndexType.New(i) } + + if err := idx.index.Open(); err != nil { + return err + } + defer idx.index.Close() + idx.wg.Add(1) defer idx.wg.Done() @@ -386,6 +387,11 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error { idx.wg.Wait() } + if err := idx.index.Open(); err != nil { + return err + } + defer idx.index.Close() + if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil { return err } @@ -534,3 +540,13 @@ func (d *Driver) processingFilePath(db, table, id string) string { func (d *Driver) mappingFilePath(db, table, id string) string { return filepath.Join(d.root, db, table, id, MappingFileName) } + +func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) { + name := indexName(db, table) + path := filepath.Join(d.root, "."+DriverID, name) + idx, err := pilosa.NewIndex(path, name) + if err != nil { + return nil, err + } + return idx, nil +} diff --git a/sql/index/pilosa/driver_test.go b/sql/index/pilosa/driver_test.go index 698251b8b..80a716fa7 100644 --- a/sql/index/pilosa/driver_test.go +++ b/sql/index/pilosa/driver_test.go @@ -99,6 +99,66 @@ func TestLoadAll(t *testing.T) { require.NoError(err) } +func TestLoadAllWithMultipleDrivers(t *testing.T) { + require := require.New(t) + setup(t) + defer cleanup(t) + + d1 := NewDriver(tmpDir) + idx1, err := d1.Create("db", "table", "id1", makeExpressions("table", "hash1"), nil) + require.NoError(err) + it1 := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 64, + expressions: idx1.Expressions(), + location: randLocation, + } + require.NoError(d1.Save(sql.NewEmptyContext(), idx1, it1)) + + d2 := NewDriver(tmpDir) + idx2, err := d2.Create("db", "table", "id2", makeExpressions("table", "hash1"), nil) + require.NoError(err) + it2 := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 64, + expressions: idx2.Expressions(), + location: randLocation, + } + require.NoError(d2.Save(sql.NewEmptyContext(), idx2, it2)) + + d := NewDriver(tmpDir) + indexes, err := d.LoadAll("db", "table") + require.NoError(err) + + require.Equal(2, len(indexes)) + i1, ok := idx1.(*pilosaIndex) + require.True(ok) + i2, ok := idx2.(*pilosaIndex) + require.True(ok) + + require.Equal(i1.index.Name(), i2.index.Name()) + + // Load index from another table. Previously this panicked as the same + // pilosa.Holder was used for all indexes. + + d3 := NewDriver(tmpDir) + idx3, err := d3.Create("db", "table2", "id1", makeExpressions("table2", "hash1"), nil) + require.NoError(err) + it3 := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 64, + expressions: idx3.Expressions(), + location: randLocation, + } + require.NoError(d3.Save(sql.NewEmptyContext(), idx3, it3)) + + indexes, err = d.LoadAll("db", "table2") + require.NoError(err) +} + type logLoc struct { loc []byte err error @@ -214,6 +274,39 @@ func TestSaveAndGetAll(t *testing.T) { require.True(errInvalidKeys.Is(err)) } +func TestSaveAndGetAllWithMultipleDrivers(t *testing.T) { + require := require.New(t) + setup(t) + defer cleanup(t) + + db, table, id := "db_name", "table_name", "index_id" + expressions := makeExpressions(table, "lang", "hash") + + d1 := NewDriver(tmpDir) + sqlIdx, err := d1.Create(db, table, id, expressions, nil) + require.NoError(err) + + it := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 64, + expressions: sqlIdx.Expressions(), + location: randLocation, + } + + err = d1.Save(sql.NewEmptyContext(), sqlIdx, it) + require.NoError(err) + + d2 := NewDriver(tmpDir) + indexes, err := d2.LoadAll(db, table) + require.NoError(err) + require.Equal(1, len(indexes)) + + _, err = sqlIdx.Get() + require.Error(err) + require.True(errInvalidKeys.Is(err)) +} + func TestLoadCorruptedIndex(t *testing.T) { require := require.New(t) setup(t) @@ -254,6 +347,58 @@ func TestDelete(t *testing.T) { require.NoError(err) } +func TestDeleteWithMultipleDrivers(t *testing.T) { + require := require.New(t) + setup(t) + defer cleanup(t) + + db, table, id := "db_name", "table_name", "index_id" + + expressions := []sql.Expression{ + expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true), + expression.NewGetFieldWithTable(1, sql.Int64, table, "field", true), + } + + d := NewDriver(tmpDir) + sqlIdx, err := d.Create(db, table, id, expressions, nil) + require.NoError(err) + + d = NewDriver(tmpDir) + err = d.Delete(sqlIdx, new(partitionIter)) + require.NoError(err) +} + +func TestDeleteAndLoadAll(t *testing.T) { + require := require.New(t) + setup(t) + defer cleanup(t) + + db, table, id := "db_name", "table_name", "index_id" + expressions := makeExpressions(table, "lang", "hash") + + d := NewDriver(tmpDir) + sqlIdx, err := d.Create(db, table, id, expressions, nil) + require.NoError(err) + + it := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 64, + expressions: sqlIdx.Expressions(), + location: randLocation, + } + + err = d.Save(sql.NewEmptyContext(), sqlIdx, it) + require.NoError(err) + + err = d.Delete(sqlIdx, new(partitionIter)) + require.NoError(err) + + indexes, err := d.LoadAll(db, table) + require.NoError(err) + require.Equal(0, len(indexes)) +} + func TestDeleteInProgress(t *testing.T) { require := require.New(t) setup(t) @@ -449,6 +594,82 @@ func TestIntersection(t *testing.T) { require.NoError(interIt.Close()) } +func TestIntersectionWithMultipleDrivers(t *testing.T) { + ctx := sql.NewContext(context.Background()) + require := require.New(t) + setup(t) + defer cleanup(t) + + db, table := "db_name", "table_name" + idxLang, expLang := "idx_lang", makeExpressions(table, "lang") + idxPath, expPath := "idx_path", makeExpressions(table, "path") + + d1 := NewDriver(tmpDir) + sqlIdxLang, err := d1.Create(db, table, idxLang, expLang, nil) + require.NoError(err) + + d2 := NewDriver(tmpDir) + sqlIdxPath, err := d2.Create(db, table, idxPath, expPath, nil) + require.NoError(err) + + itLang := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 10, + expressions: sqlIdxLang.Expressions(), + location: offsetLocation, + } + + itPath := &partitionKeyValueIter{ + partitions: 2, + offset: 0, + total: 10, + expressions: sqlIdxPath.Expressions(), + location: offsetLocation, + } + + err = d1.Save(ctx, sqlIdxLang, itLang) + require.NoError(err) + + err = d2.Save(ctx, sqlIdxPath, itPath) + require.NoError(err) + + lookupLang, err := sqlIdxLang.Get(itLang.records[0][0].values...) + require.NoError(err) + lookupPath, err := sqlIdxPath.Get(itPath.records[0][itPath.total-1].values...) + require.NoError(err) + + m, ok := lookupLang.(sql.Mergeable) + require.True(ok) + require.True(m.IsMergeable(lookupPath)) + + interLookup, ok := lookupLang.(sql.SetOperations) + require.True(ok) + interIt, err := interLookup.Intersection(lookupPath).Values(testPartition(0)) + require.NoError(err) + loc, err := interIt.Next() + + require.True(err == io.EOF) + require.NoError(interIt.Close()) + + lookupLang, err = sqlIdxLang.Get(itLang.records[0][0].values...) + require.NoError(err) + lookupPath, err = sqlIdxPath.Get(itPath.records[0][0].values...) + require.NoError(err) + + interLookup, ok = lookupPath.(sql.SetOperations) + require.True(ok) + interIt, err = interLookup.Intersection(lookupLang).Values(testPartition(0)) + require.NoError(err) + loc, err = interIt.Next() + require.NoError(err) + require.Equal(loc, itPath.records[0][0].location) + _, err = interIt.Next() + require.True(err == io.EOF) + + require.NoError(interIt.Close()) +} + func TestUnion(t *testing.T) { require := require.New(t) setup(t) diff --git a/sql/index/pilosa/lookup.go b/sql/index/pilosa/lookup.go index 8145ee9d5..1e0e8e82f 100644 --- a/sql/index/pilosa/lookup.go +++ b/sql/index/pilosa/lookup.go @@ -84,9 +84,11 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { if err := l.mapping.open(); err != nil { return nil, err } - defer l.mapping.close() + if err := l.index.Open(); err != nil { + return nil, err + } var row *pilosa.Row for i, expr := range l.expressions { field := l.index.Field(fieldName(l.id, expr, p)) @@ -105,6 +107,9 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { row = intersect(row, r) } + if err := l.index.Close(); err != nil { + return nil, err + } // evaluate composition of operations for _, op := range l.operations { @@ -131,9 +136,6 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { // Values implements sql.IndexLookup.Values func (l *indexLookup) Values(p sql.Partition) (sql.IndexValueIter, error) { - l.index.Open() - defer l.index.Close() - row, err := l.values(p) if err != nil { return nil, err