Skip to content

Commit

Permalink
create destination & improvements (#17)
Browse files Browse the repository at this point in the history
* feat: reads messages from HTTP server

* make it generic

* add more params

* fix linter

* parse and set headers

* set params

* RawData instead of json

* http destination & improvements

* linter fix

* update & fix linter

* fix typo

* address reviews

* go mod tidy

* add documentations

* update readme

* fix bug if param already exists in the URL

* address review

* Update README.md

Co-authored-by: Raúl Barroso <ra.barroso@gmail.com>

* update readme

---------

Co-authored-by: Raúl Barroso <ra.barroso@gmail.com>
  • Loading branch information
maha-hajja and raulb authored Apr 4, 2024
1 parent bb80a3c commit 6408bcc
Show file tree
Hide file tree
Showing 15 changed files with 787 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ jobs:
go-version-file: 'go.mod'

- name: Test
run: make test-integration GOTEST_FLAGS="-v -count=1"
run: make test GOTEST_FLAGS="-v -count=1"
7 changes: 0 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ build:
test:
go test $(GOTEST_FLAGS) -race ./...

test-integration:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up -d
go test $(GOTEST_FLAGS) -v -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down; \
exit $$ret

generate:
go generate ./...

Expand Down
48 changes: 28 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,36 +1,44 @@
# Conduit Connector for <resource>
[Conduit](https://conduit.io) for <resource>.
The HTTP connector is a [Conduit](https://github.com/ConduitIO/conduit) plugin. It provides both, a source
and a destination HTTP connectors.

## How to build?
Run `make build` to build the connector.
Run `make build` to build the connector's binary.

## Testing
Run `make test` to run all the unit tests. Run `make test-integration` to run the integration tests.

The Docker compose file at `test/docker-compose.yml` can be used to run the required resource locally.
Run `make test` to run all the unit tests.

## Source
A source connector pulls data from an external resource and pushes it to downstream resources via Conduit.
The HTTP source connector pulls data from the HTTP URL every `pollingPeriod`, the source adds the `params` and `headers`
to the request, and sends it to the URL with the specified `method` from the `Configuration`. The returned data is
used to create an openCDC record and return it.

Note: when using the `OPTIONS` method, the resulted options will be added to the record's metadata.

### Configuration

| name | description | required | default value |
|-----------------------|---------------------------------------|----------|---------------|
| `source_config_param` | Description of `source_config_param`. | true | 1000 |
| name | description | required | default value |
|-----------------|-------------------------------------------------------------------------------------|----------|---------------|
| `url` | Http URL to send requests to. | true | |
| `method` | Http method to use in the request, supported methods are (`GET`,`HEAD`,`OPTIONS`). | false | `GET` |
| `headers` | Http headers to use in the request, comma separated list of `:` separated pairs. | false | |
| `params` | parameters to use in the request, comma separated list of `:` separated pairs. | false | |
| `pollingperiod` | how often the connector will get data from the url, formatted as a `time.Duration`. | false | "5m" |

## Destination
A destination connector pushes data from upstream resources to an external resource via Conduit.
The HTTP destination connector pushes data from upstream resources to an HTTP URL via Conduit. the destination adds the
`params` and `headers` to the request, and sends it to the URL with the specified `method` from the `Configuration`.

### Configuration
Note: The request `Body` that will be sent is the value under `record.Payload.After`, if you want to change the format
of that or manipulate the field in any way, please check our [Builtin Processors Docs](https://conduit.io/docs/processors/builtin/)
, or check [Standalone Processors Docs](https://conduit.io/docs/processors/standalone/) if you'd like to build your own processor .

| name | description | required | default value |
|----------------------------|--------------------------------------------|----------|---------------|
| `destination_config_param` | Description of `destination_config_param`. | true | 1000 |
### Configuration

## Known Issues & Limitations
* Known issue A
* Limitation A
| name | description | required | default value |
|-----------|-------------------------------------------------------------------------------------------|------------|---------------|
| `url` | Http URL to send requests to. | true | |
| `method` | Http method to use in the request, supported methods are (`POST`,`PUT`,`DELETE`,`PATCH`). | false | `POST` |
| `headers` | Http headers to use in the request, comma separated list of : separated pairs. | false | |
| `params` | parameters to use in the request, comma separated list of : separated pairs. | false | |

## Planned work
- [ ] Item A
- [ ] Item B
75 changes: 75 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"fmt"
"net/http"
"net/url"
"strings"
)

type Config struct {
// Http url to send requests to
URL string `json:"url" validate:"required"`
// Http headers to use in the request, comma separated list of : separated pairs
Headers []string
// parameters to use in the request, comma separated list of : separated pairs
Params []string
}

func (s Config) addParamsToURL() (string, error) {
parsedURL, err := url.Parse(s.URL)
if err != nil {
return s.URL, fmt.Errorf("error parsing URL: %w", err)
}
// Parse existing query parameters
existingParams := parsedURL.Query()
for _, param := range s.Params {
keyValue := strings.Split(param, ":")
if len(keyValue) != 2 {
return s.URL, fmt.Errorf("invalid %q format", "params")
}
key := keyValue[0]
value := keyValue[1]
existingParams.Add(key, value)
}
// Update query parameters in the URL struct
parsedURL.RawQuery = existingParams.Encode()

return parsedURL.String(), nil
}

func (s Config) getHeader() (http.Header, error) {
// create a new empty header
header := http.Header{}

// iterate over the pairs and add them to the header
for _, pair := range s.Headers {
// split each pair into key and value
parts := strings.SplitN(strings.TrimSpace(pair), ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid headers value: %s", pair)
}

// trim any spaces from the key and value
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])

// Add to header
header.Add(key, value)
}
return header, nil
}
73 changes: 73 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"net/http"
"testing"

"github.com/matryer/is"
)

func TestConfig_URL(t *testing.T) {
is := is.New(t)
config := Config{
URL: "http://localhost:8082/resource",
Params: []string{"name:resource1", "id:1"},
}
want := "http://localhost:8082/resource?id=1&name=resource1"
got, _ := config.addParamsToURL()
is.True(got == want)
}

func TestConfig_URLParams(t *testing.T) {
is := is.New(t)
config := Config{
// url already has a parameter
URL: "http://localhost:8082/resource?name=resource1",
Params: []string{"id:1"},
}
want := "http://localhost:8082/resource?id=1&name=resource1"
got, err := config.addParamsToURL()
is.NoErr(err)
is.True(got == want)
}

func TestConfig_EmptyParams(t *testing.T) {
is := is.New(t)
config := Config{
URL: "http://localhost:8082/resource?",
Params: []string{"name:resource1", "id:1"},
}
want := "http://localhost:8082/resource?id=1&name=resource1"
got, err := config.addParamsToURL()
is.NoErr(err)
is.True(got == want)
}

func TestConfig_Headers(t *testing.T) {
is := is.New(t)
config := Config{
URL: "http://localhost:8082/resource",
Headers: []string{"header1:val1", "header2:val2"},
}
want := http.Header{}
want.Add("header1", "val1")
want.Add("header2", "val2")
got, err := config.getHeader()
is.NoErr(err)
is.True(got.Get("header1") == want.Get("header1"))
is.True(got.Get("header2") == want.Get("header2"))
}
2 changes: 1 addition & 1 deletion connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ import sdk "github.com/conduitio/conduit-connector-sdk"
var Connector = sdk.Connector{
NewSpecification: Specification,
NewSource: NewSource,
NewDestination: nil,
NewDestination: NewDestination,
}
139 changes: 139 additions & 0 deletions destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

//go:generate paramgen -output=paramgen_dest.go DestinationConfig

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

sdk "github.com/conduitio/conduit-connector-sdk"
)

type Destination struct {
sdk.UnimplementedDestination

config DestinationConfig
client *http.Client
header http.Header
}

type DestinationConfig struct {
Config

// Http method to use in the request
Method string `default:"POST" validate:"inclusion=POST|PUT|DELETE|PATCH"`
}

func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
return d.config.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
sdk.Logger(ctx).Info().Msg("Configuring Destination...")
var config DestinationConfig
err := sdk.Util.ParseConfig(cfg, &config)
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}

d.config.URL, err = d.config.addParamsToURL()
if err != nil {
return err
}
d.header, err = config.Config.getHeader()
if err != nil {
return fmt.Errorf("invalid header config: %w", err)
}
d.config = config
return nil
}

func (d *Destination) Open(ctx context.Context) error {
// create client
d.client = &http.Client{}

// check connection
req, err := http.NewRequestWithContext(ctx, http.MethodHead, d.config.URL, nil)
if err != nil {
return fmt.Errorf("error creating HTTP request %q: %w", d.config.URL, err)
}
req.Header = d.header
resp, err := d.client.Do(req)
if err != nil {
return fmt.Errorf("error pinging URL %q: %w", d.config.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusUnauthorized {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
return fmt.Errorf("authorization failed, %s: %s", http.StatusText(http.StatusUnauthorized), string(body))
}

return nil
}

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
for i, rec := range records {
err := d.sendRequest(ctx, rec)
if err != nil {
return i, err
}
}
return 0, nil
}

func (d *Destination) sendRequest(ctx context.Context, record sdk.Record) error {
var body io.Reader
if record.Payload.After != nil {
body = bytes.NewReader(record.Payload.After.Bytes())
}

// create request
req, err := http.NewRequestWithContext(ctx, d.config.Method, d.config.URL, body)
if err != nil {
return fmt.Errorf("error creating HTTP %s request: %w", d.config.Method, err)
}
req.Header = d.header

// get response
resp, err := d.client.Do(req)
if err != nil {
return fmt.Errorf("error getting data from URL: %w", err)
}
defer resp.Body.Close()
// check if response status is an error code
if resp.StatusCode >= 400 {
return fmt.Errorf("got an unexpected response status of %q", resp.Status)
}
return nil
}

func (d *Destination) Teardown(ctx context.Context) error {
if d.client != nil {
d.client.CloseIdleConnections()
}
return nil
}
Loading

0 comments on commit 6408bcc

Please sign in to comment.