Skip to content

Commit

Permalink
GT-575 Transparent compression of requests and responses (#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwierzbo authored Mar 28, 2024
1 parent f2af292 commit d8d4036
Show file tree
Hide file tree
Showing 12 changed files with 371 additions and 14 deletions.
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ jobs:
test-to-run:
type: string
default: "run-tests-single"
enable-extra-db-features:
type: boolean
default: false
steps:
- checkout
- run:
Expand All @@ -71,6 +74,7 @@ jobs:
GOIMAGE: << pipeline.parameters.goImage >>
ALPINE_IMAGE: << pipeline.parameters.alpineImage >>
STARTER: << pipeline.parameters.starterImage >>
ENABLE_DATABASE_EXTRA_FEATURES: << parameters.enable-extra-db-features >>
TEST_DISALLOW_UNKNOWN_FIELDS: false
VERBOSE: 1

Expand Down Expand Up @@ -123,6 +127,12 @@ workflows:
requires:
- download-demo-data
test-to-run: run-v2-tests-cluster
- run-integration-tests:
name: Test V2 cluster - DB extra features (compression)
requires:
- download-demo-data
test-to-run: run-v2-tests-cluster
enable-extra-db-features: true

- run-integration-tests:
name: Test V1 single
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ __test_go_test:
-e TEST_ENABLE_SHUTDOWN=$(TEST_ENABLE_SHUTDOWN) \
-e TEST_REQUEST_LOG=$(TEST_REQUEST_LOG) \
-e TEST_DISALLOW_UNKNOWN_FIELDS=$(TEST_DISALLOW_UNKNOWN_FIELDS) \
-e ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) \
-e GODEBUG=tls13=1 \
-e CGO_ENABLED=$(CGO_ENABLED) \
-w /usr/code/ \
Expand Down Expand Up @@ -441,6 +442,7 @@ __test_v2_go_test:
-e TEST_BACKUP_REMOTE_CONFIG='$(TEST_BACKUP_REMOTE_CONFIG)' \
-e TEST_DEBUG='$(TEST_DEBUG)' \
-e TEST_ENABLE_SHUTDOWN=$(TEST_ENABLE_SHUTDOWN) \
-e ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) \
-e GODEBUG=tls13=1 \
-e CGO_ENABLED=$(CGO_ENABLED) \
-w /usr/code/v2/ \
Expand Down Expand Up @@ -475,7 +477,9 @@ ifdef JWTSECRET
echo "$JWTSECRET" > "${JWTSECRETFILE}"
endif
@-docker rm -f -v $(TESTCONTAINER) &> /dev/null
@TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) ALPINE_IMAGE=$(ALPINE_IMAGE) ENABLE_BACKUP=$(ENABLE_BACKUP) ARANGO_LICENSE_KEY=$(ARANGO_LICENSE_KEY) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR="${TMPDIR}" DEBUG_PORT=$(DEBUG_PORT) $(CLUSTERENV) "${ROOTDIR}/test/cluster.sh" start
@TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) ALPINE_IMAGE=$(ALPINE_IMAGE) ENABLE_BACKUP=$(ENABLE_BACKUP) \
ARANGO_LICENSE_KEY=$(ARANGO_LICENSE_KEY) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR="${TMPDIR}" \
ENABLE_DATABASE_EXTRA_FEATURES=$(ENABLE_DATABASE_EXTRA_FEATURES) DEBUG_PORT=$(DEBUG_PORT) $(CLUSTERENV) "${ROOTDIR}/test/cluster.sh" start
endif

__test_cleanup:
Expand Down
4 changes: 2 additions & 2 deletions test/cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ if [ "$CMD" == "start" ]; then
if [ -n "$ENABLE_BACKUP" ]; then
STARTERARGS="$STARTERARGS --all.backup.api-enabled=true"
fi
if [ -n "$ENABLE_DATABASE_EXTENDED_NAMES" ]; then
STARTERARGS="$STARTERARGS --all.database.extended-names-databases=true"
if [ -n "$ENABLE_DATABASE_EXTRA_FEATURES" ]; then
STARTERARGS="$STARTERARGS --all.database.extended-names-databases=true --args.all.http.compress-response-threshold=1 --args.all.http.handle-content-encoding-for-unauthenticated-requests=true"
fi
if [[ "$OSTYPE" == "darwin"* ]]; then
DOCKERPLATFORMARG="--platform linux/x86_64"
Expand Down
1 change: 1 addition & 0 deletions v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- Backup API support
- Admin Cluster API support
- Set Licence API support
- Transparent compression of requests and responses (ArangoDBConfiguration.Compression)


## [2.0.3](https://github.com/arangodb/go-driver/tree/v2.0.3) (2023-10-31)
Expand Down
36 changes: 36 additions & 0 deletions v2/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ package connection
import (
"context"
"io"
"net/http"
)

type EncodingCodec interface {
}

type Wrapper func(c Connection) Connection

type Factory func() (Connection, error)
Expand All @@ -40,8 +44,38 @@ type ArangoDBConfiguration struct {

// DriverFlags configure additional flags for the `x-arango-driver` header
DriverFlags []string

// Compression is used to enable compression between client and server
Compression *CompressionConfig
}

// CompressionConfig is used to enable compression for the connection
type CompressionConfig struct {
// CompressionConfig is used to enable compression for the requests
CompressionType CompressionType

// ResponseCompressionEnabled is used to enable compression for the responses (requires server side adjustments)
ResponseCompressionEnabled bool

// RequestCompressionEnabled is used to enable compression for the requests
RequestCompressionEnabled bool

// RequestCompressionLevel - Sets the compression level between -1 and 9
// Default: 0 (NoCompression). For Reference see: https://pkg.go.dev/compress/flate#pkg-constants
RequestCompressionLevel int
}

type CompressionType string

const (

// RequestCompressionTypeGzip is used to enable gzip compression
RequestCompressionTypeGzip CompressionType = "gzip"

// RequestCompressionTypeDeflate is used to enable deflate compression
RequestCompressionTypeDeflate CompressionType = "deflate"
)

type Connection interface {
// NewRequest initializes Request object
NewRequest(method string, urls ...string) (Request, error)
Expand Down Expand Up @@ -111,4 +145,6 @@ type Response interface {
// Header gets the first value associated with the given key.
// If there are no values associated with the key, Get returns "".
Header(name string) string

RawResponse() *http.Response
}
122 changes: 122 additions & 0 deletions v2/connection/connection_compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package connection

import (
"compress/zlib"
"fmt"
"io"

"github.com/arangodb/go-driver/v2/log"
)

type Compression interface {
ApplyRequestHeaders(r Request)
ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error)
}

func newCompression(config *CompressionConfig) Compression {
if config == nil {
return noCompression{}
} else if config.CompressionType == "gzip" {
return gzipCompression{config: config}
} else if config.CompressionType == "deflate" {
return deflateCompression{config: config}
} else {
log.Error(fmt.Errorf("unknown compression type: %s", config.CompressionType), "")
return noCompression{config: config}
}
}

type gzipCompression struct {
config *CompressionConfig
}

func (g gzipCompression) ApplyRequestHeaders(r Request) {
if g.config != nil && g.config.ResponseCompressionEnabled {
if g.config.CompressionType == "gzip" {
r.AddHeader("Accept-Encoding", "gzip")
}
}
}

func (g gzipCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) {
config := g.config

if config != nil && config.RequestCompressionEnabled {
if config.CompressionType == "deflate" {
r.headers["Content-Encoding"] = "deflate"

zlibWriter, err := zlib.NewWriterLevel(rootWriter, config.RequestCompressionLevel)
if err != nil {
log.Errorf(err, "error creating zlib writer")
return nil, err
}

return zlibWriter, nil
}
}

return nil, nil
}

type deflateCompression struct {
config *CompressionConfig
}

func (g deflateCompression) ApplyRequestHeaders(r Request) {
if g.config != nil && g.config.ResponseCompressionEnabled {
if g.config.CompressionType == "deflate" {
r.AddHeader("Accept-Encoding", "deflate")
}
}
}

func (g deflateCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) {
config := g.config

if config != nil && config.RequestCompressionEnabled {
if config.CompressionType == "deflate" {
r.headers["Content-Encoding"] = "deflate"

zlibWriter, err := zlib.NewWriterLevel(rootWriter, config.RequestCompressionLevel)
if err != nil {
log.Errorf(err, "error creating zlib writer")
return nil, err
}

return zlibWriter, nil
}
}

return nil, nil
}

type noCompression struct {
config *CompressionConfig
}

func (g noCompression) ApplyRequestHeaders(r Request) {
}

func (g noCompression) ApplyRequestCompression(r *httpRequest, rootWriter io.Writer) (io.WriteCloser, error) {
return nil, nil
}
75 changes: 65 additions & 10 deletions v2/connection/connection_http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package connection

import (
"bytes"
"compress/gzip"
"compress/zlib"
"context"
"crypto/tls"
"io"
Expand Down Expand Up @@ -248,7 +250,7 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes
ctx = context.Background()
}

reader := j.bodyReadFunc(j.Decoder(j.contentType), req.body, j.streamSender)
reader := j.bodyReadFunc(j.Decoder(j.contentType), req, j.streamSender)
r, err := req.asRequest(ctx, reader)
if err != nil {
return nil, nil, errors.WithStack(err)
Expand All @@ -263,9 +265,19 @@ func (j *httpConnection) stream(ctx context.Context, req *httpRequest) (*httpRes
log.Debugf("(%s) Response received: %d", id, resp.StatusCode)

if b := resp.Body; b != nil {
var body = resp.Body
var resultBody io.ReadCloser

respEncoding := resp.Header.Get("Content-Encoding")
switch respEncoding {
case "gzip":
resultBody, err = gzip.NewReader(resp.Body)
case "deflate":
resultBody, err = zlib.NewReader(resp.Body)
default:
resultBody = resp.Body
}

return &httpResponse{response: resp, request: req}, body, nil
return &httpResponse{response: resp, request: req}, resultBody, nil

}

Expand All @@ -289,8 +301,8 @@ func getDecoderByContentType(contentType string) Decoder {

type bodyReadFactory func() (io.Reader, error)

func (j *httpConnection) bodyReadFunc(decoder Decoder, obj interface{}, stream bool) bodyReadFactory {
if obj == nil {
func (j *httpConnection) bodyReadFunc(decoder Decoder, req *httpRequest, stream bool) bodyReadFactory {
if req.body == nil {
return func() (io.Reader, error) {
return nil, nil
}
Expand All @@ -299,21 +311,64 @@ func (j *httpConnection) bodyReadFunc(decoder Decoder, obj interface{}, stream b
if !stream {
return func() (io.Reader, error) {
b := bytes.NewBuffer([]byte{})
if err := decoder.Encode(b, obj); err != nil {
log.Errorf(err, "error encoding body - OBJ: %v", obj)
compressedWriter, err := newCompression(j.config.Compression).ApplyRequestCompression(req, b)
if err != nil {
log.Errorf(err, "error applying compression")
return nil, err
}

return b, nil
if compressedWriter != nil {
defer func(compressedWriter io.WriteCloser) {
errCompression := compressedWriter.Close()
if errCompression != nil {
log.Error(errCompression, "error closing compressed writer")
if err == nil {
err = errCompression
}
}
}(compressedWriter)

err = decoder.Encode(compressedWriter, req.body)
} else {
err = decoder.Encode(b, req.body)
}

if err != nil {
log.Errorf(err, "error encoding body - OBJ: %v", req.body)
return nil, err
}
return b, err
}
} else {
return func() (io.Reader, error) {
reader, writer := io.Pipe()

compressedWriter, err := newCompression(j.config.Compression).ApplyRequestCompression(req, writer)
if err != nil {
log.Errorf(err, "error applying compression")
return nil, err
}

go func() {
defer writer.Close()

if err := decoder.Encode(writer, obj); err != nil {
log.Errorf(err, "error encoding body (stream) - OBJ: %v", obj)
var encErr error
if compressedWriter != nil {
defer func(compressedWriter io.WriteCloser) {
errCompression := compressedWriter.Close()
if errCompression != nil {
log.Errorf(errCompression, "error closing compressed writer - stream")
writer.CloseWithError(err)
}
}(compressedWriter)

encErr = decoder.Encode(compressedWriter, req.body)
} else {
encErr = decoder.Encode(writer, req.body)
}

if encErr != nil {
log.Errorf(err, "error encoding body stream - OBJ: %v", req.body)
writer.CloseWithError(err)
}
}()
Expand Down
4 changes: 4 additions & 0 deletions v2/connection/connection_http_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ func (j *httpResponse) Header(name string) string {
}
return j.response.Header.Get(name)
}

func (j *httpResponse) RawResponse() *http.Response {
return j.response
}
2 changes: 2 additions & 0 deletions v2/connection/modifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func applyArangoDBConfiguration(config ArangoDBConfiguration, ctx context.Contex
}
}

newCompression(config.Compression).ApplyRequestHeaders(r)

return nil
}
}
Loading

0 comments on commit d8d4036

Please sign in to comment.