Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query parameters support #854

Merged
merged 6 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ The client is tested against the currently [supported versions](https://github.c
* Named and numeric placeholders support
* LZ4/ZSTD compression support
* External data
* [Query parameters](examples/std/query_parameters.go)

Support for the ClickHouse protocol advanced features using `Context`:

* Query ID
* Quota Key
* Settings
* [Query parameters](examples/clickhouse_api/query_parameters.go)
* OpenTelemetry
* Execution events:
* Logs
Expand Down Expand Up @@ -267,14 +269,16 @@ go get -u github.com/ClickHouse/clickhouse-go/v2
* [batch struct](examples/clickhouse_api/append_struct.go)
* [columnar](examples/clickhouse_api/columnar_insert.go)
* [scan struct](examples/clickhouse_api/scan_struct.go)
* [bind params](examples/clickhouse_api/bind.go)
* [query parameters](examples/clickhouse_api/query_parameters.go) (deprecated in favour of native query parameters)
* [bind params](examples/clickhouse_api/bind.go) (deprecated in favour of native query parameters)

### std `database/sql` interface

* [batch](examples/std/batch.go)
* [async insert](examples/std/async.go)
* [open db](examples/std/connect.go)
* [bind params](examples/std/bind.go)
* [query parameters](examples/std/query_parameters.go)
* [bind params](examples/std/bind.go) (deprecated in favour of native query parameters)

## ClickHouse alternatives - ch-go

Expand Down
11 changes: 8 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er

var (
connect = &connect{
id: num,
id: num,
opt: opt,
conn: conn,
debugf: debugf,
Expand All @@ -91,6 +91,11 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil {
return nil, err
}
if connect.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM {
if err := connect.sendAddendum(); err != nil {
return nil, err
}
}

// warn only on the first connection in the pool
if num == 1 && !resources.ClientMeta.IsSupportedClickHouseVersion(connect.server.Version) {
Expand All @@ -103,7 +108,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er

// https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp
type connect struct {
id int
id int
opt *Options
conn net.Conn
debugf func(format string, v ...interface{})
Expand Down Expand Up @@ -203,7 +208,7 @@ func (c *connect) sendData(block *proto.Block, name string) error {
return err
}
for i := range block.Columns {
if err := block.EncodeColumn(c.buffer, i); err != nil {
if err := block.EncodeColumn(c.buffer, c.revision, i); err != nil {
return err
}
if len(c.buffer.Buf) >= c.maxCompressionBuffer {
Expand Down
6 changes: 4 additions & 2 deletions conn_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package clickhouse

import (
"context"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
"time"
)

func (c *connect) exec(ctx context.Context, query string, args ...interface{}) error {
var (
options = queryOptions(ctx)
body, err = bind(c.server.Timezone, query, args...)
options = queryOptions(ctx)
queryParamsProtocolSupport = c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS
body, err = bindQueryOrAppendParameters(queryParamsProtocolSupport, &options, query, c.server.Timezone, args...)
)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions conn_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (c *connect) handshake(database, username, password string) error {
c.debugf("[handshake] <- %s", c.server)
return nil
}

func (c *connect) sendAddendum() error {
if c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY {
c.buffer.PutString("") // todo quota key support
}

return c.flush()
}
3 changes: 3 additions & 0 deletions conn_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ func (h *httpConnect) prepareRequest(ctx context.Context, reader io.Reader, opti
}
query.Set(key, fmt.Sprint(value))
}
for key, value := range options.parameters {
query.Set(fmt.Sprintf("param_%s", key), value)
}
req.URL.RawQuery = query.Encode()
}

Expand Down
5 changes: 2 additions & 3 deletions conn_http_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import (
)

func (h *httpConnect) exec(ctx context.Context, query string, args ...interface{}) error {
query, err := bind(h.location, query, args...)
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
if err != nil {
return err
}

options := queryOptions(ctx)

res, err := h.sendQuery(ctx, strings.NewReader(query), &options, h.headers)
if res != nil {
defer res.Body.Close()
Expand Down
4 changes: 2 additions & 2 deletions conn_http_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (

// release is ignored, because http used by std with empty release function
func (h *httpConnect) query(ctx context.Context, release func(*connect, error), query string, args ...interface{}) (*rows, error) {
query, err := bind(h.location, query, args...)
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
if err != nil {
return nil, err
}
options := queryOptions(ctx)
headers := make(map[string]string)
switch h.compression {
case CompressionZSTD, CompressionLZ4:
Expand Down
7 changes: 4 additions & 3 deletions conn_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

func (c *connect) query(ctx context.Context, release func(*connect, error), query string, args ...interface{}) (*rows, error) {
var (
options = queryOptions(ctx)
onProcess = options.onProcess()
body, err = bind(c.server.Timezone, query, args...)
options = queryOptions(ctx)
onProcess = options.onProcess()
queryParamsProtocolSupport = c.revision >= proto.DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS
body, err = bindQueryOrAppendParameters(queryParamsProtocolSupport, &options, query, c.server.Timezone, args...)
)

if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions conn_send_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (c *connect) sendQuery(body string, o *QueryOptions) error {
Compression: c.compression != CompressionNone,
InitialAddress: c.conn.LocalAddr().String(),
Settings: c.settings(o.settings),
Parameters: parametersToProtoParameters(o.parameters),
}
if err := q.Encode(c.buffer, c.revision); err != nil {
return err
Expand All @@ -48,3 +49,14 @@ func (c *connect) sendQuery(body string, o *QueryOptions) error {
}
return c.flush()
}

func parametersToProtoParameters(parameters Parameters) (s proto.Parameters) {
for k, v := range parameters {
s = append(s, proto.Parameter{
Key: k,
Value: v,
})
}

return s
}
9 changes: 9 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _contextOptionKey = &QueryOptions{
}

type Settings map[string]interface{}
type Parameters map[string]string
type (
QueryOption func(*QueryOptions) error
QueryOptions struct {
Expand All @@ -49,6 +50,7 @@ type (
profileEvents func([]ProfileEvent)
}
settings Settings
parameters Parameters
external []*ext.Table
blockBufferSize uint8
}
Expand Down Expand Up @@ -89,6 +91,13 @@ func WithSettings(settings Settings) QueryOption {
}
}

func WithParameters(params Parameters) QueryOption {
return func(o *QueryOptions) error {
o.parameters = params
return nil
}
}

func WithLogs(fn func(*Log)) QueryOption {
return func(o *QueryOptions) error {
o.events.logs = fn
Expand Down
4 changes: 4 additions & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func TestQueryRow(t *testing.T) {
require.NoError(t, QueryRow())
}

func TestQueryWithParameters(t *testing.T) {
require.NoError(t, QueryWithParameters())
}

func TestSelectStruct(t *testing.T) {
require.NoError(t, SelectStruct())
}
Expand Down
54 changes: 54 additions & 0 deletions examples/clickhouse_api/query_parameters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse_api

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
)

func QueryWithParameters() error {
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}

if !clickhouse_tests.CheckMinServerServerVersion(conn, 22, 8, 0) {
return nil
}

chCtx := clickhouse.Context(context.Background(), clickhouse.WithParameters(clickhouse.Parameters{
"num": "42",
"str": "hello",
"array": "['a', 'b', 'c']",
}))

row := conn.QueryRow(chCtx, "SELECT {num:UInt64} v, {str:String} s, {array:Array(String)} a")
var (
col1 uint64
col2 string
col3 []string
)
if err := row.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
return nil
}
4 changes: 4 additions & 0 deletions examples/std/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func TestStdQueryRows(t *testing.T) {
require.NoError(t, QueryRows())
}

func TestStdQueryWithParameters(t *testing.T) {
require.NoError(t, QueryWithParameters())
}

func TestStdAsyncInsert(t *testing.T) {
require.NoError(t, AsyncInsert())
}
Expand Down
52 changes: 52 additions & 0 deletions examples/std/query_parameters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package std

import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/tests/std"
)

func QueryWithParameters() error {
conn, err := GetStdOpenDBConnection(clickhouse.Native, nil, nil, nil)
if err != nil {
return err
}

if !std.CheckMinServerVersion(conn, 22, 8, 0) {
return nil
}

row := conn.QueryRow(
"SELECT {num:UInt64} v, {str:String} s, {array:Array(String)} a",
clickhouse.Named("num", "42"),
clickhouse.Named("str", "hello"),
clickhouse.Named("array", "['a', 'b', 'c']"),
)
var (
col1 uint64
col2 string
col3 []string
)
if err := row.Scan(&col1, &col2, &col3); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
return nil
}
23 changes: 21 additions & 2 deletions lib/proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,16 @@ func (b *Block) EncodeHeader(buffer *proto.Buffer, revision uint64) (err error)
return nil
}

func (b *Block) EncodeColumn(buffer *proto.Buffer, i int) (err error) {
func (b *Block) EncodeColumn(buffer *proto.Buffer, revision uint64, i int) (err error) {
if i >= 0 && i < len(b.Columns) {
c := b.Columns[i]
buffer.PutString(c.Name())
buffer.PutString(string(c.Type()))

if revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION {
buffer.PutBool(false)
}

if serialize, ok := c.(column.CustomSerialization); ok {
if err := serialize.WriteStatePrefix(buffer); err != nil {
return &BlockError{
Expand All @@ -167,7 +172,7 @@ func (b *Block) Encode(buffer *proto.Buffer, revision uint64) (err error) {
return err
}
for i := range b.Columns {
if err := b.EncodeColumn(buffer, i); err != nil {
if err := b.EncodeColumn(buffer, revision, i); err != nil {
return err
}
}
Expand Down Expand Up @@ -213,6 +218,20 @@ func (b *Block) Decode(reader *proto.Reader, revision uint64) (err error) {
if err != nil {
return err
}

if revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION {
hasCustom, err := reader.Bool()
if err != nil {
return err
}
if hasCustom {
return &BlockError{
Op: "Decode",
Err: errors.New(fmt.Sprintf("custom serialization for column %s. not supported", columnName)),
}
}
}

if numRows != 0 {
if serialize, ok := c.(column.CustomSerialization); ok {
if err := serialize.ReadStatePrefix(reader); err != nil {
Expand Down
Loading