Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

29 noid #37

Merged
merged 6 commits into from
Dec 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions pkg/adaptor/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,19 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) {
if msg.Op == message.Command {
err := e.runCommand(msg)
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document())
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Data)
}
return msg, nil
}

err := e.indexer.Index(e.index, e._type, msg.IDString(), "", nil, msg.Document(), false)
// TODO there might be some inconsistency here. elasticsearch uses the _id field for an primary index,
// and we're just mapping it to a string here.
id, err := msg.IDString("_id")
if err != nil {
id = ""
}
err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false)
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Document())
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("Elasticsearch error (%s)", err), msg.Data)
}
return msg, nil
}
Expand Down Expand Up @@ -133,7 +138,11 @@ func (e *Elasticsearch) setupClient() {
}

func (e *Elasticsearch) runCommand(msg *message.Msg) error {
if _, hasKey := msg.Document()["flush"]; hasKey {
if !msg.IsMap() {
return nil
}

if _, hasKey := msg.Map()["flush"]; hasKey {
e.indexer.Flush()
}
return nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/adaptor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package adaptor

import (
"fmt"

"gopkg.in/mgo.v2/bson"
)

// Adaptor errors have levels to indicate their severity.
Expand Down Expand Up @@ -47,11 +45,11 @@ type Error struct {
Lvl ErrorLevel
Str string
Path string
Record bson.M
Record interface{}
}

// NewError creates an Error type with the specificed level, path, message and record
func NewError(lvl ErrorLevel, path, str string, record bson.M) Error {
func NewError(lvl ErrorLevel, path, str string, record interface{}) Error {
return Error{Lvl: lvl, Path: path, Str: str, Record: record}
}

Expand Down
25 changes: 15 additions & 10 deletions pkg/adaptor/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func (d *File) Stop() error {
return nil
}

/*
* read each message from the file
*/
// read each message from the file
func (d *File) readFile() (err error) {
filename := strings.Replace(d.uri, "file://", "", 1)
d.filehandle, err = os.Open(filename)
Expand All @@ -100,18 +98,25 @@ func (d *File) readFile() (err error) {
* dump each message to the file
*/
func (d *File) dumpMessage(msg *message.Msg) (*message.Msg, error) {
jdoc, err := json.Marshal(msg.Document())
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Document())
return msg, nil
var line string

if msg.IsMap() {
ba, err := json.Marshal(msg.Map())
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Data)
return msg, nil
}
line = string(ba)
} else {
line = fmt.Sprintf("%v", msg.Data)
}

if strings.HasPrefix(d.uri, "stdout://") {
fmt.Println(string(jdoc))
fmt.Println(line)
} else {
_, err = fmt.Fprintln(d.filehandle, string(jdoc))
_, err := fmt.Fprintln(d.filehandle, line)
if err != nil {
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Can't unmarshal document (%s)", err.Error()), msg.Document())
d.pipe.Err <- NewError(ERROR, d.path, fmt.Sprintf("Error writing to file (%s)", err.Error()), msg.Data)
return msg, nil
}
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/adaptor/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,19 @@ func (i *Influxdb) Stop() error {
func (i *Influxdb) applyOp(msg *message.Msg) (*message.Msg, error) {
switch msg.Op {
case message.Insert:
docSize := len(msg.Document())
columns := make([]string, 0, docSize)
if !msg.IsMap() {
i.pipe.Err <- NewError(ERROR, i.path, "Influxdb error (document must be a json document)", msg.Data)
return msg, nil
}
doc := msg.Map()

sz := len(doc)
columns := make([]string, 0, sz)
points := make([][]interface{}, 1)
points[0] = make([]interface{}, 0, docSize)
for k := range msg.Document() {
points[0] = make([]interface{}, 0, sz)
for k, v := range doc {
columns = append(columns, k)
points[0] = append(points[0], msg.Document()[k])
points[0] = append(points[0], v)
}
series := &client.Series{
Name: i.seriesName,
Expand Down
28 changes: 19 additions & 9 deletions pkg/adaptor/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,20 @@ func (m *Mongodb) Stop() error {
// caller should pipe the error
func (m *Mongodb) writeMessage(msg *message.Msg) (*message.Msg, error) {
collection := m.mongoSession.DB(m.database).C(m.collection)
err := collection.Insert(msg.Document())

if !msg.IsMap() {
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (document must be a bson document, got %T instead)", msg.Data), msg.Data)
return msg, nil
}

doc := msg.Map()

err := collection.Insert(doc)
if mgo.IsDup(err) {
err = collection.Update(bson.M{"_id": msg.ID}, msg.Document())
err = collection.Update(bson.M{"_id": doc["_id"]}, doc)
}
if err != nil {
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), msg.Document())
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), msg.Data)
}
return msg, nil
}
Expand Down Expand Up @@ -192,25 +200,27 @@ func (m *Mongodb) tailData() (err error) {
return
}
if result.validOp() {
msg := message.NewMsg(message.OpTypeFromString(result.Op), nil)
msg.Timestamp = int64(result.Ts) >> 32

var doc bson.M
switch result.Op {
case "i":
msg.SetDocument(result.O)
doc = result.O
case "d":
msg.SetDocument(result.O)
doc = result.O
case "u":
doc, err := m.getOriginalDoc(result.O2)
doc, err = m.getOriginalDoc(result.O2)
if err != nil { // errors aren't fatal here, but we need to send it down the pipe
m.pipe.Err <- NewError(ERROR, m.path, fmt.Sprintf("Mongodb error (%s)", err.Error()), nil)
continue
}
msg.SetDocument(doc)
default:
m.pipe.Err <- NewError(ERROR, m.path, "Mongodb error (unknown op type)", nil)
continue
}

msg := message.NewMsg(message.OpTypeFromString(result.Op), doc)
msg.Timestamp = int64(result.Ts) >> 32

m.oplogTime = result.Ts
m.pipe.Send(msg)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/adaptor/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func init() {
Register("elasticsearch", "an elasticsearch sink adaptor", NewElasticsearch, dbConfig{})
Register("influx", "an InfluxDB sink adaptor", NewInfluxdb, dbConfig{})
Register("transformer", "an adaptor that transforms documents using a javascript function", NewTransformer, TransformerConfig{})
Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, dbConfig{})
}

// Register registers an adaptor (database adaptor) for use with Transporter
Expand Down
33 changes: 28 additions & 5 deletions pkg/adaptor/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,43 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) {
err error
)

if !msg.IsMap() {
r.pipe.Err <- NewError(ERROR, r.path, "Rethinkdb error (document must be a json document)", msg.Data)
return msg, nil
}
doc := msg.Map()

switch msg.Op {
case message.Delete:
resp, err = gorethink.Table(r.table).Get(msg.IDString()).Delete().RunWrite(r.client)
id, err := msg.IDString("id")
if err != nil {
r.pipe.Err <- NewError(ERROR, r.path, "Rethinkdb error (cannot delete an object with a nil id)", msg.Data)
return msg, nil
}
resp, err = gorethink.Table(r.table).Get(id).Delete().RunWrite(r.client)
case message.Insert:
resp, err = gorethink.Table(r.table).Insert(msg.Document()).RunWrite(r.client)
resp, err = gorethink.Table(r.table).Insert(doc).RunWrite(r.client)
case message.Update:
resp, err = gorethink.Table(r.table).Insert(msg.DocumentWithID("id"), gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client)
resp, err = gorethink.Table(r.table).Insert(doc, gorethink.InsertOpts{Conflict: "replace"}).RunWrite(r.client)
}
if err != nil {
r.pipe.Err <- NewError(ERROR, r.path, "Rethinkdb error (%s)", err)
return msg, nil
}

err = r.handleResponse(&resp)
if err != nil {
return msg, err
r.pipe.Err <- NewError(ERROR, r.path, "Rethinkdb error (%s)", err)
}

return msg, r.handleResponse(&resp)
return msg, nil
}

func (r *Rethinkdb) setupClient() (*gorethink.Session, error) {
// set up the clientConfig, we need host:port, username, password, and database name
if r.debug {
fmt.Printf("Connecting to %s\n", r.uri.Host)
}
client, err := gorethink.Connect(gorethink.ConnectOpts{
Address: r.uri.Host,
MaxIdle: 10,
Expand All @@ -116,6 +136,9 @@ func (r *Rethinkdb) setupClient() (*gorethink.Session, error) {
return nil, fmt.Errorf("Unable to connect: %s", err)
}

if r.debug {
fmt.Printf("dropping and creating table '%s' on database '%s'\n", r.table, r.database)
}
gorethink.Db(r.database).TableDrop(r.table).RunWrite(client)
gorethink.Db(r.database).TableCreate(r.table).RunWrite(client)

Expand Down
42 changes: 28 additions & 14 deletions pkg/adaptor/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Transformer struct {
}

// NewTransformer creates a new transformer object
func NewTransformer(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) {
func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListener, error) {
var (
conf TransformerConfig
err error
Expand All @@ -36,7 +36,7 @@ func NewTransformer(p *pipe.Pipe, path string, extra Config) (StopStartListener,
return nil, err
}

t := &Transformer{pipe: p, path: path}
t := &Transformer{pipe: pipe, path: path}

if conf.Filename == "" {
return t, fmt.Errorf("No filename specified")
Expand All @@ -56,6 +56,15 @@ func NewTransformer(p *pipe.Pipe, path string, extra Config) (StopStartListener,
// transformers it into mejson, and then uses the supplied javascript module.exports function
// to transform the document. The document is then emited to this adaptor's children
func (t *Transformer) Listen() (err error) {
if err = t.initEnvironment(); err != nil {
return err
}

return t.pipe.Listen(t.transformOne)
}

// initEvironment prepares the javascript vm and compiles the transformer script
func (t *Transformer) initEnvironment() (err error) {
t.vm = otto.New()

// set up the vm environment, make `module = {}`
Expand All @@ -73,8 +82,7 @@ func (t *Transformer) Listen() (err error) {
if err != nil {
return t.transformerError(CRITICAL, err, nil)
}

return t.pipe.Listen(t.transformOne)
return
}

// Start the adaptor as a source (not implemented for this adaptor)
Expand Down Expand Up @@ -104,10 +112,13 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
}

now := time.Now().Nanosecond()

if doc, err = mejson.Marshal(msg.Document()); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
if msg.IsMap() {
if doc, err = mejson.Marshal(msg.Data); err != nil {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
} else {
doc = msg.Data
}

if value, err = t.vm.ToValue(doc); err != nil {
Expand Down Expand Up @@ -137,11 +148,9 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
t.pipe.Err <- t.transformerError(ERROR, err, msg)
return msg, nil
}
msg.SetDocument(doc)
msg.Data = map[string]interface{}(doc)
default:
if t.debug {
fmt.Println("transformer skipping doc")
}
msg.Data = r
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you're meaning to set msg.Data to the result type?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, got tripped up by the switch semantics

}

if t.debug {
Expand All @@ -153,10 +162,15 @@ func (t *Transformer) transformOne(msg *message.Msg) (*message.Msg, error) {
}

func (t *Transformer) transformerError(lvl ErrorLevel, err error, msg *message.Msg) error {
var data interface{}
if msg != nil {
data = msg.Data
}

if e, ok := err.(*otto.Error); ok {
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", e.String()), msg.Document())
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", e.String()), data)
}
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", err.Error()), msg.Document())
return NewError(lvl, t.path, fmt.Sprintf("Transformer error (%s)", err.Error()), data)
}

// TransformerConfig holds config options for a transformer adaptor
Expand Down
Loading