From 3577ebffd1a47b13b5dd54876e8ba6d8b273f12e Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Tue, 21 Mar 2017 15:44:41 -0500 Subject: [PATCH] namespace parameter is now a single entity (#317) all references to the namespace in each adaptor/function configuration has been removed and is only expected to contain a single part to serve the purpose of filtering on adaptor namespaces (i.e. collections/tables/etc.) fixes #310 --- CHANGELOG.md | 1 + README.md | 2 + adaptor/adaptor.go | 26 ---------- adaptor/adaptor_test.go | 54 +------------------- adaptor/elasticsearch/elasticsearch.go | 10 +++- adaptor/elasticsearch/elasticsearch_test.go | 16 +++--- cmd/transporter/goja_buider_test.go | 12 ++--- cmd/transporter/goja_builder.go | 15 ++++-- cmd/transporter/init.go | 2 + integration_tests/mongo_to_es/app.js | 6 +-- integration_tests/mongo_to_mongo/app.js | 4 +- integration_tests/mongo_to_rethink/app.js | 4 +- integration_tests/rethink_to_postgres/app.js | 4 +- pipeline/node.go | 16 ++---- pipeline/pipeline_events_integration_test.go | 4 +- pipeline/pipeline_integration_test.go | 4 +- 16 files changed, 55 insertions(+), 125 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1df04d8f..d5b8b8af5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Transporter no longer requires a YAML file. All configuration is in the JS file - if using transporter as a library, all packages have been moved out of `pkg` to the top-level - `eval` command removed - `list` command removed +- the `namespace` parameter now only expects a single part (the regexp filter), all adaptors have been updated to pull the "database name" from the provided URI ### Features - NEW RabbitMQ adaptor [#298](https://github.com/compose/transporter/pull/298) diff --git a/README.md b/README.md index 981a9d34a..98d06ef01 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,8 @@ var sink = elasticsearch({ }) t.Source(source).Save(sink) +// t.Source("source", source).Save("sink", sink) +// t.Source("source", source, "namespace").Save("sink", sink, "namespace") $ ``` diff --git a/adaptor/adaptor.go b/adaptor/adaptor.go index e030db0d6..c43434e83 100644 --- a/adaptor/adaptor.go +++ b/adaptor/adaptor.go @@ -4,8 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "regexp" - "strings" "sync" "github.com/compose/transporter/client" @@ -90,30 +88,6 @@ func (c Config) GetString(key string) string { return s } -// split a namespace into it's elements -// this covers a few standard cases, elasticsearch, mongo, rethink, but it's -// expected to be all inclusive. -func splitNamespace(ns string) (string, string, error) { - fields := strings.SplitN(ns, ".", 2) - - if len(fields) != 2 { - return "", "", ErrNamespaceMalformed - } - return fields[0], fields[1], nil -} - -// CompileNamespace split's on the first '.' and then compiles the second portion to use as the msg filter -func CompileNamespace(ns string) (string, *regexp.Regexp, error) { - field0, field1, err := splitNamespace(ns) - - if err != nil { - return "", nil, err - } - - compiledNs, err := regexp.Compile(strings.Trim(field1, "/")) - return field0, compiledNs, err -} - // BaseConfig is a standard typed config struct to use for as general purpose config for most databases. 
type BaseConfig struct { URI string `json:"uri"` diff --git a/adaptor/adaptor_test.go b/adaptor/adaptor_test.go index 8a2a1f78f..a3f0b89ce 100644 --- a/adaptor/adaptor_test.go +++ b/adaptor/adaptor_test.go @@ -2,10 +2,10 @@ package adaptor_test import ( "reflect" - "regexp" "testing" "github.com/compose/transporter/adaptor" + _ "github.com/compose/transporter/log" ) func init() { @@ -87,55 +87,3 @@ func TestConfig(t *testing.T) { } } } - -var compileNamespaceTests = []struct { - name string - cfg adaptor.Config - partOne string - r *regexp.Regexp - err error -}{ - { - "simple ns", - adaptor.Config{"namespace": "a.b"}, - "a", - regexp.MustCompile("b"), - nil, - }, - { - "simple regexp ns", - adaptor.Config{"namespace": "a..*"}, - "a", - regexp.MustCompile(".*"), - nil, - }, - { - "simple regexp ns with /", - adaptor.Config{"namespace": "a./.*/"}, - "a", - regexp.MustCompile(".*"), - nil, - }, - { - "malformed regexp", - adaptor.Config{"namespace": "a"}, - "", - nil, - adaptor.ErrNamespaceMalformed, - }, -} - -func TestCompileNamespace(t *testing.T) { - for _, ct := range compileNamespaceTests { - out, r, err := adaptor.CompileNamespace(ct.cfg.GetString("namespace")) - if !reflect.DeepEqual(out, ct.partOne) { - t.Errorf("[%s] wrong value returned, expected %s, got %s", ct.name, ct.partOne, out) - } - if !reflect.DeepEqual(r, ct.r) { - t.Errorf("[%s] wrong regexp returned, expected %+v, got %+v", ct.name, ct.r, r) - } - if err != ct.err { - t.Errorf("[%s] wrong error returned, expected %+v, got %+v", ct.name, ct.err, err) - } - } -} diff --git a/adaptor/elasticsearch/elasticsearch.go b/adaptor/elasticsearch/elasticsearch.go index 9a44d90d6..3f4b3349f 100644 --- a/adaptor/elasticsearch/elasticsearch.go +++ b/adaptor/elasticsearch/elasticsearch.go @@ -20,7 +20,11 @@ import ( ) const ( - description = "an elasticsearch sink adaptor" + // DefaultIndex is used when there is not one included in the provided URI. 
+ DefaultIndex = "test" + + description = "an elasticsearch sink adaptor" + sampleConfig = `{ "uri": "${ELASTICSEARCH_URI}" // "timeout": "10s", // defaults to 30s @@ -81,6 +85,10 @@ func setupWriter(conf *Elasticsearch) (client.Writer, error) { return nil, client.InvalidURIError{URI: conf.URI, Err: err.Error()} } + if uri.Path == "" { + uri.Path = fmt.Sprintf("/%s", DefaultIndex) + } + hostsAndPorts := strings.Split(uri.Host, ",") stringVersion, err := determineVersion(fmt.Sprintf("%s://%s", uri.Scheme, hostsAndPorts[0]), uri.User) if err != nil { diff --git a/adaptor/elasticsearch/elasticsearch_test.go b/adaptor/elasticsearch/elasticsearch_test.go index 3c0add306..a55adc933 100644 --- a/adaptor/elasticsearch/elasticsearch_test.go +++ b/adaptor/elasticsearch/elasticsearch_test.go @@ -38,7 +38,7 @@ var ( authURI = func() string { uri, _ := url.Parse(authedServer.URL) uri.User = url.UserPassword(testUser, testPwd) - return fmt.Sprintf("%s/test", uri.String()) + return uri.String() } ) var authedServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -73,12 +73,12 @@ var clientTests = []struct { }{ { "base config", - adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL)}, + adaptor.Config{"uri": fmt.Sprintf("%s", goodVersionServer.URL)}, nil, }, { "timeout config", - adaptor.Config{"uri": fmt.Sprintf("%s/test", goodVersionServer.URL), "timeout": "60s"}, + adaptor.Config{"uri": fmt.Sprintf("%s", goodVersionServer.URL), "timeout": "60s"}, nil, }, { @@ -93,7 +93,7 @@ var clientTests = []struct { }, { "no connection", - adaptor.Config{"uri": "http://localhost:7200/test"}, + adaptor.Config{"uri": "http://localhost:7200"}, client.ConnectError{Reason: "http://localhost:7200"}, }, { @@ -108,13 +108,13 @@ var clientTests = []struct { }, { "bad version", - adaptor.Config{"uri": fmt.Sprintf("%s/test", badVersionServer.URL)}, - client.VersionError{URI: fmt.Sprintf("%s/test", badVersionServer.URL), V: "not a version", Err: "Malformed version: not a version"}, + adaptor.Config{"uri": badVersionServer.URL}, + client.VersionError{URI: badVersionServer.URL, V: "not a version", Err: "Malformed version: not a version"}, }, { "unsupported version", - adaptor.Config{"uri": fmt.Sprintf("%s/test", unsupportedVersionServer.URL)}, - client.VersionError{URI: fmt.Sprintf("%s/test", unsupportedVersionServer.URL), V: "0.9.2", Err: "unsupported client"}, + adaptor.Config{"uri": unsupportedVersionServer.URL}, + client.VersionError{URI: unsupportedVersionServer.URL, V: "0.9.2", Err: "unsupported client"}, }, } diff --git a/cmd/transporter/goja_buider_test.go b/cmd/transporter/goja_buider_test.go index 783eb69b7..abf583a01 100644 --- a/cmd/transporter/goja_buider_test.go +++ b/cmd/transporter/goja_buider_test.go @@ -11,19 +11,19 @@ import ( func TestNewBuilder(t *testing.T) { a := buildAdaptor("mongodb")(map[string]interface{}{"uri": "mongo://localhost:27017"}) - source, err := pipeline.NewNode("source", a.name, "test./.*/", a.a, nil) + source, err := pipeline.NewNode("source", a.name, DefaultNamespace, a.a, nil) if err != nil { t.Fatalf("unexpected error, %s\n", err) } a = buildAdaptor("elasticsearch")(map[string]interface{}{"uri": "http://localhost:9200"}) - sink, err := pipeline.NewNode("sink", a.name, "test./.*/", a.a, source) + sink, err := pipeline.NewNode("sink", a.name, DefaultNamespace, a.a, source) if err != nil { t.Fatalf("unexpected error, %s\n", err) } transformer := buildFunction("transformer")(map[string]interface{}{"filename": "pipeline.js"}) - 
sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(".*")}} + sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(DefaultNamespace)}} expected := "Transporter:\n" expected += source.String() @@ -41,18 +41,18 @@ func TestNewBuilder(t *testing.T) { func TestNewBuilderWithEnv(t *testing.T) { os.Setenv("TEST_MONGO_URI", "mongo://localhost:27017") a := buildAdaptor("mongodb")(map[string]interface{}{"uri": "mongo://localhost:27017"}) - source, err := pipeline.NewNode("source", a.name, "test./.*/", a.a, nil) + source, err := pipeline.NewNode("source", a.name, DefaultNamespace, a.a, nil) if err != nil { t.Fatalf("unexpected error, %s\n", err) } a = buildAdaptor("elasticsearch")(map[string]interface{}{"uri": "http://localhost:9200"}) - sink, err := pipeline.NewNode("sink", a.name, "test./.*/", a.a, source) + sink, err := pipeline.NewNode("sink", a.name, DefaultNamespace, a.a, source) if err != nil { t.Fatalf("unexpected error, %s\n", err) } transformer := buildFunction("transformer")(map[string]interface{}{"filename": "pipeline.js"}) - sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(".*")}} + sink.Transforms = []*pipeline.Transform{&pipeline.Transform{Name: "trans", Fn: transformer, NsFilter: regexp.MustCompile(DefaultNamespace)}} expected := "Transporter:\n" expected += source.String() diff --git a/cmd/transporter/goja_builder.go b/cmd/transporter/goja_builder.go index 30c64aaac..e3aa39a53 100644 --- a/cmd/transporter/goja_builder.go +++ b/cmd/transporter/goja_builder.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "regexp" + "strings" "syscall" "time" @@ -20,6 +21,10 @@ import ( "github.com/oklog/oklog/pkg/group" ) +const ( + DefaultNamespace = "/.*/" +) + func NewBuilder(file string) (*Transporter, error) { t := &Transporter{} t.vm = goja.New() @@ -160,7 +165,7 @@ func (t *Transporter) Source(call goja.FunctionCall) goja.Value { func (n *Node) Transform(call goja.FunctionCall) goja.Value { name, f, ns := exportArgs(call.Arguments) - _, nsFilter, err := adaptor.CompileNamespace(ns) + compiledNs, err := regexp.Compile(strings.Trim(ns, "/")) if err != nil { panic(err) } @@ -169,17 +174,17 @@ func (n *Node) Transform(call goja.FunctionCall) goja.Value { source: n.parent, transforms: make([]*pipeline.Transform, 0), } - tf.transforms = append(tf.transforms, &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: nsFilter}) + tf.transforms = append(tf.transforms, &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: compiledNs}) return n.vm.ToValue(tf) } func (tf *Transformer) Transform(call goja.FunctionCall) goja.Value { name, f, ns := exportArgs(call.Arguments) - _, nsFilter, err := adaptor.CompileNamespace(ns) + compiledNs, err := regexp.Compile(strings.Trim(ns, "/")) if err != nil { panic(err) } - t := &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: nsFilter} + t := &pipeline.Transform{Name: name, Fn: f.(function.Function), NsFilter: compiledNs} tf.transforms = append(tf.transforms, t) return tf.vm.ToValue(tf) } @@ -218,7 +223,7 @@ func exportArgs(args []goja.Value) (string, interface{}, string) { uuid, _ := uuid.NewV4() var ( name = uuid.String() - namespace = "test./.*/" + namespace = DefaultNamespace a interface{} ) if n, ok := args[0].Export().(string); ok { diff --git a/cmd/transporter/init.go b/cmd/transporter/init.go index 
fbf74b4d7..48a869f13 100644 --- a/cmd/transporter/init.go +++ b/cmd/transporter/init.go @@ -35,6 +35,8 @@ func runInit(args []string) error { } } appFileHandle.WriteString(`t.Source(source).Save(sink)`) + appFileHandle.WriteString(`// t.Source("source", source).Save("sink", sink)`) + appFileHandle.WriteString(`// t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")`) appFileHandle.WriteString("\n") return nil } diff --git a/integration_tests/mongo_to_es/app.js b/integration_tests/mongo_to_es/app.js index dc0f3fb4d..72d2907f8 100644 --- a/integration_tests/mongo_to_es/app.js +++ b/integration_tests/mongo_to_es/app.js @@ -4,8 +4,8 @@ enron_source_mongo = mongodb({ }) enron_sink_es = elasticsearch({ - "uri": "https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}" + "uri": "https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}/enron" }) -t.Source("enron_source_mongo", enron_source_mongo, "enron.emails") - .Save("enron_sink_es", enron_sink_es, "enron.emails"); +t.Source("enron_source_mongo", enron_source_mongo, "emails") + .Save("enron_sink_es", enron_sink_es, "emails"); diff --git a/integration_tests/mongo_to_mongo/app.js b/integration_tests/mongo_to_mongo/app.js index 1eab2b65c..c44f6c95c 100644 --- a/integration_tests/mongo_to_mongo/app.js +++ b/integration_tests/mongo_to_mongo/app.js @@ -11,5 +11,5 @@ enron_sink_mongo = mongodb({ "fsync": true }) -t.Source("enron_source_mongo", enron_source_mongo, "enron.emails") - .Save("enron_sink_mongo", enron_sink_mongo, "enron.emails"); +t.Source("enron_source_mongo", enron_source_mongo, "emails") + .Save("enron_sink_mongo", enron_sink_mongo, "emails"); diff --git a/integration_tests/mongo_to_rethink/app.js b/integration_tests/mongo_to_rethink/app.js index 02719dd91..9907d3ed3 100644 --- a/integration_tests/mongo_to_rethink/app.js +++ b/integration_tests/mongo_to_rethink/app.js @@ -8,5 +8,5 @@ enron_sink_rethink = rethinkdb({ "ssl": true }) -t.Source("enron_source_mongo", enron_source_mongo, "enron.emails") - .Save("enron_sink_rethink", enron_sink_rethink, "enron.emails"); +t.Source("enron_source_mongo", enron_source_mongo, "emails") + .Save("enron_sink_rethink", enron_sink_rethink, "emails"); diff --git a/integration_tests/rethink_to_postgres/app.js b/integration_tests/rethink_to_postgres/app.js index 5c80e0c66..01c018d09 100644 --- a/integration_tests/rethink_to_postgres/app.js +++ b/integration_tests/rethink_to_postgres/app.js @@ -7,5 +7,5 @@ enron_sink_postgres = postgres({ "uri": "postgres://${POSTGRES_ENRON_SINK_USER}:${POSTGRES_ENRON_SINK_PASSWORD}@${POSTGRES_ENRON_SINK_URI}" }) -t.Source("enron_source_rethink", enron_source_rethink, "enron.emails") - .Save("enron_sink_postgres", enron_sink_postgres, "enron.emails"); +t.Source("enron_source_rethink", enron_source_rethink, "emails") + .Save("enron_sink_postgres", enron_sink_postgres, "emails"); diff --git a/pipeline/node.go b/pipeline/node.go index a9d4efcc4..7b2a85b0b 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -9,6 +9,7 @@ package pipeline import ( "fmt" "regexp" + "strings" "sync" "github.com/compose/transporter/adaptor" @@ -59,14 +60,14 @@ type Transform struct { // NewNode creates a new Node struct func NewNode(name, kind, ns string, a adaptor.Adaptor, parent *Node) (*Node, error) { - _, nsFilter, err := adaptor.CompileNamespace(ns) + compiledNs, err := regexp.Compile(strings.Trim(ns, "/")) if err != nil { return nil, err } n := &Node{ Name: name, Type: kind, - nsFilter: nsFilter, + nsFilter: compiledNs, Children: 
make([]*Node, 0), Transforms: make([]*Transform, 0), done: make(chan struct{}), @@ -144,17 +145,6 @@ func (n *Node) Path() string { return n.Parent.Path() + "/" + n.Name } -// AddTransform adds the provided function.Function to the Node and will be called -// before sending any messages down the pipeline. -func (n *Node) AddTransform(name string, f function.Function, ns string) error { - _, nsFilter, err := adaptor.CompileNamespace(ns) - if err != nil { - return err - } - n.Transforms = append(n.Transforms, &Transform{name, f, nsFilter}) - return nil -} - // Start starts the nodes children in a go routine, and then runs either Start() or Listen() // on the node's adaptor. Root nodes (nodes with no parent) will run Start() // and will emit messages to it's children, diff --git a/pipeline/pipeline_events_integration_test.go b/pipeline/pipeline_events_integration_test.go index 30dff6eba..30ab7ce8d 100644 --- a/pipeline/pipeline_events_integration_test.go +++ b/pipeline/pipeline_events_integration_test.go @@ -67,7 +67,7 @@ func TestEventsBroadcast(t *testing.T) { if err != nil { t.Fatalf("can't create GetAdaptor, got %s", err) } - dummyOutNode, err := NewNode("dummyFileOut", "file", "blah./.*/", f, nil) + dummyOutNode, err := NewNode("dummyFileOut", "file", "/.*/", f, nil) if err != nil { t.Fatalf("can't create NewNode, got %s", err) } @@ -75,7 +75,7 @@ func TestEventsBroadcast(t *testing.T) { if err != nil { t.Fatalf("can't create GetAdaptor, got %s", err) } - _, err = NewNode("dummyFileIn", "file", "blah./.*/", f, dummyOutNode) + _, err = NewNode("dummyFileIn", "file", "/.*/", f, dummyOutNode) if err != nil { t.Fatalf("can't create NewNode, got %s", err) } diff --git a/pipeline/pipeline_integration_test.go b/pipeline/pipeline_integration_test.go index 21394797e..6b52cab4b 100644 --- a/pipeline/pipeline_integration_test.go +++ b/pipeline/pipeline_integration_test.go @@ -44,7 +44,7 @@ func TestFileToFile(t *testing.T) { if err != nil { t.Fatalf("can't create GetAdaptor, got %s", err) } - outNode, err := NewNode("localfileout", "file", "blah./.*/", f, nil) + outNode, err := NewNode("localfileout", "file", "/.*/", f, nil) if err != nil { t.Fatalf("can't create newnode, got %s", err) } @@ -52,7 +52,7 @@ func TestFileToFile(t *testing.T) { if err != nil { t.Fatalf("can't create GetAdaptor, got %s", err) } - _, err = NewNode("localfilein", "file", "blah./.*/", f, outNode) + _, err = NewNode("localfilein", "file", "/.*/", f, outNode) if err != nil { t.Fatalf("can't create newnode, got %s", err) }
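
For reference, a minimal sketch of the before/after namespace handling this patch implements. The helper names compileOld and compileNew are illustrative only; the real logic is the removed adaptor.CompileNamespace and the inlined regexp.Compile(strings.Trim(ns, "/")) calls in pipeline.NewNode and cmd/transporter/goja_builder.go.

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// compileOld mirrors the removed adaptor.CompileNamespace: a two-part
// namespace ("database.filter") was split on the first '.' and the second
// part compiled as the message filter.
func compileOld(ns string) (string, *regexp.Regexp, error) {
	fields := strings.SplitN(ns, ".", 2)
	if len(fields) != 2 {
		return "", nil, fmt.Errorf("malformed namespace, expected two parts, got %s", ns)
	}
	filter, err := regexp.Compile(strings.Trim(fields[1], "/"))
	return fields[0], filter, err
}

// compileNew mirrors what pipeline.NewNode does after this patch: the
// namespace is a single regexp filter (optionally wrapped in slashes) and
// the database name comes from the adaptor URI instead.
func compileNew(ns string) (*regexp.Regexp, error) {
	return regexp.Compile(strings.Trim(ns, "/"))
}

func main() {
	// Old style: database name embedded in the namespace.
	db, oldFilter, _ := compileOld("enron./emails/")
	fmt.Println(db, oldFilter.MatchString("emails")) // enron true

	// New style: only the filter; "enron" now lives in the connection URI,
	// e.g. mongodb://localhost:27017/enron.
	newFilter, _ := compileNew("emails")
	fmt.Println(newFilter.MatchString("emails")) // true
}

In pipeline JS terms this is the shift visible in the integration tests: t.Source("enron_source_mongo", enron_source_mongo, "enron.emails") becomes t.Source("enron_source_mongo", enron_source_mongo, "emails"), with the database carried by the adaptor URI.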
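
With the index no longer carried in the namespace, the elasticsearch adaptor falls back to DefaultIndex ("test") when the URI has no path. A minimal sketch of that lookup follows; indexFromURI is a hypothetical helper used only for illustration — in the patch the equivalent check is inline in setupWriter.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

const defaultIndex = "test"

// indexFromURI derives the elasticsearch index from the connection URI path,
// falling back to defaultIndex when no path is provided.
func indexFromURI(rawURI string) (string, error) {
	uri, err := url.Parse(rawURI)
	if err != nil {
		return "", err
	}
	if uri.Path == "" {
		// No index in the URI, fall back to the default.
		return defaultIndex, nil
	}
	return strings.TrimPrefix(uri.Path, "/"), nil
}

func main() {
	// Index supplied in the URI, as the updated mongo_to_es integration test does.
	idx, _ := indexFromURI("https://user:pass@example.com:9200/enron")
	fmt.Println(idx) // enron

	// No path in the URI, so the default index is used.
	idx, _ = indexFromURI("http://localhost:9200")
	fmt.Println(idx) // test
}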