Skip to content

Commit

Permalink
Add more config options for stdin,file,http
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Nov 20, 2017
1 parent bd39c4c commit 50380aa
Show file tree
Hide file tree
Showing 20 changed files with 906 additions and 541 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ tracker
TODO
target
vendor
key.pem
cert.pem
160 changes: 24 additions & 136 deletions lib/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ package input
import (
"bufio"
"os"
"sync/atomic"
"time"

"github.com/jeffail/benthos/lib/types"
"github.com/jeffail/util/log"
"github.com/jeffail/util/metrics"
)
Expand All @@ -39,161 +36,52 @@ func init() {
description: `
The file type reads input from a file. If multipart is set to false each line
is read as a separate message. If multipart is set to true each line is read as
a message part, and an empty line indicates the end of a message.`,
a message part, and an empty line indicates the end of a message.
Alternatively, a custom delimiter can be set that is used instead of line
breaks.`,
}
}

//------------------------------------------------------------------------------

// FileConfig is configuration values for the File input type.
type FileConfig struct {
Path string `json:"path" yaml:"path"`
Multipart bool `json:"multipart" yaml:"multipart"`
MaxBuffer int `json:"max_buffer" yaml:"max_buffer"`
Path string `json:"path" yaml:"path"`
Multipart bool `json:"multipart" yaml:"multipart"`
MaxBuffer int `json:"max_buffer" yaml:"max_buffer"`
CustomDelim string `json:"custom_delimiter" yaml:"custom_delimiter"`
}

// NewFileConfig creates a new FileConfig with default values.
func NewFileConfig() FileConfig {
return FileConfig{
Path: "",
Multipart: false,
MaxBuffer: bufio.MaxScanTokenSize,
Path: "",
Multipart: false,
MaxBuffer: bufio.MaxScanTokenSize,
CustomDelim: "",
}
}

//------------------------------------------------------------------------------

// File is an input type that reads lines from a file, creating a message per
// line.
type File struct {
running int32

conf Config
log log.Modular
stats metrics.Type

messages chan types.Message
responses <-chan types.Response

closeChan chan struct{}
closedChan chan struct{}
}

// NewFile creates a new File input type.
func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
f := File{
running: 1,
conf: conf,
log: log.NewModule(".input.file"),
stats: stats,
messages: make(chan types.Message),
responses: nil,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}
return &f, nil
}

//------------------------------------------------------------------------------

// loop is an internal loop that brokers incoming messages to output pipe.
func (f *File) loop() {
defer func() {
atomic.StoreInt32(&f.running, 0)

close(f.messages)
close(f.closedChan)
}()

file, err := os.Open(f.conf.File.Path)
file, err := os.Open(conf.File.Path)
if err != nil {
f.log.Errorf("Read %v error: %v\n", f.conf.File.Path, err)
return
}
defer file.Close()

scanner := bufio.NewScanner(file)
if f.conf.File.MaxBuffer != bufio.MaxScanTokenSize {
scanner.Buffer([]byte{}, f.conf.File.MaxBuffer)
return nil, err
}

f.log.Infof("Reading messages from: %v\n", f.conf.File.Path)

var partsToSend, parts [][]byte

for atomic.LoadInt32(&f.running) == 1 {
if len(partsToSend) == 0 {
if !scanner.Scan() {
if err = scanner.Err(); err != nil {
f.log.Errorf("File read error: %v\n", err)
}
return
}
data := make([]byte, len(scanner.Bytes()))
copy(data, scanner.Bytes())
if len(data) > 0 {
if f.conf.File.Multipart {
parts = append(parts, data)
} else {
partsToSend = append(partsToSend, data)
}
} else if f.conf.File.Multipart {
partsToSend = parts
parts = nil
}
f.stats.Incr("input.file.count", 1)
}
if len(partsToSend) > 0 {
select {
case f.messages <- types.Message{Parts: partsToSend}:
case <-f.closeChan:
return
}
res, open := <-f.responses
if !open {
return
}
if res.Error() == nil {
f.stats.Incr("input.file.send.success", 1)
partsToSend = nil
} else {
f.stats.Incr("input.file.send.error", 1)
}
}
}
}

// StartListening sets the channel used by the input to validate message
// receipt.
func (f *File) StartListening(responses <-chan types.Response) error {
if f.responses != nil {
return types.ErrAlreadyStarted
}
f.responses = responses
go f.loop()
return nil
}

// MessageChan returns the messages channel.
func (f *File) MessageChan() <-chan types.Message {
return f.messages
}

// CloseAsync shuts down the File input and stops processing requests.
func (f *File) CloseAsync() {
if atomic.CompareAndSwapInt32(&f.running, 1, 0) {
close(f.closeChan)
}
}

// WaitForClose blocks until the File input has closed down.
func (f *File) WaitForClose(timeout time.Duration) error {
select {
case <-f.closedChan:
case <-time.After(timeout):
return types.ErrTimeout
delim := []byte("\n")
if len(conf.File.CustomDelim) > 0 {
delim = []byte(conf.File.CustomDelim)
}
return nil
return newReader(
file,
conf.File.MaxBuffer,
conf.File.Multipart,
delim,
log, stats,
)
}

//------------------------------------------------------------------------------
6 changes: 4 additions & 2 deletions lib/input/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ func NewHTTPServer(conf Config, log log.Modular, stats metrics.Type) (Type, erro
//------------------------------------------------------------------------------

func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

if atomic.LoadInt32(&h.running) != 1 {
http.Error(w, "Server closing", http.StatusServiceUnavailable)
return
}

h.stats.Incr("input.http_server.count", 1)

if r.Method != "POST" {
http.Error(w, "Incorrect method", http.StatusMethodNotAllowed)
return
Expand Down Expand Up @@ -169,8 +173,6 @@ func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) {
msg.Parts = [][]byte{msgBytes}
}

h.stats.Incr("input.http_server.count", 1)

select {
case h.messages <- msg:
case <-time.After(time.Millisecond * time.Duration(h.conf.HTTPServer.TimeoutMS)):
Expand Down
34 changes: 0 additions & 34 deletions lib/input/interface_test.go

This file was deleted.

Loading

0 comments on commit 50380aa

Please sign in to comment.