Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
namespace parameter is now a single entity (#317)
Browse files Browse the repository at this point in the history
all references to the namespace in each adaptor/function configuration has been removed and is only expected to contain a single part to serve the purpose of filtering on adaptor namespaces (i.e. collections/tables/etc.)

fixes #310
  • Loading branch information
jipperinbham authored Mar 21, 2017
1 parent 8f0d800 commit 3577ebf
Show file tree
Hide file tree
Showing 16 changed files with 55 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Transporter no longer requires a YAML file. All configuration is in the JS file
- if using transporter as a library, all packages have been moved out of `pkg` to the top-level
- `eval` command removed
- `list` command removed
- the `namespace` parameter now only expects a single part (the regexp filter), all adaptors have been updated to pull the "database name" from the provided URI

### Features
- NEW RabbitMQ adaptor [#298](https://github.com/compose/transporter/pull/298)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var sink = elasticsearch({
})
t.Source(source).Save(sink)
// t.Source("source", source).Save("sink", sink)
// t.Source("source", source, "namespace").Save("sink", sink, "namespace")
$
```

Expand Down
26 changes: 0 additions & 26 deletions adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"sync"

"github.com/compose/transporter/client"
Expand Down Expand Up @@ -90,30 +88,6 @@ func (c Config) GetString(key string) string {
return s
}

// split a namespace into it's elements
// this covers a few standard cases, elasticsearch, mongo, rethink, but it's
// expected to be all inclusive.
func splitNamespace(ns string) (string, string, error) {
fields := strings.SplitN(ns, ".", 2)

if len(fields) != 2 {
return "", "", ErrNamespaceMalformed
}
return fields[0], fields[1], nil
}

// CompileNamespace split's on the first '.' and then compiles the second portion to use as the msg filter
func CompileNamespace(ns string) (string, *regexp.Regexp, error) {
field0, field1, err := splitNamespace(ns)

if err != nil {
return "", nil, err
}

compiledNs, err := regexp.Compile(strings.Trim(field1, "/"))
return field0, compiledNs, err
}

// BaseConfig is a standard typed config struct to use for as general purpose config for most databases.
type BaseConfig struct {
URI string `json:"uri"`
Expand Down
54 changes: 1 addition & 53 deletions adaptor/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package adaptor_test

import (
"reflect"
"regexp"
"testing"

"github.com/compose/transporter/adaptor"
_ "github.com/compose/transporter/log"
)

func init() {
Expand Down Expand Up @@ -87,55 +87,3 @@ func TestConfig(t *testing.T) {
}
}
}

var compileNamespaceTests = []struct {
name string
cfg adaptor.Config
partOne string
r *regexp.Regexp
err error
}{
{
"simple ns",
adaptor.Config{"namespace": "a.b"},
"a",
regexp.MustCompile("b"),
nil,
},
{
"simple regexp ns",
adaptor.Config{"namespace": "a..*"},
"a",
regexp.MustCompile(".*"),
nil,
},
{
"simple regexp ns with /",
adaptor.Config{"namespace": "a./.*/"},
"a",
regexp.MustCompile(".*"),
nil,
},
{
"malformed regexp",
adaptor.Config{"namespace": "a"},
"",
nil,
adaptor.ErrNamespaceMalformed,
},
}

func TestCompileNamespace(t *testing.T) {
for _, ct := range compileNamespaceTests {
out, r, err := adaptor.CompileNamespace(ct.cfg.GetString("namespace"))
if !reflect.DeepEqual(out, ct.partOne) {
t.Errorf("[%s] wrong value returned, expected %s, got %s", ct.name, ct.partOne, out)
}
if !reflect.DeepEqual(r, ct.r) {
t.Errorf("[%s] wrong regexp returned, expected %+v, got %+v", ct.name, ct.r, r)
}
if err != ct.err {
t.Errorf("[%s] wrong error returned, expected %+v, got %+v", ct.name, ct.err, err)
}
}
}
10 changes: 9 additions & 1 deletion adaptor/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
)

const (
description = "an elasticsearch sink adaptor"
// DefaultIndex is used when there is not one included in the provided URI.
DefaultIndex = "test"

description = "an elasticsearch sink adaptor"

sampleConfig = `{
"uri": "${ELASTICSEARCH_URI}"
// "timeout": "10s", // defaults to 30s
Expand Down Expand Up @@ -81,6 +85,10 @@ func setupWriter(conf *Elasticsearch) (client.Writer, error) {
return nil, client.InvalidURIError{URI: conf.URI, Err: err.Error()}
}

if uri.Path == "" {
uri.Path = fmt.Sprintf("/%s", DefaultIndex)
}

hostsAndPorts := strings.Split(uri.Host, ",")
stringVersion, err := determineVersion(fmt.Sprintf("%s://%s", uri.Scheme, hostsAndPorts[0]), uri.User)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions adaptor/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
authURI = func() string {
uri, _ := url.Parse(authedServer.URL)
uri.User = url.UserPassword(testUser, testPwd)
return fmt.Sprintf("%s/test", uri.String())
return uri.String()
}
)
var authedServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -73,12 +73,12 @@ var clientTests = []struct {
}{
{
"base config",
adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL)},
adaptor.Config{"uri": fmt.Sprintf("%s", goodVersionServer.URL)},
nil,
},
{
"timeout config",
adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL), "timeout": "60s"},
adaptor.Config{"uri": fmt.Sprintf("%s", goodVersionServer.URL), "timeout": "60s"},
nil,
},
{
Expand All @@ -93,7 +93,7 @@ var clientTests = []struct {
},
{
"no connection",
adaptor.Config{"uri": "http://localhost:7200/test"},
adaptor.Config{"uri": "http://localhost:7200"},
client.ConnectError{Reason: "http://localhost:7200"},
},
{
Expand All @@ -108,13 +108,13 @@ var clientTests = []struct {
},
{
"bad version",
adaptor.Config{"uri": fmt.Sprintf("%s/test", badVersionServer.URL)},
client.VersionError{URI: fmt.Sprintf("%s/test", badVersionServer.URL), V: "not a version", Err: "Malformed version: not a version"},
adaptor.Config{"uri": badVersionServer.URL},
client.VersionError{URI: badVersionServer.URL, V: "not a version", Err: "Malformed version: not a version"},
},
{
"unsupported version",
adaptor.Config{"uri": fmt.Sprintf("%s/test", unsupportedVersionServer.URL)},
client.VersionError{URI: fmt.Sprintf("%s/test", unsupportedVersionServer.URL), V: "0.9.2", Err: "unsupported client"},
adaptor.Config{"uri": unsupportedVersionServer.URL},
client.VersionError{URI: unsupportedVersionServer.URL, V: "0.9.2", Err: "unsupported client"},
},
}

Expand Down
12 changes: 6 additions & 6 deletions cmd/transporter/goja_buider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ import (

func TestNewBuilder(t *testing.T) {
a := buildAdaptor("mongodb")(map[string]interface{}{"uri": "mongo://localhost:27017"})
source, err := pipeline.NewNode("source", a.name, "test./.*/", a.a, nil)
source, err := pipeline.NewNode("source", a.name, DefaultNamespace, a.a, nil)
if err != nil {
t.Fatalf("unexpected error, %s\n", err)
}

a = buildAdaptor("elasticsearch")(map[string]interface{}{"uri": "http://localhost:9200"})
sink, err := pipeline.NewNode("sink", a.name, "test./.*/", a.a, source)
sink, err := pipeline.NewNode("sink", a.name, DefaultNamespace, a.a, source)
if err != nil {
t.Fatalf("unexpected error, %s\n", err)
}

transformer := buildFunction("transformer")(map[string]interface{}{"filename": "pipeline.js"})
sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(".*")}}
sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(DefaultNamespace)}}

expected := "Transporter:\n"
expected += source.String()
Expand All @@ -41,18 +41,18 @@ func TestNewBuilder(t *testing.T) {
func TestNewBuilderWithEnv(t *testing.T) {
os.Setenv("TEST_MONGO_URI", "mongo://localhost:27017")
a := buildAdaptor("mongodb")(map[string]interface{}{"uri": "mongo://localhost:27017"})
source, err := pipeline.NewNode("source", a.name, "test./.*/", a.a, nil)
source, err := pipeline.NewNode("source", a.name, DefaultNamespace, a.a, nil)
if err != nil {
t.Fatalf("unexpected error, %s\n", err)
}
a = buildAdaptor("elasticsearch")(map[string]interface{}{"uri": "http://localhost:9200"})
sink, err := pipeline.NewNode("sink", a.name, "test./.*/", a.a, source)
sink, err := pipeline.NewNode("sink", a.name, DefaultNamespace, a.a, source)
if err != nil {
t.Fatalf("unexpected error, %s\n", err)
}

transformer := buildFunction("transformer")(map[string]interface{}{"filename": "pipeline.js"})
sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(".*")}}
sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(DefaultNamespace)}}

expected := "Transporter:\n"
expected += source.String()
Expand Down
15 changes: 10 additions & 5 deletions cmd/transporter/goja_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"

Expand All @@ -20,6 +21,10 @@ import (
"github.com/oklog/oklog/pkg/group"
)

const (
DefaultNamespace = "/.*/"
)

func NewBuilder(file string) (*Transporter, error) {
t := &Transporter{}
t.vm = goja.New()
Expand Down Expand Up @@ -160,7 +165,7 @@ func (t *Transporter) Source(call goja.FunctionCall) goja.Value {

func (n *Node) Transform(call goja.FunctionCall) goja.Value {
name, f, ns := exportArgs(call.Arguments)
_, nsFilter, err := adaptor.CompileNamespace(ns)
compiledNs, err := regexp.Compile(strings.Trim(ns, "/"))
if err != nil {
panic(err)
}
Expand All @@ -169,17 +174,17 @@ func (n *Node) Transform(call goja.FunctionCall) goja.Value {
source: n.parent,
transforms: make([]*pipeline.Transform, 0),
}
tf.transforms = append(tf.transforms, &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: nsFilter})
tf.transforms = append(tf.transforms, &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: compiledNs})
return n.vm.ToValue(tf)
}

func (tf *Transformer) Transform(call goja.FunctionCall) goja.Value {
name, f, ns := exportArgs(call.Arguments)
_, nsFilter, err := adaptor.CompileNamespace(ns)
compiledNs, err := regexp.Compile(strings.Trim(ns, "/"))
if err != nil {
panic(err)
}
t := &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: nsFilter}
t := &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: compiledNs}
tf.transforms = append(tf.transforms, t)
return tf.vm.ToValue(tf)
}
Expand Down Expand Up @@ -218,7 +223,7 @@ func exportArgs(args []goja.Value) (string, interface{}, string) {
uuid, _ := uuid.NewV4()
var (
name = uuid.String()
namespace = "test./.*/"
namespace = DefaultNamespace
a interface{}
)
if n, ok := args[0].Export().(string); ok {
Expand Down
2 changes: 2 additions & 0 deletions cmd/transporter/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func runInit(args []string) error {
}
}
appFileHandle.WriteString(`t.Source(source).Save(sink)`)
appFileHandle.WriteString(`// t.Source("source", source).Save("sink", sink)`)
appFileHandle.WriteString(`// t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")`)
appFileHandle.WriteString("\n")
return nil
}
6 changes: 3 additions & 3 deletions integration_tests/mongo_to_es/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ enron_source_mongo = mongodb({
})

enron_sink_es = elasticsearch({
"uri": "https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}"
"uri": "https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}/enron"
})

t.Source("enron_source_mongo", enron_source_mongo, "enron.emails")
.Save("enron_sink_es", enron_sink_es, "enron.emails");
t.Source("enron_source_mongo", enron_source_mongo, "emails")
.Save("enron_sink_es", enron_sink_es, "emails");
4 changes: 2 additions & 2 deletions integration_tests/mongo_to_mongo/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ enron_sink_mongo = mongodb({
"fsync": true
})

t.Source("enron_source_mongo", enron_source_mongo, "enron.emails")
.Save("enron_sink_mongo", enron_sink_mongo, "enron.emails");
t.Source("enron_source_mongo", enron_source_mongo, "emails")
.Save("enron_sink_mongo", enron_sink_mongo, "emails");
4 changes: 2 additions & 2 deletions integration_tests/mongo_to_rethink/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ enron_sink_rethink = rethinkdb({
"ssl": true
})

t.Source("enron_source_mongo", enron_source_mongo, "enron.emails")
.Save("enron_sink_rethink", enron_sink_rethink, "enron.emails");
t.Source("enron_source_mongo", enron_source_mongo, "emails")
.Save("enron_sink_rethink", enron_sink_rethink, "emails");
4 changes: 2 additions & 2 deletions integration_tests/rethink_to_postgres/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ enron_sink_postgres = postgres({
"uri": "postgres://${POSTGRES_ENRON_SINK_USER}:${POSTGRES_ENRON_SINK_PASSWORD}@${POSTGRES_ENRON_SINK_URI}"
})

t.Source("enron_source_rethink", enron_source_rethink, "enron.emails")
.Save("enron_sink_postgres", enron_sink_postgres, "enron.emails");
t.Source("enron_source_rethink", enron_source_rethink, "emails")
.Save("enron_sink_postgres", enron_sink_postgres, "emails");
16 changes: 3 additions & 13 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package pipeline
import (
"fmt"
"regexp"
"strings"
"sync"

"github.com/compose/transporter/adaptor"
Expand Down Expand Up @@ -59,14 +60,14 @@ type Transform struct {

// NewNode creates a new Node struct
func NewNode(name, kind, ns string, a adaptor.Adaptor, parent *Node) (*Node, error) {
_, nsFilter, err := adaptor.CompileNamespace(ns)
compiledNs, err := regexp.Compile(strings.Trim(ns, "/"))
if err != nil {
return nil, err
}
n := &Node{
Name: name,
Type: kind,
nsFilter: nsFilter,
nsFilter: compiledNs,
Children: make([]*Node, 0),
Transforms: make([]*Transform, 0),
done: make(chan struct{}),
Expand Down Expand Up @@ -144,17 +145,6 @@ func (n *Node) Path() string {
return n.Parent.Path() + "/" + n.Name
}

// AddTransform adds the provided function.Function to the Node and will be called
// before sending any messages down the pipeline.
func (n *Node) AddTransform(name string, f function.Function, ns string) error {
_, nsFilter, err := adaptor.CompileNamespace(ns)
if err != nil {
return err
}
n.Transforms = append(n.Transforms, &Transform{name, f, nsFilter})
return nil
}

// Start starts the nodes children in a go routine, and then runs either Start() or Listen()
// on the node's adaptor. Root nodes (nodes with no parent) will run Start()
// and will emit messages to it's children,
Expand Down
Loading

0 comments on commit 3577ebf

Please sign in to comment.