This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

add support AWS request signing for ES adaptor, fixes #159
jipperinbham committed Jan 23, 2017
1 parent 42a0122 commit 4ca2ce1
Showing 471 changed files with 48,459 additions and 10,297 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -13,7 +13,10 @@ nodes:
### Bugfixes
- [#211](https://github.com/compose/transporter/pull/211): defer bulk channel init for mongo node reuse
- [#213](https://github.com/compose/transporter/pull/213): track mongodb \_id field so we can attempt to reissue queries
+- [#233](https://github.com/compose/transporter/pull/233): update elasticsearch adaptor with better support
+for multiple versions of elasticsearch as well as better performance with bulk indexing for most versions.
+Addresses [#209](https://github.com/compose/transporter/issues/209), [#222](https://github.com/compose/transporter/issues/222)
+and [#159](https://github.com/compose/transporter/issues/159).
## v0.1.1 [2015-08-27]
10 changes: 6 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions glide.yaml
@@ -27,11 +27,10 @@ import:
version: ^0.1.1
subpackages:
- pkg/group
-- package: github.com/Sirupsen/logrus
-version: ^0.11.0
- package: gopkg.in/olivere/elastic.v5
version: ^5.0.13
- package: gopkg.in/olivere/elastic.v3
version: ^5.0.13
- package: gopkg.in/olivere/elastic.v2
version: ^5.0.18
+- package: github.com/smartystreets/go-aws-auth
21 changes: 18 additions & 3 deletions pkg/adaptor/elasticsearch/README.md
@@ -1,19 +1,34 @@
# Elasticsearch adaptor

-The [elasticsearch](https://www.elastic.co/) adaptor sends data to defined endpoints.
+The [elasticsearch](https://www.elastic.co/) adaptor sends data to defined endpoints. List of
+supported versions is below.
+
+| Version | Note |
+| --- | --- |
+| 1.X | This version does not support bulk operations and will thus be much slower. |
+| 2.X | Will only receive bug fixes, please consider upgrading. |
+| 5.X | Most recent and supported version. |

***IMPORTANT***

-It is currently not possible to overwrite the auto-generated `_id` field for elasticsearch but if you would like to retain
-the originating `_id` from the source, you'll need to include a transform function as follows (assumes MongoDB source):
+If you want to keep the source `_id` as the elasticsearch document `_id`, transporter will
+automatically do this. If you wish to use the auto-generated `_id` field for elasticsearch but would
+like to retain the originating `_id` from the source, you'll need to include a transform function
+similar to the following (assumes MongoDB source):

```javascript
module.exports = function(msg) {
msg.data["mongo_id"] = msg.data._id['$oid']
msg.data = _.omit(msg.data, ["_id"]);
return msg;
}
```

+***NOTE***
+By using the elasticsearch auto-generated `_id`, it is not currently possible for transporter to
+process update/delete operations. Future work is planned in [#39](https://github.com/compose/transporter/issues/39)
+to address this problem.

### Configuration:
```yaml
- es:
43 changes: 24 additions & 19 deletions pkg/adaptor/elasticsearch/elasticsearch.go
@@ -29,6 +29,9 @@ const (
- es:
type: elasticsearch
uri: https://username:password@hostname:port
+timeout: 10s # optional, defaults to 30s
+aws_access_key: XXX # optional, used for signing requests to AWS Elasticsearch service
+aws_access_secret: XXX # optional, used for signing requests to AWS Elasticsearch service
`
)

@@ -69,13 +72,13 @@ func (e VersionError) Error() string {
 }
 
 // InvalidTimeoutError wraps the underlying error when the provided is not parsable time.ParseDuration
-type InvalidTimeoutError struct {
-	timeout string
-}
-
-func (e InvalidTimeoutError) Error() string {
-	return fmt.Sprintf("Invalid Timeout, %s", e.timeout)
-}
+// type InvalidTimeoutError struct {
+// timeout string
+// }
+//
+// func (e InvalidTimeoutError) Error() string {
+// return fmt.Sprintf("Invalid Timeout, %s", e.timeout)
+// }
 
 // Elasticsearch is an adaptor to connect a pipeline to
 // an elasticsearch cluster.
@@ -191,15 +194,15 @@ func (e *Elasticsearch) setupClient(conf Config) error {
 		return VersionError{conf.URI, stringVersion, err.Error()}
 	}
 
-	httpClient := http.DefaultClient
-	if conf.Timeout != "" {
-		t, err := time.ParseDuration(conf.Timeout)
-		if err != nil {
-			return InvalidTimeoutError{conf.Timeout}
-		}
-		httpClient = &http.Client{
-			Timeout: t,
-		}
+	timeout, err := time.ParseDuration(conf.Timeout)
+	if err != nil {
+		log.Infof("failed to parse duration, %s, falling back to default timeout of 30s", conf.Timeout)
+		timeout = 30 * time.Second
+	}
+
+	httpClient := &http.Client{
+		Timeout:   timeout,
+		Transport: newTransport(conf.AWSAccessKeyID, conf.AWSAccessSecret),
 	}
 
 	for _, vc := range clients.Clients {
@@ -251,7 +254,9 @@ func determineVersion(uri string) (string, error) {
 // Config provides configuration options for an elasticsearch adaptor
 // the notable difference between this and dbConfig is the presence of the Timeout option
 type Config struct {
-	URI       string `json:"uri" doc:"the uri to connect to, in the form mongodb://user:password@host.com:27017/auth_database"`
-	Namespace string `json:"namespace" doc:"mongo namespace to read/write"`
-	Timeout   string `json:"timeout" doc:"timeout for establishing connection, format must be parsable by time.ParseDuration and defaults to 10s"`
+	URI             string `json:"uri" doc:"the uri to connect to, in the form mongodb://user:password@host.com:27017/auth_database"`
+	Namespace       string `json:"namespace" doc:"mongo namespace to read/write"`
+	Timeout         string `json:"timeout" doc:"timeout for establishing connection, format must be parsable by time.ParseDuration and defaults to 10s"`
+	AWSAccessKeyID  string `json:"aws_access_key" doc:"credentials for use with AWS Elasticsearch service"`
+	AWSAccessSecret string `json:"aws_access_secret" doc:"credentials for use with AWS Elasticsearch service"`
 }
17 changes: 1 addition & 16 deletions pkg/adaptor/elasticsearch/elasticsearch_test.go
@@ -49,11 +49,6 @@ var errorTests = []struct {
 		"http://localhost:9200 running 0.9.2, its bad",
 		VersionError{"http://localhost:9200", "0.9.2", "its bad"},
 	},
-	{
-		"InvalidTimeoutError",
-		"Invalid Timeout, blah",
-		InvalidTimeoutError{"blah"},
-	},
 }
 
 func TestErrors(t *testing.T) {
@@ -92,7 +87,7 @@ var initTests = []struct {
 	},
 	{
 		"timeout config",
-		adaptor.Config{"uri": goodVersionServer.URL, "namespace": "test.test", "timeout": "30s"},
+		adaptor.Config{"uri": goodVersionServer.URL, "namespace": "test.test", "timeout": "60s"},
 		nil,
 	},
 	{
@@ -132,10 +127,6 @@ var unsupportedVersionServer = httptest.NewServer(http.HandlerFunc(func(w http.R
 	fmt.Fprint(w, "{\"version\":{\"number\":\"0.9.2\"}}")
 }))
 
-var badTimeoutServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-	fmt.Fprint(w, "{\"version\":{\"number\":\"2.0.0\"}}")
-}))
-
 var badClientTests = []struct {
 	name string
 	cfg adaptor.Config
@@ -178,12 +169,6 @@ var badClientTests = []struct {
 		VersionError{unsupportedVersionServer.URL, "0.9.2", "unsupported client"},
 		func() { unsupportedVersionServer.Close() },
 	},
-	{
-		"bad timeout",
-		adaptor.Config{"uri": badTimeoutServer.URL, "namespace": "test.test", "timeout": "xyz"},
-		InvalidTimeoutError{"xyz"},
-		func() { badTimeoutServer.Close() },
-	},
 }
 
 func TestFailedClient(t *testing.T) {
33 changes: 33 additions & 0 deletions pkg/adaptor/elasticsearch/transport.go
@@ -0,0 +1,33 @@
+package elasticsearch
+
+import (
+	"net/http"
+
+	awsauth "github.com/smartystreets/go-aws-auth"
+)
+
+// AWSTransport handles wrapping requests to AWS Elasticsearch service
+type AWSTransport struct {
+	Credentials awsauth.Credentials
+	transport   http.RoundTripper
+}
+
+func newTransport(accessKeyID, secretAccessKey string) http.RoundTripper {
+	t := http.DefaultTransport
+	if accessKeyID != "" && secretAccessKey != "" {
+		return &AWSTransport{
+			Credentials: awsauth.Credentials{
+				AccessKeyID:     accessKeyID,
+				SecretAccessKey: secretAccessKey,
+			},
+			transport: t,
+		}
+	}
+	return t
+}
+
+// RoundTrip implementation
+func (a AWSTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+	awsauth.Sign4(req, a.Credentials)
+	return a.transport.RoundTrip(req)
+}
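
Reviewer note: `AWSTransport` is a decorator around `http.RoundTripper`. It signs the outgoing request and then hands it to the wrapped transport, and `newTransport` only installs the wrapper when both credentials are set. The sketch below shows the same pattern with the standard library alone, substituting a fixed header for the real `awsauth.Sign4` call. `headerTransport`, `newHeaderTransport`, `echoHeader`, and the `X-Demo-Signature` header are all hypothetical names used for illustration. Unlike the committed code, the sketch clones the request before changing it, because the `http.RoundTripper` documentation says `RoundTrip` should not modify the request.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// headerTransport is a hypothetical stand-in for AWSTransport: it decorates
// each request with one header and delegates to the wrapped RoundTripper.
type headerTransport struct {
	name, value string
	next        http.RoundTripper
}

// RoundTrip sets the header on a clone so the caller's request is untouched.
func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	clone := req.Clone(req.Context())
	clone.Header.Set(t.name, t.value)
	return t.next.RoundTrip(clone)
}

// newHeaderTransport mirrors newTransport: with no value configured it
// returns the default transport unchanged.
func newHeaderTransport(name, value string) http.RoundTripper {
	if value == "" {
		return http.DefaultTransport
	}
	return &headerTransport{name: name, value: value, next: http.DefaultTransport}
}

// echoHeader starts a local test server that echoes the named request header
// and returns what the server saw when called through rt.
func echoHeader(rt http.RoundTripper, name string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get(name))
	}))
	defer srv.Close()

	resp, err := (&http.Client{Transport: rt}).Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	signed, err := echoHeader(newHeaderTransport("X-Demo-Signature", "abc123"), "X-Demo-Signature")
	fmt.Printf("decorated: %q, err: %v\n", signed, err)

	plain, err := echoHeader(newHeaderTransport("X-Demo-Signature", ""), "X-Demo-Signature")
	fmt.Printf("plain: %q, err: %v\n", plain, err)
}
```

Putting the signing step in the transport rather than in each caller means every request made through the client is covered, including those issued internally by a client library.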
65 changes: 65 additions & 0 deletions pkg/adaptor/elasticsearch/transport_test.go
@@ -0,0 +1,65 @@
+package elasticsearch
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+)
+
+const (
+	awsHmacHeader = "AWS4-HMAC-SHA256 Credential=accessKeyID"
+	awsAccessKey  = "accessKeyID"
+	awsSecretKey  = "secretAccessKey"
+)
+
+var mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	expectAWSRequest := r.URL.Path == "/aws"
+	if isAWSRequest(r) != expectAWSRequest {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	fmt.Fprint(w, "{\"ok\":1}")
+}))
+
+func isAWSRequest(r *http.Request) bool {
+	return strings.HasPrefix(r.Header.Get("Authorization"), awsHmacHeader) &&
+		r.Header.Get("X-Amz-Content-Sha256") != "" &&
+		r.Header.Get("X-Amz-Date") != ""
+}
+
+var transportTests = []struct {
+	path string
+	c    *http.Client
+}{
+	{
+		"/aws",
+		&http.Client{Transport: newTransport(awsAccessKey, awsSecretKey)},
+	},
+	{
+		"/other",
+		&http.Client{Transport: newTransport("", "")},
+	},
+}
+
+func TestTransport(t *testing.T) {
+	defer mockServer.Close()
+
+	for _, tt := range transportTests {
+		req, err := http.NewRequest(
+			http.MethodGet,
+			fmt.Sprintf("%s%s", mockServer.URL, tt.path),
+			nil,
+		)
+		if err != nil {
+			t.Fatalf("unable to build request, %s", err)
+		}
+		resp, err := tt.c.Do(req)
+		if err != nil {
+			t.Errorf("failed to send request, %s", err)
+		} else if resp.StatusCode == http.StatusBadRequest {
+			t.Errorf("bad request sent for %s", tt.path)
+		}
+	}
+}

4 changes: 1 addition & 3 deletions vendor/github.com/coreos/etcd/.travis.yml

1 change: 1 addition & 0 deletions vendor/github.com/coreos/etcd/Documentation/README.md
