Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic UDP line-protocol service input #758

Merged
merged 3 commits into from
Mar 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"
_ "github.com/influxdata/telegraf/plugins/inputs/zfs"
_ "github.com/influxdata/telegraf/plugins/inputs/zookeeper"
Expand Down
31 changes: 31 additions & 0 deletions plugins/inputs/udp_listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# UDP listener service input plugin

The UDP listener is a service input plugin that listens for messages on a UDP
socket and adds those messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

### Configuration:

This is a sample configuration for the plugin.

```toml
[[inputs.udp_listener]]
## Address and port to host UDP listener on
service_address = ":8125"

## Number of UDP messages allowed to queue up. Once filled, the
## UDP listener will start dropping packets.
allowed_pending_messages = 10000

## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes.
udp_packet_size = 1500

## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
154 changes: 154 additions & 0 deletions plugins/inputs/udp_listener/udp_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package udp_listener

import (
"log"
"net"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

type UdpListener struct {
ServiceAddress string
UDPPacketSize int `toml:"udp_packet_size"`
AllowedPendingMessages int
sync.Mutex

in chan []byte
done chan struct{}

parser parsers.Parser

// Keep the accumulator in this struct
acc telegraf.Accumulator
}

const UDP_PACKET_SIZE int = 1500

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
## Address and port to host UDP listener on
service_address = ":8125"

## Number of UDP messages allowed to queue up. Once filled, the
## UDP listener will start dropping packets.
allowed_pending_messages = 10000

## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes.
udp_packet_size = 1500

## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func (u *UdpListener) SampleConfig() string {
return sampleConfig
}

func (u *UdpListener) Description() string {
return "Generic UDP listener"
}

// All the work is done in the Start() function, so this is just a dummy
// function.
func (u *UdpListener) Gather(_ telegraf.Accumulator) error {
return nil
}

func (u *UdpListener) SetParser(parser parsers.Parser) {
u.parser = parser
}

func (u *UdpListener) Start(acc telegraf.Accumulator) error {
u.Lock()
defer u.Unlock()

u.acc = acc
u.in = make(chan []byte, u.AllowedPendingMessages)
u.done = make(chan struct{})

go u.udpListen()
go u.udpParser()

log.Printf("Started UDP listener service on %s\n", u.ServiceAddress)
return nil
}

func (u *UdpListener) Stop() {
u.Lock()
defer u.Unlock()
close(u.done)
close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress)
}

func (u *UdpListener) udpListen() error {
address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
listener, err := net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
defer listener.Close()
log.Println("UDP server listening on: ", listener.LocalAddr().String())

for {
select {
case <-u.done:
return nil
default:
buf := make([]byte, u.UDPPacketSize)
n, _, err := listener.ReadFromUDP(buf)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
}

select {
case u.in <- buf[:n]:
default:
log.Printf(dropwarn, string(buf[:n]))
}
}
}
}

func (u *UdpListener) udpParser() error {
for {
select {
case <-u.done:
return nil
case packet := <-u.in:
metrics, err := u.parser.Parse(packet)
if err == nil {
u.storeMetrics(metrics)
} else {
log.Printf("Malformed packet: [%s], Error: %s\n", packet, err)
}
}
}
}

func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
u.Lock()
defer u.Unlock()
for _, m := range metrics {
u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}

func init() {
inputs.Add("udp_listener", func() telegraf.Input {
return &UdpListener{
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}
112 changes: 112 additions & 0 deletions plugins/inputs/udp_listener/udp_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package udp_listener

import (
"io/ioutil"
"log"
"testing"
"time"

"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
)

func newTestUdpListener() (*UdpListener, chan []byte) {
in := make(chan []byte, 1500)
listener := &UdpListener{
ServiceAddress: ":8125",
UDPPacketSize: 1500,
AllowedPendingMessages: 10000,
in: in,
done: make(chan struct{}),
}
return listener, in
}

func TestRunParser(t *testing.T) {
log.SetOutput(ioutil.Discard)
var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257")

listener, in := newTestUdpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewInfluxParser()
go listener.udpParser()

in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)

if a := acc.NFields(); a != 1 {
t.Errorf("got %v, expected %v", a, 1)
}

acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}

func TestRunParserInvalidMsg(t *testing.T) {
log.SetOutput(ioutil.Discard)
var testmsg = []byte("cpu_load_short")

listener, in := newTestUdpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewInfluxParser()
go listener.udpParser()

in <- testmsg
time.Sleep(time.Millisecond * 25)

if a := acc.NFields(); a != 0 {
t.Errorf("got %v, expected %v", a, 0)
}
}

func TestRunParserGraphiteMsg(t *testing.T) {
log.SetOutput(ioutil.Discard)
var testmsg = []byte("cpu.load.graphite 12 1454780029")

listener, in := newTestUdpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
go listener.udpParser()

in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)

acc.AssertContainsFields(t, "cpu_load_graphite",
map[string]interface{}{"value": float64(12)})
}

func TestRunParserJSONMsg(t *testing.T) {
log.SetOutput(ioutil.Discard)
var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n")

listener, in := newTestUdpListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
go listener.udpParser()

in <- testmsg
time.Sleep(time.Millisecond * 25)
listener.Gather(&acc)

acc.AssertContainsFields(t, "udp_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}