diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 5af18fcffbba0..262de37acba6c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -49,6 +49,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/system" _ "github.com/influxdata/telegraf/plugins/inputs/trig" _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" + _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" _ "github.com/influxdata/telegraf/plugins/inputs/zfs" _ "github.com/influxdata/telegraf/plugins/inputs/zookeeper" diff --git a/plugins/inputs/udp_listener/README.md b/plugins/inputs/udp_listener/README.md new file mode 100644 index 0000000000000..e2fe846f9f6d3 --- /dev/null +++ b/plugins/inputs/udp_listener/README.md @@ -0,0 +1,31 @@ +# UDP listener service input plugin + +The UDP listener is a service input plugin that listens for messages on a UDP +socket and adds those messages to InfluxDB. +The plugin expects messages in the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + +### Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.udp_listener]] + ## Address and port to host UDP listener on + service_address = ":8125" + + ## Number of UDP messages allowed to queue up. Once filled, the + ## UDP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## UDP packet size for the server to listen for. This will depend + ## on the size of the packets that the client is sending, which is + ## usually 1500 bytes. + udp_packet_size = 1500 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +``` diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go new file mode 100644 index 0000000000000..c6d483d4848a7 --- /dev/null +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -0,0 +1,154 @@ +package udp_listener + +import ( + "log" + "net" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type UdpListener struct { + ServiceAddress string + UDPPacketSize int `toml:"udp_packet_size"` + AllowedPendingMessages int + sync.Mutex + + in chan []byte + done chan struct{} + + parser parsers.Parser + + // Keep the accumulator in this struct + acc telegraf.Accumulator +} + +const UDP_PACKET_SIZE int = 1500 + +var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + + "You may want to increase allowed_pending_messages in the config\n" + +const sampleConfig = ` + ## Address and port to host UDP listener on + service_address = ":8125" + + ## Number of UDP messages allowed to queue up. Once filled, the + ## UDP listener will start dropping packets. + allowed_pending_messages = 10000 + + ## UDP packet size for the server to listen for. This will depend + ## on the size of the packets that the client is sending, which is + ## usually 1500 bytes. + udp_packet_size = 1500 + + ## Data format to consume. This can be "json", "influx" or "graphite" + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (u *UdpListener) SampleConfig() string { + return sampleConfig +} + +func (u *UdpListener) Description() string { + return "Generic UDP listener" +} + +// All the work is done in the Start() function, so this is just a dummy +// function. +func (u *UdpListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (u *UdpListener) SetParser(parser parsers.Parser) { + u.parser = parser +} + +func (u *UdpListener) Start(acc telegraf.Accumulator) error { + u.Lock() + defer u.Unlock() + + u.acc = acc + u.in = make(chan []byte, u.AllowedPendingMessages) + u.done = make(chan struct{}) + + go u.udpListen() + go u.udpParser() + + log.Printf("Started UDP listener service on %s\n", u.ServiceAddress) + return nil +} + +func (u *UdpListener) Stop() { + u.Lock() + defer u.Unlock() + close(u.done) + close(u.in) + log.Println("Stopped UDP listener service on ", u.ServiceAddress) +} + +func (u *UdpListener) udpListen() error { + address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) + listener, err := net.ListenUDP("udp", address) + if err != nil { + log.Fatalf("ERROR: ListenUDP - %s", err) + } + defer listener.Close() + log.Println("UDP server listening on: ", listener.LocalAddr().String()) + + for { + select { + case <-u.done: + return nil + default: + buf := make([]byte, u.UDPPacketSize) + n, _, err := listener.ReadFromUDP(buf) + if err != nil { + log.Printf("ERROR: %s\n", err.Error()) + } + + select { + case u.in <- buf[:n]: + default: + log.Printf(dropwarn, string(buf[:n])) + } + } + } +} + +func (u *UdpListener) udpParser() error { + for { + select { + case <-u.done: + return nil + case packet := <-u.in: + metrics, err := u.parser.Parse(packet) + if err == nil { + u.storeMetrics(metrics) + } else { + log.Printf("Malformed packet: [%s], Error: %s\n", packet, err) + } + } + } +} + +func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error { + u.Lock() + defer u.Unlock() + for _, m := range metrics { + u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + return nil +} + +func init() { + inputs.Add("udp_listener", func() telegraf.Input { + return &UdpListener{ + UDPPacketSize: UDP_PACKET_SIZE, + } + }) +} diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go new file mode 100644 index 0000000000000..2f0f6fae513a2 --- /dev/null +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -0,0 +1,112 @@ +package udp_listener + +import ( + "io/ioutil" + "log" + "testing" + "time" + + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" +) + +func newTestUdpListener() (*UdpListener, chan []byte) { + in := make(chan []byte, 1500) + listener := &UdpListener{ + ServiceAddress: ":8125", + UDPPacketSize: 1500, + AllowedPendingMessages: 10000, + in: in, + done: make(chan struct{}), + } + return listener, in +} + +func TestRunParser(t *testing.T) { + log.SetOutput(ioutil.Discard) + var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257") + + listener, in := newTestUdpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + go listener.udpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + if a := acc.NFields(); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) +} + +func TestRunParserInvalidMsg(t *testing.T) { + log.SetOutput(ioutil.Discard) + var testmsg = []byte("cpu_load_short") + + listener, in := newTestUdpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewInfluxParser() + go listener.udpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + + if a := acc.NFields(); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +func TestRunParserGraphiteMsg(t *testing.T) { + log.SetOutput(ioutil.Discard) + var testmsg = []byte("cpu.load.graphite 12 1454780029") + + listener, in := newTestUdpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go listener.udpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "cpu_load_graphite", + map[string]interface{}{"value": float64(12)}) +} + +func TestRunParserJSONMsg(t *testing.T) { + log.SetOutput(ioutil.Discard) + var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") + + listener, in := newTestUdpListener() + acc := testutil.Accumulator{} + listener.acc = &acc + defer close(listener.done) + + listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + go listener.udpParser() + + in <- testmsg + time.Sleep(time.Millisecond * 25) + listener.Gather(&acc) + + acc.AssertContainsFields(t, "udp_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +}