diff --git a/conn_http.go b/conn_http.go index c9f2f929b4..3a4e07b573 100644 --- a/conn_http.go +++ b/conn_http.go @@ -32,6 +32,8 @@ import ( "net/http" "net/url" "os" + "regexp" + "strconv" "strings" "sync" "time" @@ -50,6 +52,11 @@ const ( queryIDParamName = "query_id" ) +var ( + dbExceptionMainPattern = regexp.MustCompile(`Code:\s*(\d+)\.\s*DB::Exception:\s*(.*?)\s*\(([A-Z_]+)\)\s*\(version`) + dbExceptionFallbackPattern = regexp.MustCompile(`Code:\s*(\d+)\.\s*DB::Exception:\s*(.*)`) +) + type Pool[T any] struct { pool *sync.Pool } @@ -571,3 +578,47 @@ func (h *httpConnect) close() error { h.client = nil return nil } + +type DBException struct { + Code int + ErrorType string + ErrorMessage string +} + +func (e *DBException) Error() string { + return fmt.Sprintf("ClickHouse DB::Exception (Code: %d, Type: %s): %s", + e.Code, e.ErrorType, e.ErrorMessage) +} + +func checkDBException(body []byte) error { + text := string(body) + + matches := dbExceptionMainPattern.FindStringSubmatch(text) + if len(matches) == 4 { + code, err := strconv.Atoi(matches[1]) + if err != nil { + return nil + } + + return &DBException{ + Code: code, + ErrorType: matches[3], + ErrorMessage: strings.TrimSpace(matches[2]), + } + } + + fallbackMatches := dbExceptionFallbackPattern.FindStringSubmatch(text) + if len(fallbackMatches) == 3 { + code, err := strconv.Atoi(fallbackMatches[1]) + if err != nil { + return nil + } + + return &DBException{ + Code: code, + ErrorMessage: strings.TrimSpace(fallbackMatches[2]), + } + } + + return nil +} diff --git a/conn_http_async_insert.go b/conn_http_async_insert.go index 3e197f0bbd..98c49da815 100644 --- a/conn_http_async_insert.go +++ b/conn_http_async_insert.go @@ -39,11 +39,27 @@ func (h *httpConnect) asyncInsert(ctx context.Context, query string, wait bool, } res, err := h.sendQuery(ctx, query, &options, h.headers) - if res != nil { - defer res.Body.Close() + if res == nil { + return err + } + + defer res.Body.Close() + + if err != nil { // we don't care about result, so just discard it to reuse connection _, _ = io.Copy(io.Discard, res.Body) + + return err + } + + msg, err := h.readRawResponse(res) + if err != nil { + return err + } + + if err = checkDBException(msg); err != nil { + return err } - return err + return nil } diff --git a/conn_http_batch.go b/conn_http_batch.go index b4b27920c7..885ccbbffe 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -211,14 +211,29 @@ func (b *httpBatch) Send() (err error) { headers[k] = v } res, err := b.conn.sendStreamQuery(b.ctx, r, &options, headers) + if res == nil { + return err + } - if res != nil { - defer res.Body.Close() + defer res.Body.Close() + + if err != nil { // we don't care about result, so just discard it to reuse connection _, _ = io.Copy(io.Discard, res.Body) + + return err + } + + msg, err := b.conn.readRawResponse(res) + if err != nil { + return err } - return err + if err = checkDBException(msg); err != nil { + return err + } + + return nil } func (b *httpBatch) Rows() int { diff --git a/conn_http_exec.go b/conn_http_exec.go index 75198eb1b2..1d9322d28b 100644 --- a/conn_http_exec.go +++ b/conn_http_exec.go @@ -30,11 +30,27 @@ func (h *httpConnect) exec(ctx context.Context, query string, args ...any) error } res, err := h.sendQuery(ctx, query, &options, h.headers) - if res != nil { - defer res.Body.Close() + if res == nil { + return err + } + + defer res.Body.Close() + + if err != nil { // we don't care about result, so just discard it to reuse connection _, _ = io.Copy(io.Discard, res.Body) + + return err + } + + msg, err := h.readRawResponse(res) + if err != nil { + return err + } + + if err = checkDBException(msg); err != nil { + return err } - return err + return nil }