Skip to content

Commit

Permalink
Query parameters support (#854)
Browse files Browse the repository at this point in the history
* first implementation

* protocol compability

* query parameters docs, examples and tests

* query parameters docs, examples and tests

* explicit error on unsupported query arg
  • Loading branch information
jkaflik committed Dec 23, 2022
1 parent 12d1c31 commit a81b180
Show file tree
Hide file tree
Showing 20 changed files with 501 additions and 18 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ The client is tested against the currently [supported versions](https://github.c
* Named and numeric placeholders support
* LZ4/ZSTD compression support
* External data
* [Query parameters](examples/std/query_parameters.go)

Support for the ClickHouse protocol advanced features using `Context`:

* Query ID
* Quota Key
* Settings
* [Query parameters](examples/clickhouse_api/query_parameters.go)
* OpenTelemetry
* Execution events:
* Logs
Expand Down Expand Up @@ -267,14 +269,16 @@ go get -u github.com/ClickHouse/clickhouse-go/v2
* [batch struct](examples/clickhouse_api/append_struct.go)
* [columnar](examples/clickhouse_api/columnar_insert.go)
* [scan struct](examples/clickhouse_api/scan_struct.go)
* [bind params](examples/clickhouse_api/bind.go)
* [query parameters](examples/clickhouse_api/query_parameters.go) (deprecated in favour of native query parameters)
* [bind params](examples/clickhouse_api/bind.go) (deprecated in favour of native query parameters)

### std `database/sql` interface

* [batch](examples/std/batch.go)
* [async insert](examples/std/async.go)
* [open db](examples/std/connect.go)
* [bind params](examples/std/bind.go)
* [query parameters](examples/std/query_parameters.go)
* [bind params](examples/std/bind.go) (deprecated in favour of native query parameters)

## ClickHouse alternatives - ch-go

Expand Down
11 changes: 8 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er

var (
connect = &connect{
id: num,
id: num,
opt: opt,
conn: conn,
debugf: debugf,
Expand All @@ -91,6 +91,11 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil {
return nil, err
}
if connect.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM {
if err := connect.sendAddendum(); err != nil {
return nil, err
}
}

// warn only on the first connection in the pool
if num == 1 && !resources.ClientMeta.IsSupportedClickHouseVersion(connect.server.Version) {
Expand All @@ -103,7 +108,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er

// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
type connect struct {
id int
id int
opt *Options
conn net.Conn
debugf func(format string, v ...interface{})
Expand Down Expand Up @@ -203,7 +208,7 @@ func (c *connect) sendData(block *proto.Block, name string) error {
return err
}
for i := range block.Columns {
if err := block.EncodeColumn(c.buffer, i); err != nil {
if err := block.EncodeColumn(c.buffer, c.revision, i); err != nil {
return err
}
if len(c.buffer.Buf) >= c.maxCompressionBuffer {
Expand Down
6 changes: 4 additions & 2 deletions conn_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package clickhouse

import (
"context"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
"time"
)

func (c *connect) exec(ctx context.Context, query string, args ...interface{}) error {
var (
options = queryOptions(ctx)
body, err = bind(c.server.Timezone, query, args...)
options = queryOptions(ctx)
queryParamsProtocolSupport = c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS
body, err = bindQueryOrAppendParameters(queryParamsProtocolSupport, &options, query, c.server.Timezone, args...)
)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions conn_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (c *connect) handshake(database, username, password string) error {
c.debugf("[handshake] <- %s", c.server)
return nil
}

func (c *connect) sendAddendum() error {
if c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY {
c.buffer.PutString("") // todo quota key support
}

return c.flush()
}
3 changes: 3 additions & 0 deletions conn_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ func (h *httpConnect) prepareRequest(ctx context.Context, reader io.Reader, opti
}
query.Set(key, fmt.Sprint(value))
}
for key, value := range options.parameters {
query.Set(fmt.Sprintf("param_%s", key), value)
}
req.URL.RawQuery = query.Encode()
}

Expand Down
5 changes: 2 additions & 3 deletions conn_http_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import (
)

func (h *httpConnect) exec(ctx context.Context, query string, args ...interface{}) error {
query, err := bind(h.location, query, args...)
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
if err != nil {
return err
}

options := queryOptions(ctx)

res, err := h.sendQuery(ctx, strings.NewReader(query), &options, h.headers)
if res != nil {
defer res.Body.Close()
Expand Down
4 changes: 2 additions & 2 deletions conn_http_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (

// release is ignored, because http used by std with empty release function
func (h *httpConnect) query(ctx context.Context, release func(*connect, error), query string, args ...interface{}) (*rows, error) {
query, err := bind(h.location, query, args...)
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
if err != nil {
return nil, err
}
options := queryOptions(ctx)
headers := make(map[string]string)
switch h.compression {
case CompressionZSTD, CompressionLZ4:
Expand Down
7 changes: 4 additions & 3 deletions conn_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

func (c *connect) query(ctx context.Context, release func(*connect, error), query string, args ...interface{}) (*rows, error) {
var (
options = queryOptions(ctx)
onProcess = options.onProcess()
body, err = bind(c.server.Timezone, query, args...)
options = queryOptions(ctx)
onProcess = options.onProcess()
queryParamsProtocolSupport = c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS
body, err = bindQueryOrAppendParameters(queryParamsProtocolSupport, &options, query, c.server.Timezone, args...)
)

if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions conn_send_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (c *connect) sendQuery(body string, o *QueryOptions) error {
Compression: c.compression != CompressionNone,
InitialAddress: c.conn.LocalAddr().String(),
Settings: c.settings(o.settings),
Parameters: parametersToProtoParameters(o.parameters),
}
if err := q.Encode(c.buffer, c.revision); err != nil {
return err
Expand All @@ -48,3 +49,14 @@ func (c *connect) sendQuery(body string, o *QueryOptions) error {
}
return c.flush()
}

func parametersToProtoParameters(parameters Parameters) (s proto.Parameters) {
for k, v := range parameters {
s = append(s, proto.Parameter{
Key: k,
Value: v,
})
}

return s
}
9 changes: 9 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _contextOptionKey = &QueryOptions{
}

type Settings map[string]interface{}
type Parameters map[string]string
type (
QueryOption func(*QueryOptions) error
QueryOptions struct {
Expand All @@ -49,6 +50,7 @@ type (
profileEvents func([]ProfileEvent)
}
settings Settings
parameters Parameters
external []*ext.Table
blockBufferSize uint8
}
Expand Down Expand Up @@ -89,6 +91,13 @@ func WithSettings(settings Settings) QueryOption {
}
}

func WithParameters(params Parameters) QueryOption {
return func(o *QueryOptions) error {
o.parameters = params
return nil
}
}

func WithLogs(fn func(*Log)) QueryOption {
return func(o *QueryOptions) error {
o.events.logs = fn
Expand Down
4 changes: 4 additions & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func TestQueryRow(t *testing.T) {
require.NoError(t, QueryRow())
}

func TestQueryWithParameters(t *testing.T) {
require.NoError(t, QueryWithParameters())
}

func TestSelectStruct(t *testing.T) {
require.NoError(t, SelectStruct())
}
Expand Down
54 changes: 54 additions & 0 deletions examples/clickhouse_api/query_parameters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse_api

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
)

func QueryWithParameters() error {
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}

if !clickhouse_tests.CheckMinServerServerVersion(conn, 22, 8, 0) {
return nil
}

chCtx := clickhouse.Context(context.Background(), clickhouse.WithParameters(clickhouse.Parameters{
"num": "42",
"str": "hello",
"array": "['a', 'b', 'c']",
}))

row := conn.QueryRow(chCtx, "SELECT {num:UInt64} v, {str:String} s, {array:Array(String)} a")
var (
col1 uint64
col2 string
col3 []string
)
if err := row.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
return nil
}
4 changes: 4 additions & 0 deletions examples/std/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func TestStdQueryRows(t *testing.T) {
require.NoError(t, QueryRows())
}

func TestStdQueryWithParameters(t *testing.T) {
require.NoError(t, QueryWithParameters())
}

func TestStdAsyncInsert(t *testing.T) {
require.NoError(t, AsyncInsert())
}
Expand Down
52 changes: 52 additions & 0 deletions examples/std/query_parameters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package std

import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/tests/std"
)

func QueryWithParameters() error {
conn, err := GetStdOpenDBConnection(clickhouse.Native, nil, nil, nil)
if err != nil {
return err
}

if !std.CheckMinServerVersion(conn, 22, 8, 0) {
return nil
}

row := conn.QueryRow(
"SELECT {num:UInt64} v, {str:String} s, {array:Array(String)} a",
clickhouse.Named("num", "42"),
clickhouse.Named("str", "hello"),
clickhouse.Named("array", "['a', 'b', 'c']"),
)
var (
col1 uint64
col2 string
col3 []string
)
if err := row.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
return nil
}
23 changes: 21 additions & 2 deletions lib/proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,16 @@ func (b *Block) EncodeHeader(buffer *proto.Buffer, revision uint64) (err error)
return nil
}

func (b *Block) EncodeColumn(buffer *proto.Buffer, i int) (err error) {
func (b *Block) EncodeColumn(buffer *proto.Buffer, revision uint64, i int) (err error) {
if i >= 0 && i < len(b.Columns) {
c := b.Columns[i]
buffer.PutString(c.Name())
buffer.PutString(string(c.Type()))

if revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION {
buffer.PutBool(false)
}

if serialize, ok := c.(column.CustomSerialization); ok {
if err := serialize.WriteStatePrefix(buffer); err != nil {
return &BlockError{
Expand All @@ -167,7 +172,7 @@ func (b *Block) Encode(buffer *proto.Buffer, revision uint64) (err error) {
return err
}
for i := range b.Columns {
if err := b.EncodeColumn(buffer, i); err != nil {
if err := b.EncodeColumn(buffer, revision, i); err != nil {
return err
}
}
Expand Down Expand Up @@ -213,6 +218,20 @@ func (b *Block) Decode(reader *proto.Reader, revision uint64) (err error) {
if err != nil {
return err
}

if revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION {
hasCustom, err := reader.Bool()
if err != nil {
return err
}
if hasCustom {
return &BlockError{
Op: "Decode",
Err: errors.New(fmt.Sprintf("custom serialization for column %s. not supported", columnName)),
}
}
}

if numRows != 0 {
if serialize, ok := c.(column.CustomSerialization); ok {
if err := serialize.ReadStatePrefix(reader); err != nil {
Expand Down
Loading

0 comments on commit a81b180

Please sign in to comment.