Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
remaining tests for elasticsearch adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
jipperinbham committed Jan 21, 2017
1 parent e5cb1ce commit 42a0122
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 17 deletions.
86 changes: 69 additions & 17 deletions pkg/adaptor/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package elasticsearch

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -22,6 +23,60 @@ import (
version "github.com/hashicorp/go-version"
)

const (
description = "an elasticsearch sink adaptor"
sampleConfig = `
- es:
type: elasticsearch
uri: https://username:password@hostname:port
`
)

var (
_ adaptor.Adaptor = &Elasticsearch{}
)

// InvalidURIError wraps the underlying error when the provided URI is not parsable by url.Parse.
type InvalidURIError struct {
uri string
}

func (e InvalidURIError) Error() string {
return fmt.Sprintf("Invalid URI, %s", e.uri)
}

// ConnectionError wraps any failed calls to the provided uri.
type ConnectionError struct {
uri string
}

func (e ConnectionError) Error() string {
return fmt.Sprintf("failed to connect to %s", e.uri)
}

// VersionError represents any failure in attempting to obtain the version from the provided uri.
type VersionError struct {
uri string
v string
err string
}

func (e VersionError) Error() string {
if e.v == "" {
return fmt.Sprintf("unable to determine version from %s, %s", e.uri, e.err)
}
return fmt.Sprintf("%s running %s, %s", e.uri, e.v, e.err)
}

// InvalidTimeoutError wraps the underlying error when the provided is not parsable time.ParseDuration
type InvalidTimeoutError struct {
timeout string
}

func (e InvalidTimeoutError) Error() string {
return fmt.Sprintf("Invalid Timeout, %s", e.timeout)
}

// Elasticsearch is an adaptor to connect a pipeline to
// an elasticsearch cluster.
type Elasticsearch struct {
Expand All @@ -39,15 +94,9 @@ type Elasticsearch struct {

// Description for the Elasticsearcb adaptor
func (e *Elasticsearch) Description() string {
return "an elasticsearch sink adaptor"
return description
}

const sampleConfig = `
- es:
type: elasticsearch
uri: https://username:password@hostname:port
`

// SampleConfig for elasticsearch adaptor
func (e *Elasticsearch) SampleConfig() string {
return sampleConfig
Expand All @@ -72,11 +121,11 @@ func init() {

e.index, e.typeMatch, err = extra.CompileNamespace()
if err != nil {
return e, adaptor.NewError(adaptor.CRITICAL, path, fmt.Sprintf("can't split namespace into _index and typeMatch (%s)", err.Error()), nil)
return e, adaptor.NewError(adaptor.CRITICAL, path, fmt.Sprintf("can't split namespace into index and typeMatch (%s)", err.Error()), nil)
}

if err := e.setupClient(conf); err != nil {
return nil, adaptor.NewError(adaptor.CRITICAL, path, fmt.Sprintf("unable to setup client (%s)", err), nil)
return nil, err
}

return e, nil
Expand All @@ -85,7 +134,7 @@ func init() {

// Start the adaptor as a source (not implemented)
func (e *Elasticsearch) Start() error {
return fmt.Errorf("elasticsearch can't function as a source")
return errors.New("Start is unsupported for elasticsearch")
}

// Listen starts the listener
Expand Down Expand Up @@ -128,8 +177,9 @@ func (e *Elasticsearch) computeNamespace(Type string) string {
func (e *Elasticsearch) setupClient(conf Config) error {
uri, err := url.Parse(conf.URI)
if err != nil {
return err
return InvalidURIError{conf.URI}
}

hostsAndPorts := strings.Split(uri.Host, ",")
stringVersion, err := determineVersion(fmt.Sprintf("%s://%s", uri.Scheme, hostsAndPorts[0]))
if err != nil {
Expand All @@ -138,14 +188,14 @@ func (e *Elasticsearch) setupClient(conf Config) error {

v, err := version.NewVersion(stringVersion)
if err != nil {
return err
return VersionError{conf.URI, stringVersion, err.Error()}
}

httpClient := http.DefaultClient
if conf.Timeout != "" {
t, err := time.ParseDuration(conf.Timeout)
if err != nil {
return err
return InvalidTimeoutError{conf.Timeout}
}
httpClient = &http.Client{
Timeout: t,
Expand All @@ -170,17 +220,17 @@ func (e *Elasticsearch) setupClient(conf Config) error {
}
}

return fmt.Errorf("no client registered for version %s\n", stringVersion)
return VersionError{conf.URI, stringVersion, "unsupported client"}
}

func determineVersion(uri string) (string, error) {
resp, err := http.DefaultClient.Get(uri)
if err != nil {
return "", err
return "", ConnectionError{uri}
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
return "", VersionError{uri, "", "unable to read response body"}
}
defer resp.Body.Close()
var r struct {
Expand All @@ -191,7 +241,9 @@ func determineVersion(uri string) (string, error) {
}
err = json.Unmarshal(body, &r)
if err != nil {
return "", fmt.Errorf("unable to determine version from response, %s\n", string(body))
return "", VersionError{uri, "", fmt.Sprintf("malformed JSON: %s", body)}
} else if r.Version.Number == "" {
return "", VersionError{uri, "", fmt.Sprintf("missing version: %s", body)}
}
return r.Version.Number, nil
}
Expand Down
Loading

0 comments on commit 42a0122

Please sign in to comment.