From 76ba21077e30353d63a891783c3ffc3bf6e69a3a Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Tue, 20 Feb 2024 21:12:41 +0100 Subject: [PATCH] chore!(inputs.tcp_listener): Remove deprecated plugin --- plugins/inputs/all/tcp_listener.go | 5 - plugins/inputs/tcp_listener/README.md | 37 -- plugins/inputs/tcp_listener/sample.conf | 4 - plugins/inputs/tcp_listener/tcp_listener.go | 304 ---------------- .../inputs/tcp_listener/tcp_listener_test.go | 337 ------------------ 5 files changed, 687 deletions(-) delete mode 100644 plugins/inputs/all/tcp_listener.go delete mode 100644 plugins/inputs/tcp_listener/README.md delete mode 100644 plugins/inputs/tcp_listener/sample.conf delete mode 100644 plugins/inputs/tcp_listener/tcp_listener.go delete mode 100644 plugins/inputs/tcp_listener/tcp_listener_test.go diff --git a/plugins/inputs/all/tcp_listener.go b/plugins/inputs/all/tcp_listener.go deleted file mode 100644 index 3b676e8c5dead..0000000000000 --- a/plugins/inputs/all/tcp_listener.go +++ /dev/null @@ -1,5 +0,0 @@ -//go:build !custom || inputs || inputs.tcp_listener - -package all - -import _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" // register plugin diff --git a/plugins/inputs/tcp_listener/README.md b/plugins/inputs/tcp_listener/README.md deleted file mode 100644 index 31127014fab85..0000000000000 --- a/plugins/inputs/tcp_listener/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# TCP Listener Input Plugin - -**DEPRECATED: As of version 1.3 the TCP listener plugin has been deprecated in -favor of the [socket_listener plugin](../socket_listener/README.md)** - -## Service Input - -This plugin is a service input. Normal plugins gather metrics determined by the -interval setting. Service plugins start a service to listens and waits for -metrics or events to occur. Service plugins have two key differences from -normal plugins: - -1. The global or plugin specific `interval` setting may not apply -2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce - output for this plugin - -## Global configuration options - -In addition to the plugin-specific configuration settings, plugins support -additional global and plugin configuration settings. These settings are used to -modify metrics, tags, and field or create aliases and configure ordering, etc. -See the [CONFIGURATION.md][CONFIGURATION.md] for more details. - -[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins - -## Configuration - -```toml @sample.conf -# Generic TCP listener -[[inputs.tcp_listener]] - # socket_listener plugin - # see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener -``` - -## Metrics - -## Example Output diff --git a/plugins/inputs/tcp_listener/sample.conf b/plugins/inputs/tcp_listener/sample.conf deleted file mode 100644 index 63d2654fabba2..0000000000000 --- a/plugins/inputs/tcp_listener/sample.conf +++ /dev/null @@ -1,4 +0,0 @@ -# Generic TCP listener -[[inputs.tcp_listener]] - # socket_listener plugin - # see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go deleted file mode 100644 index 3dc974c0d53fb..0000000000000 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ /dev/null @@ -1,304 +0,0 @@ -//go:generate ../../../tools/readme_config_includer/generator -package tcp_listener - -import ( - "bufio" - _ "embed" - "fmt" - "net" - "sync" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/selfstat" -) - -//go:embed sample.conf -var sampleConfig string - -type TCPListener struct { - ServiceAddress string - AllowedPendingMessages int - MaxTCPConnections int `toml:"max_tcp_connections"` - - sync.Mutex - // Lock for preventing a data race during resource cleanup - cleanup sync.Mutex - wg sync.WaitGroup - - in chan []byte - done chan struct{} - // accept channel tracks how many active connections there are, if there - // is an available bool in accept, then we are below the maximum and can - // accept the connection - accept chan bool - // drops tracks the number of dropped metrics. - drops int - // malformed tracks the number of malformed packets - malformed int - - // track the listener here so we can close it in Stop() - listener *net.TCPListener - // track current connections so we can close them in Stop() - conns map[string]*net.TCPConn - - parser telegraf.Parser - acc telegraf.Accumulator - - MaxConnections selfstat.Stat - CurrentConnections selfstat.Stat - TotalConnections selfstat.Stat - PacketsRecv selfstat.Stat - BytesRecv selfstat.Stat - - Log telegraf.Logger -} - -var dropwarn = "tcp_listener message queue full. " + - "We have dropped %d messages so far. " + - "You may want to increase allowed_pending_messages in the config" - -var malformedwarn = "tcp_listener has received %d malformed packets" + - " thus far." - -func (*TCPListener) SampleConfig() string { - return sampleConfig -} - -// All the work is done in the Start() function, so this is just a dummy -// function. -func (t *TCPListener) Gather(_ telegraf.Accumulator) error { - return nil -} - -func (t *TCPListener) SetParser(parser telegraf.Parser) { - t.parser = parser -} - -// Start starts the tcp listener service. -func (t *TCPListener) Start(acc telegraf.Accumulator) error { - t.Lock() - defer t.Unlock() - - t.Log.Warn("DEPRECATED: the TCP listener plugin has been deprecated " + - "in favor of the socket_listener plugin " + - "(https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener)") - - tags := map[string]string{ - "address": t.ServiceAddress, - } - t.MaxConnections = selfstat.Register("tcp_listener", "max_connections", tags) - t.MaxConnections.Set(int64(t.MaxTCPConnections)) - t.CurrentConnections = selfstat.Register("tcp_listener", "current_connections", tags) - t.TotalConnections = selfstat.Register("tcp_listener", "total_connections", tags) - t.PacketsRecv = selfstat.Register("tcp_listener", "packets_received", tags) - t.BytesRecv = selfstat.Register("tcp_listener", "bytes_received", tags) - - t.acc = acc - t.in = make(chan []byte, t.AllowedPendingMessages) - t.done = make(chan struct{}) - t.accept = make(chan bool, t.MaxTCPConnections) - t.conns = make(map[string]*net.TCPConn) - for i := 0; i < t.MaxTCPConnections; i++ { - t.accept <- true - } - - // Start listener - var err error - address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress) - t.listener, err = net.ListenTCP("tcp", address) - if err != nil { - t.Log.Errorf("Failed to listen: %s", err.Error()) - return err - } - - t.wg.Add(2) - go t.tcpListen() - go t.tcpParser() - - t.Log.Infof("Started TCP listener service on %q", t.ServiceAddress) - return nil -} - -// Stop cleans up all resources -func (t *TCPListener) Stop() { - t.Lock() - defer t.Unlock() - close(t.done) - - t.listener.Close() - - // Close all open TCP connections - // - get all conns from the t.conns map and put into slice - // - this is so the forget() function doesn't conflict with looping - // over the t.conns map - t.cleanup.Lock() - conns := make([]*net.TCPConn, 0, len(t.conns)) - for _, conn := range t.conns { - conns = append(conns, conn) - } - t.cleanup.Unlock() - for _, conn := range conns { - conn.Close() - } - - t.wg.Wait() - close(t.in) - t.Log.Infof("Stopped TCP listener service on %q", t.ServiceAddress) -} - -// tcpListen listens for incoming TCP connections. -func (t *TCPListener) tcpListen() { - defer t.wg.Done() - - for { - select { - case <-t.done: - return - default: - // Accept connection: - conn, err := t.listener.AcceptTCP() - if err != nil { - t.Log.Errorf("accepting TCP connection failed: %v", err) - return - } - - select { - case <-t.accept: - // generate a random id for this TCPConn - id, err := internal.RandomString(6) - if err != nil { - t.Log.Errorf("generating a random id for TCP connection failed: %v", err) - return - } - - // not over connection limit, handle the connection properly. - t.wg.Add(1) - - t.remember(id, conn) - go t.handler(conn, id) - default: - // We are over the connection limit, refuse & close. - t.refuser(conn) - } - } - } -} - -// refuser refuses a TCP connection -func (t *TCPListener) refuser(conn *net.TCPConn) { - // Tell the connection why we are closing. - fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+ - " reached, closing.\nYou may want to increase max_tcp_connections in"+ - " the Telegraf tcp listener configuration.\n", t.MaxTCPConnections) - - conn.Close() - t.Log.Infof("Refused TCP Connection from %s", conn.RemoteAddr()) - t.Log.Warn("Maximum TCP Connections reached, you may want to adjust max_tcp_connections") -} - -// handler handles a single TCP Connection -func (t *TCPListener) handler(conn *net.TCPConn, id string) { - t.CurrentConnections.Incr(1) - t.TotalConnections.Incr(1) - // connection cleanup function - defer func() { - t.wg.Done() - if err := conn.Close(); err != nil { - t.acc.AddError(err) - } - // Add one connection potential back to channel when this one closes - t.accept <- true - t.forget(id) - t.CurrentConnections.Incr(-1) - }() - - var n int - scanner := bufio.NewScanner(conn) - for { - select { - case <-t.done: - return - default: - if !scanner.Scan() { - return - } - n = len(scanner.Bytes()) - if n == 0 { - continue - } - t.BytesRecv.Incr(int64(n)) - t.PacketsRecv.Incr(1) - bufCopy := make([]byte, n+1) - copy(bufCopy, scanner.Bytes()) - bufCopy[n] = '\n' - - select { - case t.in <- bufCopy: - default: - t.drops++ - if t.drops == 1 || t.drops%t.AllowedPendingMessages == 0 { - t.Log.Errorf(dropwarn, t.drops) - } - } - } - } -} - -// tcpParser parses the incoming tcp byte packets -func (t *TCPListener) tcpParser() { - defer t.wg.Done() - - var packet []byte - var metrics []telegraf.Metric - var err error - for { - select { - case <-t.done: - // drain input packets before finishing: - if len(t.in) == 0 { - return - } - case packet = <-t.in: - if len(packet) == 0 { - continue - } - metrics, err = t.parser.Parse(packet) - if err == nil { - for _, m := range metrics { - t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - } else { - t.malformed++ - if t.malformed == 1 || t.malformed%1000 == 0 { - t.Log.Errorf(malformedwarn, t.malformed) - } - } - } - } -} - -// forget a TCP connection -func (t *TCPListener) forget(id string) { - t.cleanup.Lock() - defer t.cleanup.Unlock() - delete(t.conns, id) -} - -// remember a TCP connection -func (t *TCPListener) remember(id string, conn *net.TCPConn) { - t.cleanup.Lock() - defer t.cleanup.Unlock() - t.conns[id] = conn -} - -func init() { - inputs.Add("tcp_listener", func() telegraf.Input { - return &TCPListener{ - ServiceAddress: ":8094", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - } - }) -} diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go deleted file mode 100644 index 020aec8578c79..0000000000000 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ /dev/null @@ -1,337 +0,0 @@ -package tcp_listener - -import ( - "fmt" - "io" - "net" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/influxdata/telegraf/plugins/parsers/graphite" - "github.com/influxdata/telegraf/plugins/parsers/influx" - "github.com/influxdata/telegraf/plugins/parsers/json" - "github.com/influxdata/telegraf/testutil" -) - -const ( - testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" - - testMsgs = ` -cpu_load_short,host=server02 value=12.0 1422568543702900257 -cpu_load_short,host=server03 value=12.0 1422568543702900257 -cpu_load_short,host=server04 value=12.0 1422568543702900257 -cpu_load_short,host=server05 value=12.0 1422568543702900257 -cpu_load_short,host=server06 value=12.0 1422568543702900257 -` -) - -func newTestTCPListener() (*TCPListener, chan []byte) { - in := make(chan []byte, 1500) - listener := &TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8194", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - in: in, - done: make(chan struct{}), - } - return listener, in -} - -// benchmark how long it takes to accept & process 100,000 metrics: -func BenchmarkTCP(b *testing.B) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8198", - AllowedPendingMessages: 100000, - MaxTCPConnections: 250, - } - parser := &influx.Parser{} - require.NoError(b, parser.Init()) - listener.parser = parser - - acc := &testutil.Accumulator{Discard: true} - - // send multiple messages to socket - for n := 0; n < b.N; n++ { - require.NoError(b, listener.Start(acc)) - - conn, err := net.Dial("tcp", "127.0.0.1:8198") - require.NoError(b, err) - for i := 0; i < 100000; i++ { - _, err := fmt.Fprint(conn, testMsg) - require.NoError(b, err) - } - require.NoError(b, conn.(*net.TCPConn).CloseWrite()) - // wait for all 100,000 metrics to be processed - buf := []byte{0} - // will EOF when completed - _, err = conn.Read(buf) - require.NoError(b, err) - listener.Stop() - } -} - -func TestHighTrafficTCP(t *testing.T) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8199", - AllowedPendingMessages: 100000, - MaxTCPConnections: 250, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - acc := &testutil.Accumulator{} - - // send multiple messages to socket - require.NoError(t, listener.Start(acc)) - - conn, err := net.Dial("tcp", "127.0.0.1:8199") - require.NoError(t, err) - for i := 0; i < 100000; i++ { - _, err := fmt.Fprint(conn, testMsg) - require.NoError(t, err) - } - require.NoError(t, conn.(*net.TCPConn).CloseWrite()) - buf := []byte{0} - _, err = conn.Read(buf) - require.Equal(t, err, io.EOF) - listener.Stop() - - require.Equal(t, 100000, int(acc.NMetrics())) -} - -func TestConnectTCP(t *testing.T) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8194", - AllowedPendingMessages: 10000, - MaxTCPConnections: 250, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - conn, err := net.Dial("tcp", "127.0.0.1:8194") - require.NoError(t, err) - - // send single message to socket - _, err = fmt.Fprint(conn, testMsg) - require.NoError(t, err) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) - - // send multiple messages to socket - _, err = fmt.Fprint(conn, testMsgs) - require.NoError(t, err) - acc.Wait(6) - hostTags := []string{"server02", "server03", - "server04", "server05", "server06"} - for _, hostTag := range hostTags { - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag}, - ) - } -} - -// Test that MaxTCPConnections is respected -func TestConcurrentConns(t *testing.T) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8195", - AllowedPendingMessages: 10000, - MaxTCPConnections: 2, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - _, err := net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - - // Connection over the limit: - conn, err := net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - buf := make([]byte, 1500) - n, err := conn.Read(buf) - require.NoError(t, err) - require.Equal(t, - "Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+ - "You may want to increase max_tcp_connections in"+ - " the Telegraf tcp listener configuration.\n", - string(buf[:n])) - - _, err = conn.Read(buf) - require.Equal(t, io.EOF, err) -} - -// Test that MaxTCPConnections is respected when max==1 -func TestConcurrentConns1(t *testing.T) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8196", - AllowedPendingMessages: 10000, - MaxTCPConnections: 1, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - _, err := net.Dial("tcp", "127.0.0.1:8196") - require.NoError(t, err) - - // Connection over the limit: - conn, err := net.Dial("tcp", "127.0.0.1:8196") - require.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8196") - require.NoError(t, err) - buf := make([]byte, 1500) - n, err := conn.Read(buf) - require.NoError(t, err) - require.Equal(t, - "Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+ - "You may want to increase max_tcp_connections in"+ - " the Telegraf tcp listener configuration.\n", - string(buf[:n])) - - _, err = conn.Read(buf) - require.Equal(t, io.EOF, err) -} - -// Test that MaxTCPConnections is respected -func TestCloseConcurrentConns(t *testing.T) { - listener := TCPListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:8195", - AllowedPendingMessages: 10000, - MaxTCPConnections: 2, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Start(acc)) - - _, err := net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - _, err = net.Dial("tcp", "127.0.0.1:8195") - require.NoError(t, err) - - listener.Stop() -} - -func TestRunParser(t *testing.T) { - var testmsg = []byte(testMsg) - - listener, in := newTestTCPListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - require.NoError(t, listener.Gather(&acc)) - - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) -} - -func TestRunParserInvalidMsg(t *testing.T) { - var testmsg = []byte("cpu_load_short") - - logger := &testutil.CaptureLogger{} - - listener, in := newTestTCPListener() - listener.Log = logger - listener.acc = &testutil.Accumulator{} - - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - listener.parser = parser - listener.wg.Add(1) - - go listener.tcpParser() - in <- testmsg - - listener.Stop() - require.Contains(t, logger.LastError(), "tcp_listener has received 1 malformed packets thus far.") -} - -func TestRunParserGraphiteMsg(t *testing.T) { - var testmsg = []byte("cpu.load.graphite 12 1454780029") - - listener, in := newTestTCPListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - p := graphite.Parser{Separator: "_", Templates: []string{}} - require.NoError(t, p.Init()) - listener.parser = &p - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - require.NoError(t, listener.Gather(&acc)) - - acc.Wait(1) - acc.AssertContainsFields(t, "cpu_load_graphite", - map[string]interface{}{"value": float64(12)}) -} - -func TestRunParserJSONMsg(t *testing.T) { - var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") - - listener, in := newTestTCPListener() - acc := testutil.Accumulator{} - listener.acc = &acc - defer close(listener.done) - - parser := &json.Parser{MetricName: "udp_json_test"} - require.NoError(t, parser.Init()) - listener.parser = parser - - listener.wg.Add(1) - go listener.tcpParser() - - in <- testmsg - require.NoError(t, listener.Gather(&acc)) - - acc.Wait(1) - acc.AssertContainsFields(t, "udp_json_test", - map[string]interface{}{ - "a": float64(5), - "b_c": float64(6), - }) -}