-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Druid HTTP API driver #4255
Druid HTTP API driver #4255
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also how can we use this to add support for query priorities in Druid?
} | ||
|
||
req.Header.Add("Content-Type", "application/json") | ||
// nolint:all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the need for nolint
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
type druidSQLDriver struct{} | ||
|
||
var _ driver.Driver = &druidSQLDriver{} | ||
|
||
func (a *druidSQLDriver) Open(dsn string) (driver.Conn, error) { | ||
client := http.Client{Timeout: time.Second * 10} | ||
|
||
return &sqlConnection{ | ||
client: &client, | ||
dsn: dsn, | ||
}, nil | ||
} | ||
|
||
func init() { | ||
sql.Register("druid", &druidSQLDriver{}) | ||
} | ||
|
||
type sqlConnection struct { | ||
client *http.Client | ||
dsn string | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A Druid driver for database/sql
should be a standalone component, so can we put this in runtime/drivers/druid/druidsqldriver
or something like that?
runtime/drivers/druid/druid.go
Outdated
drivers.Register("druid", &driversDriver{}) | ||
} | ||
|
||
type driver struct{} | ||
type driversDriver struct{} | ||
|
||
var _ drivers.Driver = &driversDriver{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the old type name here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
druid/druid.go:20:6: renaming this type "driversDriver" to "driver" would conflict druid/druid_api_sql_driver.go:7:2: with this imported package name
runtime/drivers/druid/druid.go
Outdated
// Open connects to Druid using Avatica. | ||
// Note that the Druid connection string must have the form "http://host/druid/v2/sql/avatica-protobuf/". | ||
func (d driver) Open(config map[string]any, shared bool, client activity.Client, logger *zap.Logger) (drivers.Handle, error) { | ||
func (d *driversDriver) Open(config map[string]any, shared bool, client activity.Client, logger *zap.Logger) (drivers.Handle, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the new DSN connection format that is expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like this http://localhost:8888/druid/v2/sql
type JSONWalker struct { | ||
dec *json.Decoder | ||
err error | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The walker makes the logic pretty hard to follow – I'm guessing it's for performance? I wonder if there's a way to have Druid return NDJSON instead or some other kind of result pagination, so we could just use normal struct deserialization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ND arrays are possible:
["__time","aos","id","EXPR$3"]
["TIMESTAMP","OTHER","BIGINT","BOOLEAN"]
["2010-01-01T00:00:00.000Z","[{\"b\":\"gamma\",\"a\":1},{\"b\":\"beta\",\"a\":2}]",111,true]
Objects are possible but it's a waste of CPU and network:
{"__time":{"sqlType":"TIMESTAMP"},"aos":{"sqlType":"OTHER"},"id":{"sqlType":"BIGINT"},"EXPR$3":{"sqlType":"BOOLEAN"}}
{"__time":"2010-01-01T00:00:00.000Z","aos":"[{\"b\":\"gamma\",\"a\":1},{\"b\":\"beta\",\"a\":2}]","id":111,"EXPR$3":true}
we can add |
runtime/drivers/druid/olap.go
Outdated
if stmt.DryRun { | ||
// TODO: Find way to validate with args | ||
prepared, err := c.db.PrepareContext(ctx, stmt.Query) | ||
rows, err := c.db.QueryxContext(ctx, stmt.Query, stmt.Args...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return nil, prepared.Close() | ||
|
||
return nil, rows.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a dry run, so it can't execute the query. The dry runs are called a lot during metric expression validation.
var _ driver.Driver = &druidSQLDriver{} | ||
|
||
func (a *druidSQLDriver) Open(dsn string) (driver.Conn, error) { | ||
client := http.Client{Timeout: time.Second * 10} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems like a very short timeout? What if a query runs for e.g. 2 minutes?
func emptyTransformer(v any) (any, error) { | ||
return v, nil | ||
} | ||
|
||
func toStringArray(values []any) []string { | ||
s := make([]string, len(values)) | ||
for i, v := range values { | ||
vv, ok := v.(string) | ||
if !ok { | ||
return nil | ||
} | ||
s[i] = vv | ||
} | ||
return s | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: put util functions at the end of the file. See the style guide on function ordering: https://github.com/uber-go/guide/blob/master/style.md#function-grouping-and-ordering
if !ok { | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a case that should error?
|
||
var _ driver.QueryerContext = &sqlConnection{} | ||
|
||
func emptyTransformer(v any) (any, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to identityTransformer
} else if c == "ARRAY" { | ||
transformers[i] = func(v any) (any, error) { | ||
var l []any | ||
err := json.Unmarshal([]byte(v.(string)), &l) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return l, nil | ||
} | ||
} else if c == "OTHER" { | ||
transformers[i] = func(v any) (any, error) { | ||
var l map[string]any | ||
err := json.Unmarshal([]byte(v.(string)), &l) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't the decoder already have parsed the value as JSON? Or are the values encoded as JSON at multiple levels in Druid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the decoder parses a string value, because complex types are outputed as strings, see example #4255 (comment)
var a []any | ||
err := dr.dec.Decode(&a) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of allocating a new array on each call to Next
, could we maybe move it to a property on the druidRows
and re-use the same array on each iteration?
@@ -104,10 +104,11 @@ func TestDruid(t *testing.T) { | |||
brokerURL, err := container.PortEndpoint(ctx, "8082/tcp", "http") | |||
require.NoError(t, err) | |||
|
|||
avaticaURL, err := url.JoinPath(brokerURL, "/druid/v2/sql/avatica-protobuf/") | |||
druidAPIURL, err := url.JoinPath(brokerURL, "/druid/v2/sql") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also update our docs with the new expected API URL format (including username/password)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just missing updating the DSN format in the docs in rill/docs/docs/reference/olap-engines/druid.md
, then we can merge this.
does not look like it supports immediate query cancellation, we will have to send a delete request with queryId when context is cancelled. |
can you check this, but getting this error locally when connecting to metrics cluster, this project
its originating at |
added |
issue was that my project was still using older avatica connection string |
|
||
As an example, this typically looks something like: | ||
|
||
```bash | ||
|
||
connector.druid.dsn="https://<hostname>:<port>/druid/v2/sql/avatica-protobuf?authentication=BASIC&avaticaUser=<username>&avaticaPassword=<password>" | ||
connector.druid.dsn="https://user1:2lkj1%40fs@localhost:8888/druid/v2/sql" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put username:password
here instead of user1:2lkj1%40fs
to clarify?
runtime/drivers/druid/druid.go
Outdated
} | ||
|
||
type driver struct{} | ||
|
||
var _ drivers.Driver = &driver{} | ||
|
||
// Open connects to Druid using Avatica. | ||
// Note that the Druid connection string must have the form "http://host/druid/v2/sql/avatica-protobuf/". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to update this comment
@@ -43,7 +45,7 @@ func (d driver) Open(config map[string]any, shared bool, client *activity.Client | |||
return nil, fmt.Errorf("require dsn to open druid connection") | |||
} | |||
|
|||
db, err := sqlx.Open("avatica", dsn) | |||
db, err := sqlx.Open("druid", dsn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the dsn
contains /avatica-protobuf
, can we try to parse and rewrite it to the new format? (For backwards compatibility.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved, but there's a linter error that needs fixing
Tested manually using dashboards UI.