Skip to content

Commit

Permalink
add mTLS (#12)
Browse files Browse the repository at this point in the history
* add mTLS

* fix generated files

* Update paramgen_dest.go

* address reviews

* go mod tidy

* address reviews 2
  • Loading branch information
maha-hajja authored Jun 26, 2023
1 parent 8c3c5b4 commit 7bc051e
Show file tree
Hide file tree
Showing 15 changed files with 490 additions and 41 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build test test-integration generate install-paramgen proto-generate
.PHONY: build test test-integration generate install-paramgen proto-generate generate-certs

VERSION=$(shell git describe --tags --dirty --always)

Expand All @@ -25,3 +25,6 @@ install-tools: download
@echo Installing tools from tools.go
@go list -f '{{ join .Imports "\n" }}' tools.go | xargs -tI % go install %
@go mod tidy

generate-certs:
sh test/generate-certs.sh
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,24 @@ server, then waits for acknowledgments to be received from the server through th

### Configuration

| name | description | required | default value |
|-------------|------------------------------------------------------------------------|----------|---------------|
| `url` | url to gRPC server. | true | |
| `rateLimit` | the bandwidth limit in bytes/second, use "0" to disable rate limiting. | false | 0 |
| name | description | required | default value |
|------------------------|----------------------------------------------------------------------------------------|----------------------------------------|---------------|
| `url` | url to gRPC server. | true | |
| `rateLimit` | the bandwidth limit in bytes/second, use `0` to disable rate limiting. | false | `0` |
| `mtls.disabled` | option to disable mTLS secure connection, set it to `true` for an insecure connection. | false | `false` |
| `mtls.client.certPath` | the client certificate path. | required if `mtls.disabled` is `false` | |
| `mtls.client.keyPath` | the client private key path. | required if `mtls.disabled` is `false` | |
| `mtls.ca.certPath` | the root CA certificate path. | required if `mtls.disabled` is `false` | |

## Mutual TLS (mTLS)
Mutual TLS is used by default to connect to the server, to disable mTLS you can set the parameter `mtls.disabled`
to `true`, this will result in an insecure connection to the server.

This repo contains self-signed certificates that can be used for local testing purposes, you can find them
under `./test/certs`, note that these certificates are not meant to be used in production environment.

To generate your own secure mTLS certificates, check
[this tutorial](https://medium.com/weekly-webtips/how-to-generate-keys-for-mutual-tls-authentication-a90f53bcec64).

## Planned work
- Add a source for gRPC client.
85 changes: 85 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcclient

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"

sdk "github.com/conduitio/conduit-connector-sdk"
"go.uber.org/multierr"
)

// Config has the generic parameters needed for a gRPC client
type Config struct {
// url to gRPC server
URL string `json:"url" validate:"required"`
// the bandwidth limit in bytes/second, use "0" to disable rate limiting.
RateLimit int `json:"rateLimit" default:"0" validate:"gt=-1"`
// mTLS configurations.
MTLS MTLSConfig `json:"mtls"`
}

type MTLSConfig struct {
// the client certificate path.
ClientCertPath string `json:"client.certPath"`
// the client private key path.
ClientKeyPath string `json:"client.keyPath"`
// the root CA certificate path.
CACertPath string `json:"ca.certPath"`
// option to disable mTLS secure connection, set it to `true` for an insecure connection.
Disabled bool `json:"disabled" default:"false"`
}

// ParseMTLSFiles parses and validates mTLS params values, returns the parsed client certificate, and CA certificate pool,
// and an error if the parsing fails
func (mc *MTLSConfig) ParseMTLSFiles() (tls.Certificate, *x509.CertPool, error) {
err := mc.validateRequiredMTLSParams()
if err != nil {
return tls.Certificate{}, nil, fmt.Errorf("error validating \"mtls\": mTLS security is enabled and some"+
" configurations are missing, if you wish to disable mTLS, set the config option \"mtls.disabled\" to true: %w", err)
}
clientCert, err := tls.LoadX509KeyPair(mc.ClientCertPath, mc.ClientKeyPath)
if err != nil {
return tls.Certificate{}, nil, fmt.Errorf("failed to load client key pair: %w", err)
}
// Load CA certificate
caCert, err := os.ReadFile(mc.CACertPath)
if err != nil {
return tls.Certificate{}, nil, fmt.Errorf("failed to read CA certificate: %w", err)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return tls.Certificate{}, nil, errors.New("failed to append CA certs")
}
return clientCert, caCertPool, nil
}

func (mc *MTLSConfig) validateRequiredMTLSParams() error {
var multiErr error
if mc.CACertPath == "" {
multiErr = multierr.Append(multiErr, fmt.Errorf("error validating \"mtls.ca.certPath\": %w", sdk.ErrRequiredParameterMissing))
}
if mc.ClientCertPath == "" {
multiErr = multierr.Append(multiErr, fmt.Errorf("error validating \"mtls.client.certPath\": %w", sdk.ErrRequiredParameterMissing))
}
if mc.ClientKeyPath == "" {
multiErr = multierr.Append(multiErr, fmt.Errorf("error validating \"mtls.client.keyPath\": %w", sdk.ErrRequiredParameterMissing))
}
return multiErr
}
82 changes: 82 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcclient

import (
"testing"
)

func TestConfig_ParseMTLSFiles(t *testing.T) {
testCases := []struct {
name string
config MTLSConfig
wantErr bool
}{
{
name: "valid paths",
config: MTLSConfig{
ClientCertPath: "./test/certs/client.crt",
ClientKeyPath: "./test/certs/client.key",
CACertPath: "./test/certs/ca.crt",
},
wantErr: false,
},
{
name: "empty values",
config: MTLSConfig{
ClientCertPath: "",
ClientKeyPath: "",
CACertPath: "",
},
wantErr: true,
},
{
name: "invalid paths",
config: MTLSConfig{
ClientCertPath: "not a file",
ClientKeyPath: "not a file",
CACertPath: "not a file",
},
wantErr: true,
},
{
name: "switched files",
config: MTLSConfig{
ClientCertPath: "./test/certs/client.key", // switched with client crt
ClientKeyPath: "./test/certs/client.crt",
CACertPath: "./test/certs/ca.crt",
},
wantErr: true,
},
{
name: "wrong CA cert path",
config: MTLSConfig{
ClientCertPath: "./test/certs/client.crt",
ClientKeyPath: "./test/certs/client.key",
CACertPath: "./test/certs/ca.key", // key instead of crt, should fail
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, _, err := tc.config.ParseMTLSFiles()
if (err != nil) != tc.wantErr {
t.Errorf("ParseMTLSFiles() error = %v, wantErr = %v", err, tc.wantErr)
}
})
}
}
42 changes: 33 additions & 9 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,45 @@

package grpcclient

//go:generate paramgen -output=paramgen_dest.go Config
//go:generate paramgen -output=paramgen_dest.go DestConfig

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"time"

pb "github.com/conduitio-labs/conduit-connector-grpc-client/proto/v1"
"github.com/conduitio-labs/conduit-connector-grpc-client/toproto"
"github.com/conduitio/bwlimit"
"github.com/conduitio/bwlimit/bwgrpc"
sdk "github.com/conduitio/conduit-connector-sdk"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type Destination struct {
sdk.UnimplementedDestination

config Config
config DestConfig
conn *grpc.ClientConn
stream pb.SourceService_StreamClient

// mTLS
clientCert tls.Certificate
caCertPool *x509.CertPool

// for testing: always empty, unless it's a test
dialer func(ctx context.Context, _ string) (net.Conn, error)
}

type Config struct {
// url to gRPC server
URL string `json:"url" validate:"required"`
// the bandwidth limit in bytes/second, use "0" to disable rate limiting.
RateLimit int `json:"rateLimit" default:"0" validate:"gt=-1"`
type DestConfig struct {
Config
}

// NewDestinationWithDialer for testing purposes.
Expand All @@ -68,20 +74,38 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}
if !d.config.MTLS.Disabled {
d.clientCert, d.caCertPool, err = d.config.MTLS.ParseMTLSFiles()
if err != nil {
return err
}
}
return nil
}

func (d *Destination) Open(ctx context.Context) error {
dialOptions := []grpc.DialOption{
grpc.WithContextDialer(d.dialer),
grpc.WithInsecure(), //nolint:staticcheck // todo: will use mTLS with connection
grpc.WithBlock(),
}
if d.config.RateLimit > 0 {
dialOptions = append(dialOptions,
bwgrpc.WithBandwidthLimitedContextDialer(bwlimit.Byte(d.config.RateLimit), bwlimit.Byte(d.config.RateLimit), d.dialer))
}
conn, err := grpc.DialContext(ctx,
if !d.config.MTLS.Disabled {
// create TLS credentials with mTLS configuration
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{d.clientCert},
RootCAs: d.caCertPool,
MinVersion: tls.VersionTLS13,
})
dialOptions = append(dialOptions, grpc.WithTransportCredentials(creds))
} else {
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctxTimeout, cancel := context.WithTimeout(ctx, time.Minute) // time.Minute is temporary until backoff retries is merged
defer cancel()
conn, err := grpc.DialContext(ctxTimeout,
d.config.URL,
dialOptions...,
)
Expand Down
Loading

0 comments on commit 7bc051e

Please sign in to comment.