Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: Pass instanceID to drivers.Open for use in telemetry #4633

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/cmd/runtime/install_duckdb_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func InstallDuckDBExtensionsCmd(ch *cmdutil.Helper) *cobra.Command {
Use: "install-duckdb-extensions",
RunE: func(cmd *cobra.Command, args []string) error {
cfg := map[string]any{"dsn": ":memory:"} // In-memory
h, err := drivers.Open("duckdb", cfg, false, activity.NewNoopClient(), zap.NewNop())
h, err := drivers.Open("duckdb", "default", cfg, activity.NewNoopClient(), zap.NewNop())
if err != nil {
return fmt.Errorf("failed to open ephemeral duckdb: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cli/pkg/cmdutil/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// RepoForProjectPath creates an ad-hoc drivers.RepoStore for a local project file path
func RepoForProjectPath(path string) (drivers.RepoStore, string, error) {
instanceID := "default"
repoHandle, err := drivers.Open("file", map[string]any{"dsn": path}, false, activity.NewNoopClient(), zap.NewNop())
repoHandle, err := drivers.Open("file", instanceID, map[string]any{"dsn": path}, activity.NewNoopClient(), zap.NewNop())
if err != nil {
return nil, "", err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/compilers/rillv1/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func requireResourcesAndErrors(t testing.TB, p *Parser, wantResources []*Resourc

func makeRepo(t testing.TB, files map[string]string) drivers.RepoStore {
root := t.TempDir()
handle, err := drivers.Open("file", map[string]any{"dsn": root}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := drivers.Open("file", "default", map[string]any{"dsn": root}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

repo, ok := handle.AsRepoStore("")
Expand Down
11 changes: 5 additions & 6 deletions runtime/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ var (
)

type cachedConnectionConfig struct {
instanceID string
instanceID string // Empty if connection is shared
driver string
shared bool
config map[string]any
}

Expand Down Expand Up @@ -66,11 +65,11 @@ func (r *Runtime) newConnectionCache() conncache.Cache {
}

// getConnection returns a cached connection for the given driver configuration.
func (r *Runtime) getConnection(ctx context.Context, instanceID, driver string, config map[string]any, shared bool) (drivers.Handle, func(), error) {
// If instanceID is empty, the connection is considered shared (see drivers.Open for details).
func (r *Runtime) getConnection(ctx context.Context, instanceID, driver string, config map[string]any) (drivers.Handle, func(), error) {
cfg := cachedConnectionConfig{
instanceID: instanceID,
driver: driver,
shared: shared,
config: config,
}

Expand All @@ -94,7 +93,7 @@ func (r *Runtime) evictInstanceConnections(instanceID string) {
func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig) (drivers.Handle, error) {
logger := r.logger
activityClient := r.activity
if !cfg.shared {
if cfg.instanceID != "" { // Not shared across multiple instances
inst, err := r.Instance(ctx, cfg.instanceID)
if err != nil {
return nil, err
Expand All @@ -111,7 +110,7 @@ func (r *Runtime) openAndMigrate(ctx context.Context, cfg cachedConnectionConfig
}
}

handle, err := drivers.Open(cfg.driver, cfg.config, cfg.shared, activityClient, logger)
handle, err := drivers.Open(cfg.driver, cfg.instanceID, cfg.config, activityClient, logger)
if err == nil && ctx.Err() != nil {
err = fmt.Errorf("timed out while opening driver %q", cfg.driver)
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (dr
cfg[strings.ToLower(k)] = v
}
cfg["allow_host_access"] = r.opts.AllowHostAccess
return r.getConnection(ctx, "", c.Type, cfg, true)
return r.getConnection(ctx, "", c.Type, cfg)
}
}
return nil, nil, fmt.Errorf("connector %s doesn't exist", connector)
Expand All @@ -40,7 +40,7 @@ func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector strin
// So we take this moment to make sure the ctx gets checked for cancellation at least every once in a while.
return nil, nil, ctx.Err()
}
return r.getConnection(ctx, instanceID, cfg.Driver, cfg.Resolve(), false)
return r.getConnection(ctx, instanceID, cfg.Driver, cfg.Resolve())
}

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) {
Expand Down
8 changes: 4 additions & 4 deletions runtime/drivers/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ type configProperties struct {
TempDir string `mapstructure:"temp_dir"`
}

func (d driver) Open(cfgMap map[string]any, shared bool, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("admin driver can't be shared")
func (d driver) Open(instanceID string, config map[string]any, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("admin driver can't be shared")
}

cfg := &configProperties{}
err := mapstructure.WeakDecode(cfgMap, cfg)
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions runtime/drivers/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package athena

import (
"context"
"fmt"
"errors"

"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
Expand Down Expand Up @@ -76,10 +76,11 @@ type configProperties struct {
AllowHostAccess bool `mapstructure:"allow_host_access"`
}

func (d driver) Open(config map[string]any, shared bool, _ *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("athena driver can't be shared")
func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("athena driver can't be shared")
}

conf := &configProperties{}
err := mapstructure.WeakDecode(config, conf)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions runtime/drivers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ type configProperties struct {
AllowHostAccess bool `mapstructure:"allow_host_access"`
}

func (d driver) Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("azure driver does not support shared connections")
func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("azure driver can't be shared")
}

conf := &configProperties{}
err := mapstructure.WeakDecode(config, conf)
if err != nil {
Expand All @@ -115,12 +116,11 @@ func (d driver) HasAnonymousSourceAccess(ctx context.Context, props map[string]a
return false, fmt.Errorf("failed to parse config: %w", err)
}

c, err := d.Open(map[string]any{}, false, activity.NewNoopClient(), logger)
if err != nil {
return false, err
conn := &Connection{
config: &configProperties{},
logger: logger,
}

conn := c.(*Connection)
bucketObj, err := conn.openBucketWithNoCredentials(ctx, conf)
if err != nil {
return false, fmt.Errorf("failed to open container %q, %w", conf.url.Host, err)
Expand Down
8 changes: 5 additions & 3 deletions runtime/drivers/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bigquery

import (
"context"
"errors"
"fmt"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -67,10 +68,11 @@ type configProperties struct {
AllowHostAccess bool `mapstructure:"allow_host_access"`
}

func (d driver) Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("bigquery driver can't be shared")
func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("bigquery driver can't be shared")
}

conf := &configProperties{}
err := mapstructure.WeakDecode(config, conf)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clickhouse

import (
"context"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -54,13 +55,13 @@ type configProperties struct {

// Open connects to Clickhouse using std API.
// Connection string format : https://github.com/ClickHouse/clickhouse-go?tab=readme-ov-file#dsn
func (d driver) Open(confMap map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("clickhouse driver can't be shared")
func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("clickhouse driver can't be shared")
}

conf := &configProperties{}
err := mapstructure.WeakDecode(confMap, conf)
err := mapstructure.WeakDecode(config, conf)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/clickhouse/information_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestInformationSchema(t *testing.T) {
port, err := clickHouseContainer.MappedPort(ctx, "9000/tcp")
require.NoError(t, err)

conn, err := driver{}.Open(map[string]any{"dsn": fmt.Sprintf("clickhouse://clickhouse:clickhouse@%v:%v", host, port.Port())}, false, activity.NewNoopClient(), zap.NewNop())
conn, err := driver{}.Open("default", map[string]any{"dsn": fmt.Sprintf("clickhouse://clickhouse:clickhouse@%v:%v", host, port.Port())}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
prepareConn(t, conn)
t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, conn) })
Expand Down
11 changes: 7 additions & 4 deletions runtime/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ func Register(name string, driver Driver) {
Drivers[name] = driver
}

// Open opens a new connection
func Open(driver string, config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (Handle, error) {
// Open opens a new connection.
// If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs.
// If instanceID is not empty, the connection is considered instance-specific and its As...() functions will only be invoked with the same instance ID.
func Open(driver, instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (Handle, error) {
d, ok := Drivers[driver]
if !ok {
return nil, fmt.Errorf("unknown driver: %s", driver)
}

conn, err := d.Open(config, shared, client, logger)
conn, err := d.Open(instanceID, config, client, logger)
if err != nil {
return nil, err
}
Expand All @@ -53,7 +55,8 @@ type Driver interface {
Spec() Spec

// Open opens a new handle.
Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (Handle, error)
// If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs.
Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (Handle, error)

// HasAnonymousSourceAccess returns true if the driver can access the data identified by srcProps without any additional configuration.
HasAnonymousSourceAccess(ctx context.Context, srcProps map[string]any, logger *zap.Logger) (bool, error)
Expand Down
22 changes: 11 additions & 11 deletions runtime/drivers/drivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// This should be the only "real" test in the package. Other tests should be added
// as subtests of TestAll.
func TestAll(t *testing.T) {
var matrix = []func(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error{
var matrix = []func(t *testing.T, fn func(driver, instanceID string, cfg map[string]any)) error{
withDuckDB,
withFile,
withPostgres,
Expand All @@ -29,9 +29,9 @@ func TestAll(t *testing.T) {
}

for _, withDriver := range matrix {
err := withDriver(t, func(driver string, shared bool, cfg map[string]any) {
err := withDriver(t, func(driver, instanceID string, cfg map[string]any) {
// Open
conn, err := drivers.Open(driver, cfg, shared, activity.NewNoopClient(), zap.NewNop())
conn, err := drivers.Open(driver, instanceID, cfg, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
require.NotNil(t, conn)

Expand Down Expand Up @@ -63,26 +63,26 @@ func TestAll(t *testing.T) {
}
}

func withDuckDB(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
fn("duckdb", false, map[string]any{"dsn": ":memory:?access_mode=read_write", "pool_size": 4})
func withDuckDB(t *testing.T, fn func(driver, instanceID string, cfg map[string]any)) error {
fn("duckdb", "default", map[string]any{"dsn": ":memory:?access_mode=read_write", "pool_size": 4})
return nil
}

func withFile(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
func withFile(t *testing.T, fn func(driver, instanceID string, cfg map[string]any)) error {
dsn := t.TempDir()
fn("file", false, map[string]any{"dsn": dsn})
fn("file", "default", map[string]any{"dsn": dsn})
return nil
}

func withPostgres(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
func withPostgres(t *testing.T, fn func(driver, instanceID string, cfg map[string]any)) error {
pg := pgtestcontainer.New(t)
defer pg.Terminate(t)

fn("postgres", true, map[string]any{"dsn": pg.DatabaseURL})
fn("postgres", "default", map[string]any{"dsn": pg.DatabaseURL})
return nil
}

func withSQLite(t *testing.T, fn func(driver string, shared bool, cfg map[string]any)) error {
fn("sqlite", true, map[string]any{"dsn": ":memory:"})
func withSQLite(t *testing.T, fn func(driver, instanceID string, cfg map[string]any)) error {
fn("sqlite", "", map[string]any{"dsn": ":memory:"})
return nil
}
9 changes: 5 additions & 4 deletions runtime/drivers/druid/druid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package druid

import (
"context"
"errors"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -51,13 +52,13 @@ type configProperties struct {

// Opens a connection to Apache Druid using HTTP API.
// Note that the Druid connection string must have the form "http://user:password@host:port/druid/v2/sql".
func (d *driver) Open(confMap map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("druid driver can't be shared")
func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if instanceID == "" {
return nil, errors.New("druid driver can't be shared")
}

conf := &configProperties{}
err := mapstructure.WeakDecode(confMap, conf)
err := mapstructure.WeakDecode(config, conf)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/druid/druid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestDruid(t *testing.T) {
require.NoError(t, err)

dd := &driver{}
conn, err := dd.Open(map[string]any{"dsn": druidAPIURL}, false, activity.NewNoopClient(), zap.NewNop())
conn, err := dd.Open("default", map[string]any{"dsn": druidAPIURL}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := conn.AsOLAP("")
Expand Down
15 changes: 8 additions & 7 deletions runtime/drivers/druid/sql_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package druid

import (
"context"
"github.com/rilldata/rill/runtime/drivers"
"github.com/stretchr/testify/require"
"strings"
"testing"

"github.com/rilldata/rill/runtime/drivers"
"github.com/stretchr/testify/require"

"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"go.uber.org/zap"
Expand All @@ -18,7 +19,7 @@ import (
*/
func Ignore_TestDriver_types(t *testing.T) {
driver := &driver{}
handle, err := driver.Open(map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := driver.Open("default", map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := handle.AsOLAP("")
Expand Down Expand Up @@ -55,7 +56,7 @@ func Ignore_TestDriver_types(t *testing.T) {

func Ignore_TestDriver_array_type(t *testing.T) {
driver := &driver{}
handle, err := driver.Open(map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := driver.Open("default", map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := handle.AsOLAP("")
Expand All @@ -81,7 +82,7 @@ func Ignore_TestDriver_array_type(t *testing.T) {

func Ignore_TestDriver_json_type(t *testing.T) {
driver := &driver{}
handle, err := driver.Open(map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := driver.Open("default", map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := handle.AsOLAP("")
Expand All @@ -106,7 +107,7 @@ func Ignore_TestDriver_json_type(t *testing.T) {

func Ignore_TestDriver_multiple_rows(t *testing.T) {
driver := &driver{}
handle, err := driver.Open(map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := driver.Open("default", map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := handle.AsOLAP("")
Expand Down Expand Up @@ -142,7 +143,7 @@ func Ignore_TestDriver_multiple_rows(t *testing.T) {

func Ignore_TestDriver_error(t *testing.T) {
driver := &driver{}
handle, err := driver.Open(map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, false, activity.NewNoopClient(), zap.NewNop())
handle, err := driver.Open("default", map[string]any{"pool_size": 2, "dsn": "http://localhost:8888/druid/v2/sql"}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := handle.AsOLAP("")
Expand Down
Loading
Loading