Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a glob resolver for listing and partitioning files in an object store #5305

Merged
merged 16 commits into from
Jul 23, 2024
Merged
9 changes: 7 additions & 2 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,17 @@ func (a *connectorAnalyzer) analyzeModel(ctx context.Context, r *Resource) {
for _, connector := range otherConnectors {
a.trackConnector(connector, r, false)
}

// Track the incremental state connector
if spec.IncrementalStateResolver != "" && spec.IncrementalStateResolverProperties != nil {
a.analyzeResourceWithResolver(r, spec.IncrementalStateResolver, spec.IncrementalStateResolverProperties)
}
}

// analyzeResourceWithResolver extracts connector metadata for a resource that uses a resolver.
func (a *connectorAnalyzer) analyzeResourceWithResolver(r *Resource, resolver string, resolverProps *structpb.Struct) {
// The "sql" resolver takes an optional "connector" property
if resolver == "sql" {
// The "sql" and "glob" resolvers take an optional "connector" property
if resolver == "sql" || resolver == "glob" {
for k, v := range resolverProps.Fields {
if k == "connector" {
connector := v.GetStringValue()
Expand Down
21 changes: 21 additions & 0 deletions runtime/compilers/rillv1/parse_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/rilldata/rill/runtime/pkg/openapiutil"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/yaml.v3"
)

// APIYAML is the raw structure of a API resource defined in YAML (does not include common fields)
Expand Down Expand Up @@ -99,6 +100,7 @@ type DataYAML struct {
MetricsSQL string `yaml:"metrics_sql"`
API string `yaml:"api"`
Args map[string]any `yaml:"args"`
Glob yaml.Node `yaml:"glob"` // Path (string) or properties (map[string]any)
}

// parseDataYAML parses a data resolver and its properties from a DataYAML.
Expand Down Expand Up @@ -136,6 +138,25 @@ func (p *Parser) parseDataYAML(raw *DataYAML) (string, *structpb.Struct, []Resou
}
}

// Handle glob resolver
if !raw.Glob.IsZero() {
var props map[string]any
switch raw.Glob.Kind {
case yaml.ScalarNode:
props = map[string]any{"path": raw.Glob.Value}
default:
props = make(map[string]any)
err := raw.Glob.Decode(props)
if err != nil {
return "", nil, nil, fmt.Errorf("failed to parse glob properties: %w", err)
}
}

count++
resolver = "glob"
resolverProps = props
}

// Validate there was exactly one resolver
if count == 0 {
return "", nil, nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`)
Expand Down
29 changes: 29 additions & 0 deletions runtime/compilers/rillv1/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,35 @@ func ResolveTemplate(tmpl string, data TemplateData) (string, error) {
return res.String(), nil
}

// ResolveTemplateRecursively recursively traverses the provided value and applies ResolveTemplate to any string it encounters.
// It may overwrite the provided value in-place.
func ResolveTemplateRecursively(val any, data TemplateData) (any, error) {
switch val := val.(type) {
case string:
return ResolveTemplate(val, data)
case map[string]any:
for k, v := range val {
v, err := ResolveTemplateRecursively(v, data)
if err != nil {
return nil, err
}
val[k] = v
}
return val, nil
case []any:
for i, v := range val {
v, err := ResolveTemplateRecursively(v, data)
if err != nil {
return nil, err
}
val[i] = v
}
return val, nil
default:
return val, nil
}
}

// newFuncMap creates a base func map for templates.
func newFuncMap(environment string, state map[string]any) template.FuncMap {
// Add Sprig template functions (removing functions that leak host info)
Expand Down
277 changes: 1 addition & 276 deletions runtime/drivers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,11 @@ import (
"context"
"errors"
"fmt"
"math"
"net/http"
"net/url"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/bmatcuk/doublestar/v4"
"github.com/c2h5oh/datasize"

"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
rillblob "github.com/rilldata/rill/runtime/drivers/blob"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/globutil"
"go.uber.org/zap"
"gocloud.dev/blob"
"gocloud.dev/blob/azureblob"
"gocloud.dev/gcerrors"
)

func init() {
Expand Down Expand Up @@ -237,263 +222,3 @@ func (c *Connection) AsSQLStore() (drivers.SQLStore, bool) {
func (c *Connection) AsNotifier(properties map[string]any) (drivers.Notifier, error) {
return nil, drivers.ErrNotNotifier
}

// DownloadFiles returns a file iterator over objects stored in azure blob storage.
func (c *Connection) DownloadFiles(ctx context.Context, props map[string]any) (drivers.FileIterator, error) {
conf, err := parseSourceProperties(props)
if err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}

client, err := c.getClient(conf)
if err != nil {
return nil, err
}

// Create a *blob.Bucket.
bucketObj, err := azureblob.OpenBucket(ctx, client, nil)
if err != nil {
return nil, err
}
defer bucketObj.Close()

var batchSize datasize.ByteSize
if conf.BatchSize == "-1" {
batchSize = math.MaxInt64 // download everything in one batch
} else {
batchSize, err = datasize.ParseString(conf.BatchSize)
if err != nil {
return nil, err
}
}

// prepare fetch configs
opts := rillblob.Options{
GlobMaxTotalSize: conf.GlobMaxTotalSize,
GlobMaxObjectsMatched: conf.GlobMaxObjectsMatched,
GlobMaxObjectsListed: conf.GlobMaxObjectsListed,
GlobPageSize: conf.GlobPageSize,
GlobPattern: conf.url.Path,
ExtractPolicy: conf.extractPolicy,
BatchSizeBytes: int64(batchSize.Bytes()),
KeepFilesUntilClose: conf.BatchSize == "-1",
TempDir: c.config.TempDir,
}

iter, err := rillblob.NewIterator(ctx, bucketObj, opts, c.logger)
if err != nil {
// If the err is due to not using the anonymous client for a public container, we want to retry.
var respErr *azcore.ResponseError
if gcerrors.Code(err) == gcerrors.Unknown ||
(errors.As(err, &respErr) && respErr.RawResponse.StatusCode == http.StatusForbidden && (respErr.ErrorCode == "AuthorizationPermissionMismatch" || respErr.ErrorCode == "AuthenticationFailed")) {
c.logger.Warn("Azure Blob Storage account does not have permission to list blobs. Falling back to anonymous access.")

client, err = c.createAnonymousClient(conf)
if err != nil {
return nil, err
}

bucketObj, err = azureblob.OpenBucket(ctx, client, nil)
if err != nil {
return nil, err
}

iter, err = rillblob.NewIterator(ctx, bucketObj, opts, c.logger)
}

// If there's still an err, return it
if err != nil {
respErr = nil
if errors.As(err, &respErr) && respErr.StatusCode == http.StatusForbidden {
return nil, drivers.NewPermissionDeniedError(fmt.Sprintf("failed to create iterator: %v", respErr))
}
return nil, err
}
}

return iter, nil
}

type sourceProperties struct {
Path string `mapstructure:"path"`
Account string `mapstructure:"account"`
URI string `mapstructure:"uri"`
Extract map[string]any `mapstructure:"extract"`
GlobMaxTotalSize int64 `mapstructure:"glob.max_total_size"`
GlobMaxObjectsMatched int `mapstructure:"glob.max_objects_matched"`
GlobMaxObjectsListed int64 `mapstructure:"glob.max_objects_listed"`
GlobPageSize int `mapstructure:"glob.page_size"`
BatchSize string `mapstructure:"batch_size"`
url *globutil.URL
extractPolicy *rillblob.ExtractPolicy
}

func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
conf := &sourceProperties{}
err := mapstructure.WeakDecode(props, conf)
if err != nil {
return nil, err
}
if !doublestar.ValidatePattern(conf.Path) {
return nil, fmt.Errorf("glob pattern %q is invalid", conf.Path)
}
bucketURL, err := globutil.ParseBucketURL(conf.Path)
if err != nil {
return nil, fmt.Errorf("failed to parse path %q: %w", conf.Path, err)
}
if bucketURL.Scheme != "azure" {
return nil, fmt.Errorf("invalid scheme %q in path %q", bucketURL.Scheme, conf.Path)
}

conf.url = bucketURL
return conf, nil
}

// getClient returns a new azure blob client.
func (c *Connection) getClient(conf *sourceProperties) (*container.Client, error) {
var accountKey, sasToken, connectionString string

accountName, err := c.getAccountName(conf)
if err != nil {
return nil, err
}

if c.config.AllowHostAccess {
accountKey = os.Getenv("AZURE_STORAGE_KEY")
sasToken = os.Getenv("AZURE_STORAGE_SAS_TOKEN")
connectionString = os.Getenv("AZURE_STORAGE_CONNECTION_STRING")
}

if c.config.Key != "" {
accountKey = c.config.Key
}
if c.config.SASToken != "" {
sasToken = c.config.SASToken
}
if c.config.ConnectionString != "" {
connectionString = c.config.ConnectionString
}

if connectionString != "" {
client, err := container.NewClientFromConnectionString(connectionString, conf.url.Host, nil)
if err != nil {
return nil, fmt.Errorf("failed container.NewClientFromConnectionString: %w", err)
}
return client, nil
}

if accountName != "" {
svcURL := fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
containerURL, err := url.JoinPath(svcURL, conf.url.Host)
if err != nil {
return nil, err
}

var sharedKeyCred *azblob.SharedKeyCredential

if accountKey != "" {
sharedKeyCred, err = azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, fmt.Errorf("failed azblob.NewSharedKeyCredential: %w", err)
}

client, err := container.NewClientWithSharedKeyCredential(containerURL, sharedKeyCred, nil)
if err != nil {
return nil, fmt.Errorf("failed container.NewClientWithSharedKeyCredential: %w", err)
}
return client, nil
}

if sasToken != "" {
serviceURL, err := azureblob.NewServiceURL(&azureblob.ServiceURLOptions{
AccountName: accountName,
SASToken: sasToken,
})
if err != nil {
return nil, err
}

containerURL, err := url.JoinPath(string(serviceURL), conf.url.Host)
if err != nil {
return nil, err
}

client, err := container.NewClientWithNoCredential(containerURL, nil)
if err != nil {
return nil, fmt.Errorf("failed container.NewClientWithNoCredential: %w", err)
}
return client, nil
}

cred, err := azidentity.NewDefaultAzureCredential(&azidentity.DefaultAzureCredentialOptions{
DisableInstanceDiscovery: true,
})
if err != nil {
return nil, fmt.Errorf("failed azidentity.NewDefaultAzureCredential: %w", err)
}
client, err := container.NewClient(containerURL, cred, nil)
if err != nil {
return nil, fmt.Errorf("failed container.NewClient: %w", err)
}
return client, nil
}

return nil, drivers.NewPermissionDeniedError("can't access remote host without credentials: no credentials provided")
}

// Create anonymous azure blob client.
func (c *Connection) createAnonymousClient(conf *sourceProperties) (*container.Client, error) {
accountName, err := c.getAccountName(conf)
if err != nil {
return nil, err
}

svcURL := fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
containerURL, err := url.JoinPath(svcURL, conf.url.Host)
if err != nil {
return nil, err
}
client, err := container.NewClientWithNoCredential(containerURL, nil)
if err != nil {
return nil, fmt.Errorf("failed container.NewClientWithNoCredential: %w", err)
}

return client, nil
}

func (c *Connection) openBucketWithNoCredentials(ctx context.Context, conf *sourceProperties) (*blob.Bucket, error) {
// Create containerURL object.
accountName, err := c.getAccountName(conf)
if err != nil {
return nil, err
}
containerURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, conf.url.Host)
client, err := container.NewClientWithNoCredential(containerURL, nil)
if err != nil {
return nil, err
}

// Create a *blob.Bucket.
bucketObj, err := azureblob.OpenBucket(ctx, client, nil)
if err != nil {
return nil, err
}

return bucketObj, nil
}

func (c *Connection) getAccountName(conf *sourceProperties) (string, error) {
if conf.Account != "" {
return conf.Account, nil
}

if c.config.Account != "" {
return c.config.Account, nil
}

if c.config.AllowHostAccess {
return os.Getenv("AZURE_STORAGE_ACCOUNT"), nil
}

return "", errors.New("account name not found")
}
Loading
Loading