Skip to content

Commit

Permalink
feat: add virtual DBRP mappings based on bucket name (#23606)
Browse files Browse the repository at this point in the history
* feat: add virtual DBRP mapping based on bucket name

* fix: improve error handling logic

* fix: update physical dbrp tests

* fix: update influxql tests with auto-mapped buckets

* fix: add virtual filtering and testing for virtual dbrps
  • Loading branch information
candrewlee14 authored Aug 3, 2022
1 parent f7b1905 commit adeac8b
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 6 deletions.
2 changes: 1 addition & 1 deletion dbrp/http_server_dbrp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func Test_handleDeleteDBRP(t *testing.T) {
BucketID: influxdbtesting.MustIDBase16("5555f7ed2a035555"),
OrganizationID: influxdbtesting.MustIDBase16("059af7ed2a034000"),
Database: "mydb",
RetentionPolicy: "autogen",
RetentionPolicy: "testrp",
Default: true,
}
if err := svc.Create(ctx, d); err != nil {
Expand Down
79 changes: 77 additions & 2 deletions dbrp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"bytes"
"context"
"encoding/json"
"strings"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -231,11 +232,28 @@ func (s *Service) FindByID(ctx context.Context, orgID, id platform.ID) (*influxd
}
return nil
}); err != nil {
// if not found, fallback to virtual DBRP search
if err == ErrDBRPNotFound {
b, err := s.bucketSvc.FindBucketByID(ctx, id)
if err != nil || b == nil {
return nil, ErrDBRPNotFound
}
return bucketToMapping(b), nil
}
return nil, err
}
return m, nil
}

// parseDBRP parses DB and RP strings out of a bucket name
func parseDBRP(bucketName string) (string, string) {
db, rp, isCut := strings.Cut(bucketName, "/")
if isCut {
return db, rp
}
return bucketName, "autogen"
}

// FindMany returns a list of mappings that match filter and the total count of matching dbrp mappings.
// TODO(affo): find a smart way to apply FindOptions to a list of items.
func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilter, opts ...influxdb.FindOptions) ([]*influxdb.DBRPMapping, int, error) {
Expand Down Expand Up @@ -334,8 +352,65 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
}
return nil
})
if err != nil {
return ms, len(ms), err
}

buckets, count, err := s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
ID: filter.BucketID,
Name: filter.Database,
OrganizationID: filter.OrgID,
}, opts...)
if (err != nil || count == 0) && filter.Database != nil && filter.RetentionPolicy != nil {
// if the search couldn't find a corresponding dbrp, it could be that the bucket name has a slash (like db/rp)
// instead of just bucket name being the database with "autogen" retention policy
bucketName := *filter.Database + "/" + *filter.RetentionPolicy
buckets, _, err = s.bucketSvc.FindBuckets(ctx, influxdb.BucketFilter{
ID: filter.BucketID,
Name: &bucketName,
OrganizationID: filter.OrgID,
}, opts...)
if err != nil {
// we were unable to find any virtual mappings, so return what physical mappings we have
return ms, len(ms), nil
}
}
OUTER:
for _, bucket := range buckets {
// check if this virtual mapping has been overriden by a custom, physical mapping
for _, m := range ms {
if m.BucketID == bucket.ID {
continue OUTER
}
}
if bucket == nil {
continue
}
mapping := bucketToMapping(bucket)
if filterFunc(mapping, filter) {
ms = append(ms, mapping)
}
}

return ms, len(ms), err
return ms, len(ms), nil
}

func bucketToMapping(bucket *influxdb.Bucket) *influxdb.DBRPMapping {
if bucket == nil {
return nil
}
// for now, virtual DBRPs will use the same ID as their bucket to be able to find them by ID
dbrpID := bucket.ID
db, rp := parseDBRP(bucket.Name)
return &influxdb.DBRPMapping{
ID: dbrpID,
Default: false,
Database: db,
RetentionPolicy: rp,
OrganizationID: bucket.OrgID,
BucketID: bucket.ID,
Virtual: true,
}
}

// Create creates a new mapping.
Expand All @@ -354,7 +429,7 @@ func (s *Service) Create(ctx context.Context, dbrp *influxdb.DBRPMapping) error
}

// If a dbrp with this particular ID already exists an error is returned.
if _, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID); err == nil {
if d, err := s.FindByID(ctx, dbrp.OrganizationID, dbrp.ID); err == nil && !d.Virtual {
return ErrDBRPAlreadyExists("dbrp already exist for this particular ID. If you are trying an update use the right function .Update")
}
// If a dbrp with this orgID, db, and rp exists an error is returned.
Expand Down
3 changes: 3 additions & 0 deletions dbrp/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func initDBRPMappingService(f itesting.DBRPMappingFields, t *testing.T) (influxd
Name: fmt.Sprintf("bucket-%v", id),
}, nil
},
FindBucketsFn: func(ctx context.Context, bf influxdb.BucketFilter, fo ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
return []*influxdb.Bucket{}, 0, nil
},
}
}

Expand Down
2 changes: 2 additions & 0 deletions dbrp_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type DBRPMapping struct {

// Default indicates if this mapping is the default for the cluster and database.
Default bool `json:"default"`
// Virtual indicates if this is a virtual mapping (tied to bucket name) or physical
Virtual bool `json:"virtual"`

OrganizationID platform.ID `json:"orgID"`
BucketID platform.ID `json:"bucketID"`
Expand Down
2 changes: 1 addition & 1 deletion influxql/v1tests/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestServer_Query_ShowDatabases(t *testing.T) {
&Query{
name: "show databases does not return duplicates",
command: "SHOW DATABASES",
exp: `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["my-bucket"],["telegraf"]]}]}]}`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["my-bucket"],["telegraf"],["_monitoring"],["_tasks"],["db"]]}]}]}`,
},
)

Expand Down
Loading

0 comments on commit adeac8b

Please sign in to comment.