Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Druid HTTP API driver #4255

Merged
merged 19 commits into from
Apr 5, 2024
22 changes: 12 additions & 10 deletions runtime/drivers/druid/druid.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/rilldata/rill/runtime/pkg/activity"
"go.uber.org/zap"

// Load calcite avatica driver for druid
_ "github.com/apache/calcite-avatica-go/v5"
// Load Druid database/sql driver
_ "github.com/rilldata/rill/runtime/drivers/druid/druidsqldriver"
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
)

var spec = drivers.Spec{
Expand All @@ -26,15 +26,17 @@ var spec = drivers.Spec{
}

func init() {
drivers.Register("druid", driver{})
drivers.RegisterAsConnector("druid", driver{})
drivers.Register("druid", &driver{})
drivers.RegisterAsConnector("druid", &driver{})
}

type driver struct{}

var _ drivers.Driver = &driver{}

// Open connects to Druid using Avatica.
// Note that the Druid connection string must have the form "http://host/druid/v2/sql/avatica-protobuf/".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update this comment

func (d driver) Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
func (d *driver) Open(config map[string]any, shared bool, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("druid driver can't be shared")
}
Expand All @@ -43,7 +45,7 @@ func (d driver) Open(config map[string]any, shared bool, client *activity.Client
return nil, fmt.Errorf("require dsn to open druid connection")
}

db, err := sqlx.Open("avatica", dsn)
db, err := sqlx.Open("druid", dsn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the dsn contains /avatica-protobuf, can we try to parse and rewrite it to the new format? (For backwards compatibility.)

if err != nil {
return nil, err
}
Expand All @@ -63,19 +65,19 @@ func (d driver) Open(config map[string]any, shared bool, client *activity.Client
return conn, nil
}

func (d driver) Drop(config map[string]any, logger *zap.Logger) error {
func (d *driver) Drop(config map[string]any, logger *zap.Logger) error {
return drivers.ErrDropNotSupported
}

func (d driver) Spec() drivers.Spec {
func (d *driver) Spec() drivers.Spec {
return spec
}

func (d driver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
func (d *driver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) {
return false, fmt.Errorf("not implemented")
}

func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any, logger *zap.Logger) ([]string, error) {
func (d *driver) TertiarySourceConnectors(ctx context.Context, src map[string]any, logger *zap.Logger) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}

Expand Down
6 changes: 3 additions & 3 deletions runtime/drivers/druid/druid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ func TestDruid(t *testing.T) {
brokerURL, err := container.PortEndpoint(ctx, "8082/tcp", "http")
require.NoError(t, err)

avaticaURL, err := url.JoinPath(brokerURL, "/druid/v2/sql/avatica-protobuf/")
druidAPIURL, err := url.JoinPath(brokerURL, "/druid/v2/sql")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also update our docs with the new expected API URL format (including username/password)

require.NoError(t, err)

conn, err := driver{}.Open(map[string]any{"dsn": avaticaURL}, false, activity.NewNoopClient(), zap.NewNop())
dd := &driver{}
conn, err := dd.Open(map[string]any{"dsn": druidAPIURL}, false, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)

olap, ok := conn.AsOLAP("")
Expand All @@ -121,7 +122,6 @@ func TestDruid(t *testing.T) {
t.Run("time floor", func(t *testing.T) { testTimeFloor(t, olap) })

require.NoError(t, conn.Close())
require.Error(t, conn.(*connection).db.Ping())
}

func testIngest(t *testing.T, coordinatorURL string) {
Expand Down
267 changes: 267 additions & 0 deletions runtime/drivers/druid/druidsqldriver/druid_api_sql_driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package druidsqldriver

import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/google/uuid"
)

type druidSQLDriver struct{}

var _ driver.Driver = &druidSQLDriver{}

func (a *druidSQLDriver) Open(dsn string) (driver.Conn, error) {
client := http.Client{Timeout: time.Second * 10}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a very short timeout? What if a query runs for e.g. 2 minutes?


return &sqlConnection{
client: &client,
dsn: dsn,
}, nil
}

func init() {
sql.Register("druid", &druidSQLDriver{})
}

type sqlConnection struct {
client *http.Client
dsn string
}

var _ driver.QueryerContext = &sqlConnection{}

func emptyTransformer(v any) (any, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to identityTransformer

return v, nil
}

func toStringArray(values []any) []string {
s := make([]string, len(values))
for i, v := range values {
vv, ok := v.(string)
if !ok {
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a case that should error?

s[i] = vv
}
return s
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put util functions at the end of the file. See the style guide on function ordering: https://github.com/uber-go/guide/blob/master/style.md#function-grouping-and-ordering


func (c *sqlConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
b, err := json.Marshal(druidRequest(query, args))
if err != nil {
return nil, err
}

bodyReader := bytes.NewReader(b)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.dsn, bodyReader)
if err != nil {
return nil, err
}

req.Header.Add("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}

dec := json.NewDecoder(resp.Body)

var obj any
err = dec.Decode(&obj)
if err != nil {
resp.Body.Close()
return nil, err
}
switch v := obj.(type) {
case map[string]any:
resp.Body.Close()
return nil, fmt.Errorf("%v", obj)
case []any:
columns := toStringArray(v)
err = dec.Decode(&obj)
if err != nil {
resp.Body.Close()
return nil, err
}

types := toStringArray(obj.([]any))

transformers := make([]func(any) (any, error), len(columns))
for i, c := range types {
transformers[i] = emptyTransformer
if c == "TIMESTAMP" {
transformers[i] = func(v any) (any, error) {
t, err := time.Parse(time.RFC3339, v.(string))
if err != nil {
return nil, err
}

return t, nil
}
} else if c == "ARRAY" {
transformers[i] = func(v any) (any, error) {
var l []any
err := json.Unmarshal([]byte(v.(string)), &l)
if err != nil {
return nil, err
}
return l, nil
}
} else if c == "OTHER" {
transformers[i] = func(v any) (any, error) {
var l map[string]any
err := json.Unmarshal([]byte(v.(string)), &l)
Comment on lines +109 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't the decoder already have parsed the value as JSON? Or are the values encoded as JSON at multiple levels in Druid?

Copy link
Contributor Author

@egor-ryashin egor-ryashin Mar 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the decoder parses a string value, because complex types are outputed as strings, see example #4255 (comment)

if err != nil {
return nil, err
}
return l, nil
}
}
}

druidRows := &druidRows{
closer: resp.Body,
dec: dec,
columns: columns,
types: types,
transformers: transformers,
}
return druidRows, nil
default:
resp.Body.Close()
return nil, fmt.Errorf("unexpected response: %v", obj)
}
}

func toType(v any) string {
switch v.(type) {
case int:
return "INTEGER"
case float64:
return "DOUBLE"
case bool:
return "BOOLEAN"
default:
return "VARCHAR"
}
}

type druidRows struct {
closer io.ReadCloser
dec *json.Decoder
columns []string
types []string
transformers []func(any) (any, error)
}

var _ driver.Rows = &druidRows{}

func (dr *druidRows) Columns() []string {
return dr.columns
}

func (dr *druidRows) Close() error {
return dr.closer.Close()
}

func (dr *druidRows) Next(dest []driver.Value) error {
var a []any
err := dr.dec.Decode(&a)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of allocating a new array on each call to Next, could we maybe move it to a property on the druidRows and re-use the same array on each iteration?

if err != nil {
return err
}

for i, v := range a {
v, err := dr.transformers[i](v)
if err != nil {
return err
}

dest[i] = v
}

return nil
}

type stmt struct {
conn *sqlConnection
query string
}

func (c *sqlConnection) Prepare(query string) (driver.Stmt, error) {
return &stmt{
query: query,
conn: c,
}, nil
}

func (c *sqlConnection) Close() error {
return nil
}

func (c *sqlConnection) Begin() (driver.Tx, error) {
return nil, fmt.Errorf("unsupported")
}

func (s *stmt) Close() error {
return nil
}

func (s *stmt) NumInput() int {
return 0
}

type DruidQueryContext struct {
SQLQueryID string `json:"sqlQueryId"`
}

type DruidParameter struct {
Type string `json:"type"`
Value any `json:"value"`
}

type DruidRequest struct {
Query string `json:"query"`
Header bool `json:"header"`
SQLTypesHeader bool `json:"sqlTypesHeader"`
ResultFormat string `json:"resultFormat"`
Parameters []DruidParameter `json:"parameters"`
Context DruidQueryContext `json:"context"`
}

func druidRequest(query string, args []driver.NamedValue) *DruidRequest {
parameters := make([]DruidParameter, len(args))
for i, arg := range args {
parameters[i] = DruidParameter{
Type: toType(arg.Value),
Value: arg.Value,
}
}
return &DruidRequest{
Query: query,
Header: true,
SQLTypesHeader: true,
ResultFormat: "arrayLines",
Parameters: parameters,
Context: DruidQueryContext{
SQLQueryID: uuid.New().String(),
},
}
}

func (s *stmt) Exec(args []driver.Value) (driver.Result, error) {
return nil, fmt.Errorf("unsupported")
}

func (s *stmt) Query(args []driver.Value) (driver.Rows, error) {
return nil, fmt.Errorf("unsupported")
}
6 changes: 3 additions & 3 deletions runtime/drivers/druid/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error {

func (c *connection) Execute(ctx context.Context, stmt *drivers.Statement) (*drivers.Result, error) {
if stmt.DryRun {
// TODO: Find way to validate with args
prepared, err := c.db.PrepareContext(ctx, stmt.Query)
rows, err := c.db.QueryxContext(ctx, "EXPLAIN PLAN FOR "+stmt.Query, stmt.Args...)
if err != nil {
return nil, err
}
return nil, prepared.Close()

return nil, rows.Close()
}

var cancelFunc context.CancelFunc
Expand Down
Loading
Loading