Skip to content

Commit

Permalink
limit bandwidth + assert records in test (#4)
Browse files Browse the repository at this point in the history
* limit bandwidth + assert records in test

* rename var

* fix comment

* optional rate limiting

* dialOptions
  • Loading branch information
maha-hajja authored May 17, 2023
1 parent 9da8a46 commit b7d5b6b
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 14 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ server, then waits for acknowledgments to be received from the server through th

### Configuration

| name | description | required | default value |
|----------------------|--------------------------------------------|----------|---------------|
| `url` | url to gRPC server. | true | |
| name | description | required | default value |
|-------------|------------------------------------------------------------------------|----------|---------------|
| `url` | url to gRPC server. | true | |
| `rateLimit` | the bandwidth limit in bytes/second, use "0" to disable rate limiting. | false | 0 |

## Planned work
- Add a source for gRPC client.
17 changes: 14 additions & 3 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

pb "github.com/conduitio-labs/conduit-connector-grpc-client/proto/v1"
"github.com/conduitio-labs/conduit-connector-grpc-client/toproto"
"github.com/conduitio/bwlimit"
"github.com/conduitio/bwlimit/bwgrpc"
sdk "github.com/conduitio/conduit-connector-sdk"
"google.golang.org/grpc"
)
Expand All @@ -43,6 +45,8 @@ type Destination struct {
type Config struct {
// url to gRPC server
URL string `json:"url" validate:"required"`
// the bandwidth limit in bytes/second, use "0" to disable rate limiting.
RateLimit int `json:"rateLimit" default:"0" validate:"gt=-1"`
}

// NewDestinationWithDialer for testing purposes.
Expand All @@ -51,7 +55,7 @@ func NewDestinationWithDialer(dialer func(ctx context.Context, _ string) (net.Co
}

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{})
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
Expand All @@ -68,11 +72,18 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro
}

func (d *Destination) Open(ctx context.Context) error {
conn, err := grpc.DialContext(ctx,
d.config.URL,
dialOptions := []grpc.DialOption{
grpc.WithContextDialer(d.dialer),
grpc.WithInsecure(), //nolint:staticcheck // todo: will use mTLS with connection
grpc.WithBlock(),
}
if d.config.RateLimit > 0 {
dialOptions = append(dialOptions,
bwgrpc.WithBandwidthLimitedContextDialer(bwlimit.Byte(d.config.RateLimit), bwlimit.Byte(d.config.RateLimit), d.dialer))
}
conn, err := grpc.DialContext(ctx,
d.config.URL,
dialOptions...,
)
if err != nil {
return fmt.Errorf("failed to dial server: %w", err)
Expand Down
28 changes: 20 additions & 8 deletions destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package grpcclient

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net"
"sync"
"testing"

"github.com/conduitio-labs/conduit-connector-grpc-client/fromproto"
pb "github.com/conduitio-labs/conduit-connector-grpc-client/proto/v1"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/golang/mock/gomock"
Expand All @@ -39,8 +42,6 @@ func TestTeardown_NoOpen(t *testing.T) {

func TestWrite_Success(t *testing.T) {
is := is.New(t)
dest, ctx := prepareServerAndDestination(t)

records := []sdk.Record{
{
Position: sdk.Position("foo"),
Expand All @@ -61,12 +62,13 @@ func TestWrite_Success(t *testing.T) {
},
},
}
dest, ctx := prepareServerAndDestination(t, records)
n, err := dest.Write(ctx, records)
is.NoErr(err)
is.Equal(n, 2)
}

func prepareServerAndDestination(t *testing.T) (sdk.Destination, context.Context) {
func prepareServerAndDestination(t *testing.T, expected []sdk.Record) (sdk.Destination, context.Context) {
is := is.New(t)
// use in-memory connection
lis := bufconn.Listen(1024 * 1024)
Expand All @@ -75,12 +77,12 @@ func prepareServerAndDestination(t *testing.T) (sdk.Destination, context.Context
}

// prepare server
startTestServer(t, lis)
startTestServer(t, lis, expected)

// prepare destination (client)
ctx := context.Background()
dest := NewDestinationWithDialer(dialer)
err := dest.Configure(ctx, map[string]string{"url": "bufnet"})
err := dest.Configure(ctx, map[string]string{"url": "bufnet", "rateLimit": "10000"})
is.NoErr(err)
err = dest.Open(ctx)
is.NoErr(err)
Expand All @@ -94,7 +96,7 @@ func prepareServerAndDestination(t *testing.T) (sdk.Destination, context.Context
return dest, ctx
}

func startTestServer(t *testing.T, lis net.Listener) {
func startTestServer(t *testing.T, lis net.Listener, expected []sdk.Record) {
ctrl := gomock.NewController(t)
srv := grpc.NewServer()

Expand All @@ -104,18 +106,28 @@ func startTestServer(t *testing.T, lis net.Listener) {
Stream(gomock.Any()).
DoAndReturn(
func(stream pb.StreamService_StreamServer) error {
i := 0
for {
// read from the stream to simulate receiving data from the client
req, err := stream.Recv()
rec, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// convert the proto record to sdk.Record to compare with expected records
sdkRec, err := fromproto.Record(rec)
if err != nil {
return err
}
if !bytes.Equal(sdkRec.Bytes(), expected[i].Bytes()) {
return fmt.Errorf("received record doesn't match the expected record")
}
i++

// Write to the stream to simulate sending data to the client
resp := &pb.Ack{AckPosition: req.Position}
resp := &pb.Ack{AckPosition: rec.Position}
if err := stream.Send(resp); err != nil {
return err
}
Expand Down
87 changes: 87 additions & 0 deletions fromproto/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fromproto

import (
"errors"
"fmt"

opencdcv1 "github.com/conduitio/conduit-connector-protocol/proto/opencdc/v1"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
var cTypes [1]struct{}
_ = cTypes[int(sdk.OperationCreate)-int(opencdcv1.Operation_OPERATION_CREATE)]
_ = cTypes[int(sdk.OperationUpdate)-int(opencdcv1.Operation_OPERATION_UPDATE)]
_ = cTypes[int(sdk.OperationDelete)-int(opencdcv1.Operation_OPERATION_DELETE)]
_ = cTypes[int(sdk.OperationSnapshot)-int(opencdcv1.Operation_OPERATION_SNAPSHOT)]
}

func Record(record *opencdcv1.Record) (sdk.Record, error) {
key, err := Data(record.Key)
if err != nil {
return sdk.Record{}, fmt.Errorf("error converting key: %w", err)
}

payload, err := Change(record.Payload)
if err != nil {
return sdk.Record{}, fmt.Errorf("error converting payload: %w", err)
}

out := sdk.Record{
Position: record.Position,
Operation: sdk.Operation(record.Operation),
Metadata: record.Metadata,
Key: key,
Payload: payload,
}
return out, nil
}

func Change(in *opencdcv1.Change) (sdk.Change, error) {
before, err := Data(in.Before)
if err != nil {
return sdk.Change{}, fmt.Errorf("error converting before: %w", err)
}

after, err := Data(in.After)
if err != nil {
return sdk.Change{}, fmt.Errorf("error converting after: %w", err)
}

out := sdk.Change{
Before: before,
After: after,
}
return out, nil
}

func Data(in *opencdcv1.Data) (sdk.Data, error) {
d := in.GetData()
if d == nil {
return nil, nil
}

switch v := d.(type) {
case *opencdcv1.Data_RawData:
return sdk.RawData(v.RawData), nil
case *opencdcv1.Data_StructuredData:
return sdk.StructuredData(v.StructuredData.AsMap()), nil
default:
return nil, errors.New("invalid Data type")
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/conduitio-labs/conduit-connector-grpc-client
go 1.20

require (
github.com/conduitio/bwlimit v0.1.0
github.com/conduitio/bwlimit/bwgrpc v0.1.0
github.com/conduitio/conduit-connector-protocol v0.5.0
github.com/conduitio/conduit-connector-sdk v0.6.0
github.com/golang/mock v1.6.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj
github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/conduitio/bwlimit v0.1.0 h1:x3ijON0TSghQob4tFKaEvKixFmYKfVJQeSpXluC2JvE=
github.com/conduitio/bwlimit v0.1.0/go.mod h1:E+ASZ1/5L33MTb8hJTERs5Xnmh6Ulq3jbRh7LrdbXWU=
github.com/conduitio/bwlimit/bwgrpc v0.1.0 h1:IjFRBHhoTeaA/EjFsYxBZpFug/m0BtNithfoal/y1lM=
github.com/conduitio/bwlimit/bwgrpc v0.1.0/go.mod h1:BHYGkfpXHviTga2+8AnfzjAd6gt1iq5S75boQ1og++4=
github.com/conduitio/conduit-connector-protocol v0.5.0 h1:Rr2SsDAvWDryQArvonwPoXBELQA2wRXr49xBLrAtBaM=
github.com/conduitio/conduit-connector-protocol v0.5.0/go.mod h1:UIhHWxq52hvwwbkvQDaRgZRHfbpDDmU7tZaw0mwLdd4=
github.com/conduitio/conduit-connector-sdk v0.6.0 h1:WK9Pts2j3Y6xInTAz7WccEwwt2eGQWweTxJoITTURTY=
Expand Down
8 changes: 8 additions & 0 deletions paramgen_dest.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b7d5b6b

Please sign in to comment.