Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

29 noid #37

Merged
merged 6 commits into from
Dec 31, 2014
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/adaptor/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,15 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) {
if msg.Op == message.Command {
err := e.runCommand(msg)
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document())
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Data)
}
return msg, nil
}

err := e.indexer.Index(e.index, e._type, msg.IDString(), "", nil, msg.Document(), false)
// TODO there might be some inconsistency here. elasticsearch uses the _id field for a primary index,
// and we're just mapping it to a string here.
err := e.indexer.Index(e.index, e._type, msg.IDString("_id"), "", nil, msg.Data, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, I don't know if it's better to use a transformer to remove the source id/_id to allow the sink to use its own way of auto-generating the id/_id or do something with the adaptor's config to tell it to ignore any ids provided, not really something that has to be solved now though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a good default is to just let the adaptor default to good behaviour and use the data that it's given. If the adaptor can't get an id, then it will auto-create one; it's up to the transformer to handle that.
rethink is the interesting one here, in that
mongo->elasticsearch both use an _id, whereas with mongo->rethink, rethink uses an 'id', while mongo uses an _id

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 it's probably something that can be easily "solved" with good docs and examples

if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document())
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Data)
}
return msg, nil
}
Expand Down Expand Up @@ -133,7 +134,11 @@ func (e *Elasticsearch) setupClient() {
}

func (e *Elasticsearch) runCommand(msg *message.Msg) error {
if _, hasKey := msg.Document()["flush"]; hasKey {
if !msg.IsMap() {
return nil
}

if _, hasKey := msg.Map()["flush"]; hasKey {
e.indexer.Flush()
}
return nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/adaptor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package adaptor

import (
"fmt"

"gopkg.in/mgo.v2/bson"
)

// Adaptor errors have levels to indicate their severity.
Expand Down Expand Up @@ -47,11 +45,11 @@ type Error struct {
Lvl ErrorLevel
Str string
Path string
Record bson.M
Record interface{}
}

// NewError creates an Error type with the specified level, path, message and record
func NewError(lvl ErrorLevel, path, str string, record bson.M) Error {
func NewError(lvl ErrorLevel, path, str string, record interface{}) Error {
return Error{Lvl: lvl, Path: path, Str: str, Record: record}
}

Expand Down
25 changes: 15 additions & 10 deletions pkg/adaptor/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func (d *File) Stop() error {
return nil
}

/*
* read each message from the file
*/
// read each message from the file
func (d *File) readFile() (err error) {
filename := strings.Replace(d.uri, "file://", "", 1)
d.filehandle, err = os.Open(filename)
Expand All @@ -100,18 +98,25 @@ func (d *File) readFile() (err error) {
* dump each message to the file
*/
func (d *File) dumpMessage(msg *message.Msg) (*message.Msg, error) {
jdoc, err := json.Marshal(msg.Document())
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Document())
return msg, nil
var line string

if msg.IsMap() {
ba, err := json.Marshal(msg.Map())
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Data)
return msg, nil
}
line = string(ba)
} else {
line = fmt.Sprintf("%v", msg.Data)
}

if strings.HasPrefix(d.uri, "stdout://") {
fmt.Println(string(jdoc))
fmt.Println(line)
} else {
_, err = fmt.Fprintln(d.filehandle, string(jdoc))
_, err := fmt.Fprintln(d.filehandle, line)
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Document())
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Error writing to file (%s)", err.Error()), msg.Data)
return msg, nil
}
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/adaptor/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,19 @@ func (i *Influxdb) Stop() error {
func (i *Influxdb) applyOp(msg *message.Msg) (*message.Msg, error) {
switch msg.Op {
case message.Insert:
docSize := len(msg.Document())
columns := make([]string, 0, docSize)
if !msg.IsMap() {
i.pipe.Err <- NewError(ERROR, i.path, "Rethinkdb error (document must be a json document)", msg.Data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad error message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

return msg, nil
}
doc := msg.Map()

sz := len(doc)
columns := make([]string, 0, sz)
points := make([][]interface{}, 1)
points[0] = make([]interface{}, 0, docSize)
for k := range msg.Document() {
points[0] = make([]interface{}, 0, sz)
for k, v := range doc {
columns = append(columns, k)
points[0] = append(points[0], msg.Document()[k])
points[0] = append(points[0], v)
}
series := &client.Series{
Name: i.seriesName,
Expand Down
28 changes: 19 additions & 9 deletions pkg/adaptor/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,20 @@ func (m *Mongodb) Stop() error {
// caller should pipe the error
func (m *Mongodb) writeMessage(msg *message.Msg) (*message.Msg, error) {
collection := m.mongoSession.DB(m.database).C(m.collection)
err := collection.Insert(msg.Document())

if !msg.IsMap() {
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (document must be a bson document, got %T instead)", msg.Data), msg.Data)
return msg, nil
}

doc := msg.Map()

err := collection.Insert(doc)
if mgo.IsDup(err) {
err = collection.Update(bson.M{"_id": msg.ID}, msg.Document())
err = collection.Update(bson.M{"_id": doc["_id"]}, doc)
}
if err != nil {
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), msg.Document())
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), msg.Data)
}
return msg, nil
}
Expand Down Expand Up @@ -192,25 +200,27 @@ func (m *Mongodb) tailData() (err error) {
return
}
if result.validOp() {
msg := message.NewMsg(message.OpTypeFromString(result.Op), nil)
msg.Timestamp = int64(result.Ts) >> 32

var doc bson.M
switch result.Op {
case "i":
msg.SetDocument(result.O)
doc = result.O
case "d":
msg.SetDocument(result.O)
doc = result.O
case "u":
doc, err := m.getOriginalDoc(result.O2)
doc, err = m.getOriginalDoc(result.O2)
if err != nil { // errors aren't fatal here, but we need to send it down the pipe
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), nil)
continue
}
msg.SetDocument(doc)
default:
m.pipe.Err <- NewError(ERROR, m.path, "Mongodb error (unknown op type)", nil)
continue
}

msg := message.NewMsg(message.OpTypeFromString(result.Op), doc)
msg.Timestamp = int64(result.Ts) >> 32

m.oplogTime = result.Ts
m.pipe.Send(msg)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/adaptor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func init() {
Register("elasticsearch", "an elasticsearch sink adaptor", NewElasticsearch, dbConfig{})
Register("influx", "an InfluxDB sink adaptor", NewInfluxdb, dbConfig{})
Register("transformer", "an adaptor that transforms documents using a javascript function", NewTransformer, TransformerConfig{})
Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, dbConfig{})
}

// Register registers an adaptor (database adaptor) for use with Transporter
Expand Down
18 changes: 15 additions & 3 deletions pkg/adaptor/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) {
err error
)

if !msg.IsMap() {
r.pipe.Err <- NewError(ERROR, r.path, "Rethinkdb error (document must be a json document)", msg.Data)
return msg, nil
}
doc := msg.Map()

switch msg.Op {
case message.Delete:
resp, err = gorethink.Table(r.table).Get(msg.IDString()).Delete().RunWrite(r.client)
resp, err = gorethink.Table(r.table).Get(msg.IDString("id")).Delete().RunWrite(r.client)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where people could get tripped up without a transformer to move the _id -> id in the doc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we need to document the semantics of the adaptors wrt the ids..

case message.Insert:
resp, err = gorethink.Table(r.table).Insert(msg.Document()).RunWrite(r.client)
resp, err = gorethink.Table(r.table).Insert(doc).RunWrite(r.client)
case message.Update:
resp, err = gorethink.Table(r.table).Insert(msg.DocumentWithID("id"), gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client)
resp, err = gorethink.Table(r.table).Insert(doc, gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client)
}
if err != nil {
return msg, err
Expand All @@ -107,6 +113,9 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) {

func (r *Rethinkdb) setupClient() (*gorethink.Session, error) {
// set up the clientConfig, we need host:port, username, password, and database name
if r.debug {
fmt.Printf("Connecting to %s\n", r.uri.Host)
}
client, err := gorethink.Connect(gorethink.ConnectOpts{
Address: r.uri.Host,
MaxIdle: 10,
Expand All @@ -116,6 +125,9 @@ func (r *Rethinkdb) setupClient() (*gorethink.Session, error) {
return nil, fmt.Errorf("Unable to connect: %s", err)
}

if r.debug {
fmt.Printf("dropping and creating table '%s' on database '%s'\n", r.table, r.database)
}
gorethink.Db(r.database).TableDrop(r.table).RunWrite(client)
gorethink.Db(r.database).TableCreate(r.table).RunWrite(client)

Expand Down
21 changes: 11 additions & 10 deletions pkg/adaptor/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,13 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
}

now := time.Now().Nanosecond()

if doc, err = mejson.Marshal(msg.Document()); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
if msg.IsMap() {
if doc, err = mejson.Marshal(msg.Data); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
} else {
doc = msg.Data
}

if value, err = t.vm.ToValue(doc); err != nil {
Expand Down Expand Up @@ -137,11 +140,9 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
msg.SetDocument(doc)
msg.Data = doc
default:
if t.debug {
fmt.Println("transformer skipping doc")
}
msg.Data = r
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you're meaning to set msg.Data to the result type?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, got tripped up by the switch semantics

}

if t.debug {
Expand All @@ -154,9 +155,9 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {

func (t *Transformer) transformerError(lvl ErrorLevel, err error, msg *message.Msg) error {
if e, ok := err.(*otto.Error); ok {
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", e.String()), msg.Document())
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", e.String()), msg.Data)
}
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", err.Error()), msg.Document())
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", err.Error()), msg.Data)
}

// TransformerConfig holds config options for a transformer adaptor
Expand Down
7 changes: 2 additions & 5 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package events
import (
"encoding/json"
"fmt"
// "time"

"gopkg.in/mgo.v2/bson"
)

// Event is an interface that describes data which is produced periodically by the running transporter.
Expand Down Expand Up @@ -99,14 +96,14 @@ type ErrorEvent struct {
Path string `json:"path"`

// Record is the document (if any) that was in progress when the error occurred
Record bson.M `json:"record,omitempty"`
Record interface{} `json:"record,omitempty"`

// Message is the error message as a string
Message string `json:"message,omitempty"`
}

// NewErrorEvent are events sent to indicate a problem processing on one of the nodes
func NewErrorEvent(ts int64, path string, record bson.M, message string) *ErrorEvent {
func NewErrorEvent(ts int64, path string, record interface{}, message string) *ErrorEvent {
e := &ErrorEvent{
Ts: ts,
Kind: "error",
Expand Down
Loading