From 0e6b4f05f9a2f1626dbd278fce8f240441c9fabd Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Fri, 23 Feb 2024 20:20:39 +0100 Subject: [PATCH] chore(inputs.syslog): Use common/socket implementation (#14837) --- internal/syslog/framing.go | 63 -- internal/syslog/framing_test.go | 44 -- plugins/common/socket/datagram.go | 107 +++- plugins/common/socket/socket.conf | 37 ++ plugins/common/socket/socket.go | 124 +--- plugins/common/socket/socket_test.go | 366 +++++++++++- .../socket/{sample.conf => splitter.conf} | 38 -- plugins/common/socket/splitters.go | 86 +++ plugins/common/socket/stream.go | 216 +++++-- plugins/inputs/socket_listener/sample.conf.in | 4 +- .../inputs/socket_listener/socket_listener.go | 9 +- plugins/inputs/syslog/README.md | 58 +- plugins/inputs/syslog/sample.conf | 56 +- plugins/inputs/syslog/sample.conf.in | 39 ++ plugins/inputs/syslog/syslog.go | 545 +++++++----------- plugins/inputs/syslog/syslog_test.go | 204 ++++--- plugins/outputs/syslog/syslog.go | 17 +- plugins/outputs/syslog/syslog_test.go | 14 +- testutil/capturelog.go | 6 + 19 files changed, 1268 insertions(+), 765 deletions(-) delete mode 100644 internal/syslog/framing.go delete mode 100644 internal/syslog/framing_test.go create mode 100644 plugins/common/socket/socket.conf rename plugins/common/socket/{sample.conf => splitter.conf} (57%) create mode 100644 plugins/inputs/syslog/sample.conf.in diff --git a/internal/syslog/framing.go b/internal/syslog/framing.go deleted file mode 100644 index aad9b58d28816..0000000000000 --- a/internal/syslog/framing.go +++ /dev/null @@ -1,63 +0,0 @@ -package syslog - -import ( - "errors" - "strings" -) - -// Framing represents the framing technique we expect the messages to come. -type Framing int - -const ( - // OctetCounting indicates the transparent framing technique for syslog transport. - OctetCounting Framing = iota - // NonTransparent indicates the non-transparent framing technique for syslog transport. - NonTransparent -) - -func (f Framing) String() string { - switch f { - case OctetCounting: - return "OCTET-COUNTING" - case NonTransparent: - return "NON-TRANSPARENT" - } - return "" -} - -// UnmarshalTOML implements ability to unmarshal framing from TOML files. -func (f *Framing) UnmarshalTOML(data []byte) error { - return f.UnmarshalText(data) -} - -// UnmarshalText implements encoding.TextUnmarshaler -func (f *Framing) UnmarshalText(data []byte) error { - s := string(data) - switch strings.ToUpper(s) { - case `OCTET-COUNTING`: - fallthrough - case `"OCTET-COUNTING"`: - fallthrough - case `'OCTET-COUNTING'`: - *f = OctetCounting - return nil - case `NON-TRANSPARENT`: - fallthrough - case `"NON-TRANSPARENT"`: - fallthrough - case `'NON-TRANSPARENT'`: - *f = NonTransparent - return nil - } - *f = -1 - return errors.New("unknown framing") -} - -// MarshalText implements encoding.TextMarshaller -func (f Framing) MarshalText() ([]byte, error) { - s := f.String() - if s != "" { - return []byte(s), nil - } - return nil, errors.New("unknown framing") -} diff --git a/internal/syslog/framing_test.go b/internal/syslog/framing_test.go deleted file mode 100644 index 8aa39dad1f9c7..0000000000000 --- a/internal/syslog/framing_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package syslog - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestFraming(t *testing.T) { - var f1 Framing - err := f1.UnmarshalTOML([]byte(`"non-transparent"`)) - require.NoError(t, err) - require.Equal(t, NonTransparent, f1) - - var f2 Framing - err = f2.UnmarshalTOML([]byte(`non-transparent`)) - require.NoError(t, err) - require.Equal(t, NonTransparent, f2) - - var f3 Framing - err = f3.UnmarshalTOML([]byte(`'non-transparent'`)) - require.NoError(t, err) - require.Equal(t, NonTransparent, f3) - - var f4 Framing - err = f4.UnmarshalTOML([]byte(`"octet-counting"`)) - require.NoError(t, err) - require.Equal(t, OctetCounting, f4) - - var f5 Framing - err = f5.UnmarshalTOML([]byte(`octet-counting`)) - require.NoError(t, err) - require.Equal(t, OctetCounting, f5) - - var f6 Framing - err = f6.UnmarshalTOML([]byte(`'octet-counting'`)) - require.NoError(t, err) - require.Equal(t, OctetCounting, f6) - - var f7 Framing - err = f7.UnmarshalTOML([]byte(`nope`)) - require.Error(t, err) - require.Equal(t, Framing(-1), f7) -} diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index d7999fcf7eec1..e07b0f53164d8 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -3,11 +3,15 @@ package socket import ( "errors" "fmt" + "io" "net" "net/url" "os" + "path/filepath" + "runtime" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -18,48 +22,101 @@ type packetListener struct { MaxDecompressionSize int64 SocketMode string ReadBufferSize int - OnData CallbackData - OnError CallbackError Log telegraf.Logger conn net.PacketConn decoder internal.ContentDecoder path string + wg sync.WaitGroup } -func (l *packetListener) listen() { - buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet - for { - n, _, err := l.conn.ReadFrom(buf) - if err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - if l.OnError != nil { - l.OnError(err) +func (l *packetListener) listenData(onData CallbackData, onError CallbackError) { + l.wg.Add(1) + + go func() { + defer l.wg.Done() + + buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet + for { + n, src, err := l.conn.ReadFrom(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + if onError != nil { + onError(err) + } } + break + } + + body, err := l.decoder.Decode(buf[:n]) + if err != nil && onError != nil { + onError(fmt.Errorf("unable to decode incoming packet: %w", err)) } - break - } - body, err := l.decoder.Decode(buf[:n]) - if err != nil && l.OnError != nil { - l.OnError(fmt.Errorf("unable to decode incoming packet: %w", err)) + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unixgram"} + } + onData(src, body) } + }() +} - l.OnData(body) - } +func (l *packetListener) listenConnection(onConnection CallbackConnection, onError CallbackError) { + l.wg.Add(1) + go func() { + defer l.wg.Done() + defer l.conn.Close() + + buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet + for { + // Wait for packets and read them + n, src, err := l.conn.ReadFrom(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + if onError != nil { + onError(err) + } + } + break + } + + // Decode the contents depending on the given encoding + body, err := l.decoder.Decode(buf[:n]) + if err != nil && onError != nil { + onError(fmt.Errorf("unable to decode incoming packet: %w", err)) + } + + // Workaround to provide remote endpoints for Unix-type sockets + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unixgram"} + } + + // Create a pipe and notify the caller via Callback that new data is + // available. Afterwards write the data. Please note: Write() will + // blocks until all data is consumed! + reader, writer := io.Pipe() + go onConnection(src, reader) + if _, err := writer.Write(body); err != nil && onError != nil { + onError(err) + } + writer.Close() + } + }() } func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { - err := os.Remove(u.Path) - if err != nil && !errors.Is(err, os.ErrNotExist) { + l.path = filepath.FromSlash(u.Path) + if runtime.GOOS == "windows" && strings.Contains(l.path, ":") { + l.path = strings.TrimPrefix(l.path, `\`) + } + if err := os.Remove(l.path); err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("removing socket failed: %w", err) } - conn, err := net.ListenPacket(u.Scheme, u.Path) + conn, err := net.ListenPacket(u.Scheme, l.path) if err != nil { return fmt.Errorf("listening (unixgram) failed: %w", err) } - l.path = u.Path l.conn = conn // Set permissions on socket @@ -167,10 +224,14 @@ func (l *packetListener) close() error { if err := l.conn.Close(); err != nil { return err } + l.wg.Wait() if l.path != "" { - err := os.Remove(l.path) - if err != nil && !errors.Is(err, os.ErrNotExist) { + fn := filepath.FromSlash(l.path) + if runtime.GOOS == "windows" && strings.Contains(fn, ":") { + fn = strings.TrimPrefix(fn, `\`) + } + if err := os.Remove(fn); err != nil && !errors.Is(err, os.ErrNotExist) { // Ignore file-not-exists errors when removing the socket return err } diff --git a/plugins/common/socket/socket.conf b/plugins/common/socket/socket.conf new file mode 100644 index 0000000000000..cbe7cf66cc904 --- /dev/null +++ b/plugins/common/socket/socket.conf @@ -0,0 +1,37 @@ + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. + ## ex: socket_mode = "777" + # socket_mode = "" + + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 + + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" + + ## Optional TLS configuration (only available on stream sockets like TCP) + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. + # read_buffer_size = "64KiB" + + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. + # keep_alive_period = "5m" + + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. + # content_encoding = "identity" + + ## Maximum size of decoded packet (in bytes when no unit specified) + # max_decompression_size = "500MB" \ No newline at end of file diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index 19fd1e043d363..8b067140ffae4 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -3,37 +3,29 @@ package socket import ( "bufio" "crypto/tls" - "encoding/binary" - "encoding/hex" "fmt" + "io" "net" "net/url" "regexp" "strings" - "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" tlsint "github.com/influxdata/telegraf/plugins/common/tls" ) +type CallbackData func(net.Addr, []byte) +type CallbackConnection func(net.Addr, io.ReadCloser) +type CallbackError func(error) + type listener interface { address() net.Addr - listen() + listenData(CallbackData, CallbackError) + listenConnection(CallbackConnection, CallbackError) close() error } -type lengthFieldSpec struct { - Offset int64 `toml:"offset"` - Bytes int64 `toml:"bytes"` - Endianness string `toml:"endianness"` - HeaderLength int64 `toml:"header_length"` - converter func([]byte) int -} - -type CallbackData func([]byte) -type CallbackError func(error) - type Config struct { MaxConnections int `toml:"max_connections"` ReadBufferSize config.Size `toml:"read_buffer_size"` @@ -42,10 +34,6 @@ type Config struct { SocketMode string `toml:"socket_mode"` ContentEncoding string `toml:"content_encoding"` MaxDecompressionSize config.Size `toml:"max_decompression_size"` - SplittingStrategy string `toml:"splitting_strategy"` - SplittingDelimiter string `toml:"splitting_delimiter"` - SplittingLength int `toml:"splitting_length"` - SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"` tlsint.ServerConfig } @@ -58,79 +46,22 @@ type Socket struct { log telegraf.Logger splitter bufio.SplitFunc - wg sync.WaitGroup - listener listener } -func (cfg *Config) NewSocket(address string, logger telegraf.Logger) (*Socket, error) { +func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger telegraf.Logger) (*Socket, error) { s := &Socket{ Config: *cfg, log: logger, } - switch s.SplittingStrategy { - case "", "newline": - s.splitter = bufio.ScanLines - case "null": - s.splitter = scanNull - case "delimiter": - re := regexp.MustCompile(`(\s*0?x)`) - d := re.ReplaceAllString(strings.ToLower(s.SplittingDelimiter), "") - delimiter, err := hex.DecodeString(d) + // Setup the splitter if given + if splitcfg != nil { + splitter, err := splitcfg.NewSplitter() if err != nil { - return nil, fmt.Errorf("decoding delimiter failed: %w", err) + return nil, err } - s.splitter = createScanDelimiter(delimiter) - case "fixed length": - s.splitter = createScanFixedLength(s.SplittingLength) - case "variable length": - // Create the converter function - var order binary.ByteOrder - switch strings.ToLower(s.SplittingLengthField.Endianness) { - case "", "be": - order = binary.BigEndian - case "le": - order = binary.LittleEndian - default: - return nil, fmt.Errorf("invalid 'endianness' %q", s.SplittingLengthField.Endianness) - } - - switch s.SplittingLengthField.Bytes { - case 1: - s.SplittingLengthField.converter = func(b []byte) int { - return int(b[0]) - } - case 2: - s.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint16(b)) - } - case 4: - s.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint32(b)) - } - case 8: - s.SplittingLengthField.converter = func(b []byte) int { - return int(order.Uint64(b)) - } - default: - s.SplittingLengthField.converter = func(b []byte) int { - buf := make([]byte, 8) - start := 0 - if order == binary.BigEndian { - start = 8 - len(b) - } - for i := 0; i < len(b); i++ { - buf[start+i] = b[i] - } - return int(order.Uint64(buf)) - } - } - - // Check if we have enough bytes in the header - s.splitter = createScanVariableLength(s.SplittingLengthField) - default: - return nil, fmt.Errorf("unknown 'splitting_strategy' %q", s.SplittingStrategy) + s.splitter = splitter } // Resolve the interface to an address if any given @@ -164,7 +95,7 @@ func (cfg *Config) NewSocket(address string, logger telegraf.Logger) (*Socket, e return s, nil } -func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { +func (s *Socket) Setup() error { switch s.url.Scheme { case "tcp", "tcp4", "tcp6": l := &streamListener{ @@ -174,8 +105,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { MaxConnections: s.MaxConnections, Encoding: s.ContentEncoding, Splitter: s.splitter, - OnData: onData, - OnError: onError, Log: s.log, } @@ -191,8 +120,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { MaxConnections: s.MaxConnections, Encoding: s.ContentEncoding, Splitter: s.splitter, - OnData: onData, - OnError: onError, Log: s.log, } @@ -204,8 +131,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { l := &packetListener{ Encoding: s.ContentEncoding, MaxDecompressionSize: int64(s.MaxDecompressionSize), - OnData: onData, - OnError: onError, } if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil { return err @@ -215,8 +140,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { l := &packetListener{ Encoding: s.ContentEncoding, MaxDecompressionSize: int64(s.MaxDecompressionSize), - OnData: onData, - OnError: onError, } if err := l.setupIP(s.url); err != nil { return err @@ -226,8 +149,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { l := &packetListener{ Encoding: s.ContentEncoding, MaxDecompressionSize: int64(s.MaxDecompressionSize), - OnData: onData, - OnError: onError, } if err := l.setupUnixgram(s.url, s.SocketMode); err != nil { return err @@ -241,8 +162,6 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { MaxConnections: s.MaxConnections, Encoding: s.ContentEncoding, Splitter: s.splitter, - OnData: onData, - OnError: onError, Log: s.log, } @@ -254,15 +173,17 @@ func (s *Socket) Listen(onData CallbackData, onError CallbackError) error { return fmt.Errorf("unknown protocol %q", s.url.Scheme) } - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.listener.listen() - }() - return nil } +func (s *Socket) Listen(onData CallbackData, onError CallbackError) { + s.listener.listenData(onData, onError) +} + +func (s *Socket) ListenConnection(onConnection CallbackConnection, onError CallbackError) { + s.listener.listenConnection(onConnection, onError) +} + func (s *Socket) Close() { if s.listener != nil { // Ignore the returned error as we cannot do anything about it anyway @@ -271,7 +192,6 @@ func (s *Socket) Close() { } s.listener = nil } - s.wg.Wait() } func (s *Socket) Address() net.Addr { diff --git a/plugins/common/socket/socket_test.go b/plugins/common/socket/socket_test.go index 9334422d16048..9592359022ff8 100644 --- a/plugins/common/socket/socket_test.go +++ b/plugins/common/socket/socket_test.go @@ -24,12 +24,12 @@ import ( var pki = testutil.NewPKI("../../../testutil/pki") -func TestSocketListener(t *testing.T) { +func TestListenData(t *testing.T) { messages := [][]byte{ []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"), []byte("test,foo=zab v=3i 123456791\n"), } - expected := []telegraf.Metric{ + expectedTemplates := []telegraf.Metric{ metric.New( "test", map[string]string{"foo": "bar"}, @@ -111,6 +111,7 @@ func TestSocketListener(t *testing.T) { proto := strings.TrimSuffix(tt.schema, "+tls") // Prepare the address and socket if needed + var sockPath string var serviceAddress string var tlsCfg *tls.Config switch proto { @@ -122,11 +123,11 @@ func TestSocketListener(t *testing.T) { } // Create a socket - fn := testutil.TempSocket(t) - f, err := os.Create(fn) + sockPath = testutil.TempSocket(t) + f, err := os.Create(sockPath) require.NoError(t, err) defer f.Close() - serviceAddress = proto + "://" + fn + serviceAddress = proto + "://" + sockPath } // Setup the configuration according to test specification @@ -142,7 +143,7 @@ func TestSocketListener(t *testing.T) { } // Create the socket - sock, err := cfg.NewSocket(serviceAddress, &testutil.Logger{}) + sock, err := cfg.NewSocket(serviceAddress, &SplitConfig{}, &testutil.Logger{}) require.NoError(t, err) // Create callbacks @@ -150,9 +151,16 @@ func TestSocketListener(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(data []byte) { + onData := func(remote net.Addr, data []byte) { m, err := parser.Parse(data) require.NoError(t, err) + addr, _, err := net.SplitHostPort(remote.String()) + if err != nil { + addr = remote.String() + } + for i := range m { + m[i].AddTag("source", addr) + } acc.AddMetrics(m) } onError := func(err error) { @@ -160,7 +168,8 @@ func TestSocketListener(t *testing.T) { } // Start the listener - require.NoError(t, sock.Listen(onData, onError)) + require.NoError(t, sock.Setup()) + sock.Listen(onData, onError) defer sock.Close() addr := sock.Address() @@ -175,6 +184,24 @@ func TestSocketListener(t *testing.T) { client, err = createClient(serviceAddress, addr, tlsCfg) require.NoError(t, err) + // Conditionally add the source address to the expectation + expected := make([]telegraf.Metric, 0, len(expectedTemplates)) + for _, tmpl := range expectedTemplates { + m := tmpl.Copy() + switch proto { + case "tcp", "udp": + laddr := client.LocalAddr().String() + addr, _, err := net.SplitHostPort(laddr) + if err != nil { + addr = laddr + } + m.AddTag("source", addr) + case "unix", "unixgram": + m.AddTag("source", sockPath) + } + expected = append(expected, m) + } + // Send the data with the correct encoding encoder, err := internal.NewContentEncoder(tt.encoding) require.NoError(t, err) @@ -192,13 +219,219 @@ func TestSocketListener(t *testing.T) { defer acc.Unlock() return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) + actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) }) } } -func TestSocketListenerStream(t *testing.T) { +func TestListenConnection(t *testing.T) { + messages := [][]byte{ + []byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"), + []byte("test,foo=zab v=3i 123456791\n"), + } + expectedTemplates := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"v": int64(1)}, + time.Unix(0, 123456789), + ), + metric.New( + "test", + map[string]string{"foo": "baz"}, + map[string]interface{}{"v": int64(2)}, + time.Unix(0, 123456790), + ), + metric.New( + "test", + map[string]string{"foo": "zab"}, + map[string]interface{}{"v": int64(3)}, + time.Unix(0, 123456791), + ), + } + + tests := []struct { + name string + schema string + buffersize config.Size + encoding string + }{ + { + name: "TCP", + schema: "tcp", + buffersize: config.Size(1024), + }, + { + name: "TCP with TLS", + schema: "tcp+tls", + }, + { + name: "TCP with gzip encoding", + schema: "tcp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "UDP", + schema: "udp", + buffersize: config.Size(1024), + }, + { + name: "UDP with gzip encoding", + schema: "udp", + buffersize: config.Size(1024), + encoding: "gzip", + }, + { + name: "unix socket", + schema: "unix", + buffersize: config.Size(1024), + }, + { + name: "unix socket with TLS", + schema: "unix+tls", + }, + { + name: "unix socket with gzip encoding", + schema: "unix", + encoding: "gzip", + }, + { + name: "unixgram socket", + schema: "unixgram", + buffersize: config.Size(1024), + }, + } + + serverTLS := pki.TLSServerConfig() + clientTLS := pki.TLSClientConfig() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + proto := strings.TrimSuffix(tt.schema, "+tls") + + // Prepare the address and socket if needed + var sockPath string + var serviceAddress string + var tlsCfg *tls.Config + switch proto { + case "tcp", "udp": + serviceAddress = proto + "://" + "127.0.0.1:0" + case "unix", "unixgram": + if runtime.GOOS == "windows" { + t.Skip("Skipping on Windows, as unixgram sockets are not supported") + } + + // Create a socket + sockPath = testutil.TempSocket(t) + f, err := os.Create(sockPath) + require.NoError(t, err) + defer f.Close() + serviceAddress = proto + "://" + sockPath + } + + // Setup the configuration according to test specification + cfg := &Config{ + ContentEncoding: tt.encoding, + ReadBufferSize: tt.buffersize, + } + if strings.HasSuffix(tt.schema, "tls") { + cfg.ServerConfig = *serverTLS + var err error + tlsCfg, err = clientTLS.TLSConfig() + require.NoError(t, err) + } + + // Create the socket + sock, err := cfg.NewSocket(serviceAddress, &SplitConfig{}, &testutil.Logger{}) + require.NoError(t, err) + + // Create callbacks + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + var acc testutil.Accumulator + onConnection := func(remote net.Addr, reader io.ReadCloser) { + data, err := io.ReadAll(reader) + require.NoError(t, err) + m, err := parser.Parse(data) + require.NoError(t, err) + addr, _, err := net.SplitHostPort(remote.String()) + if err != nil { + addr = remote.String() + } + for i := range m { + m[i].AddTag("source", addr) + } + acc.AddMetrics(m) + } + onError := func(err error) { + acc.AddError(err) + } + + // Start the listener + require.NoError(t, sock.Setup()) + sock.ListenConnection(onConnection, onError) + defer sock.Close() + + addr := sock.Address() + + // Create a noop client + // Server is async, so verify no errors at the end. + client, err := createClient(serviceAddress, addr, tlsCfg) + require.NoError(t, err) + require.NoError(t, client.Close()) + + // Setup the client for submitting data + client, err = createClient(serviceAddress, addr, tlsCfg) + require.NoError(t, err) + + // Conditionally add the source address to the expectation + expected := make([]telegraf.Metric, 0, len(expectedTemplates)) + for _, tmpl := range expectedTemplates { + m := tmpl.Copy() + switch proto { + case "tcp", "udp": + laddr := client.LocalAddr().String() + addr, _, err := net.SplitHostPort(laddr) + if err != nil { + addr = laddr + } + m.AddTag("source", addr) + case "unix", "unixgram": + m.AddTag("source", sockPath) + } + expected = append(expected, m) + } + + // Send the data with the correct encoding + encoder, err := internal.NewContentEncoder(tt.encoding) + require.NoError(t, err) + + for i, msg := range messages { + m, err := encoder.Encode(msg) + require.NoErrorf(t, err, "encoding failed for msg %d", i) + _, err = client.Write(m) + require.NoErrorf(t, err, "sending msg %d failed", i) + } + client.Close() + + // Test the resulting metrics and compare against expected results + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + }) + } +} + +func TestClosingConnections(t *testing.T) { // Setup the configuration cfg := &Config{ ReadBufferSize: 1024, @@ -207,7 +440,7 @@ func TestSocketListenerStream(t *testing.T) { // Create the socket serviceAddress := "tcp://127.0.0.1:0" logger := &testutil.CaptureLogger{} - sock, err := cfg.NewSocket(serviceAddress, logger) + sock, err := cfg.NewSocket(serviceAddress, &SplitConfig{}, logger) require.NoError(t, err) // Create callbacks @@ -215,7 +448,7 @@ func TestSocketListenerStream(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(data []byte) { + onData := func(_ net.Addr, data []byte) { m, err := parser.Parse(data) require.NoError(t, err) acc.AddMetrics(m) @@ -225,7 +458,8 @@ func TestSocketListenerStream(t *testing.T) { } // Start the listener - require.NoError(t, sock.Listen(onData, onError)) + require.NoError(t, sock.Setup()) + sock.Listen(onData, onError) defer sock.Close() addr := sock.Address() @@ -263,6 +497,114 @@ func TestSocketListenerStream(t *testing.T) { require.Empty(t, logger.Warnings()) } +func TestNoSplitter(t *testing.T) { + messages := [][]byte{ + []byte("test,foo=bar v"), + []byte("=1i 123456789\ntest,foo=baz v=2i 123456790\ntest,foo=zab v=3i 123456791\n"), + } + expectedTemplates := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"foo": "bar"}, + map[string]interface{}{"v": int64(1)}, + time.Unix(0, 123456789), + ), + metric.New( + "test", + map[string]string{"foo": "baz"}, + map[string]interface{}{"v": int64(2)}, + time.Unix(0, 123456790), + ), + metric.New( + "test", + map[string]string{"foo": "zab"}, + map[string]interface{}{"v": int64(3)}, + time.Unix(0, 123456791), + ), + } + + // Prepare the address and socket if needed + serviceAddress := "tcp://127.0.0.1:0" + + // Setup the configuration according to test specification + cfg := &Config{} + + // Create the socket + sock, err := cfg.NewSocket(serviceAddress, nil, &testutil.Logger{}) + require.NoError(t, err) + + // Create callbacks + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + var acc testutil.Accumulator + onConnection := func(remote net.Addr, reader io.ReadCloser) { + data, err := io.ReadAll(reader) + require.NoError(t, err) + m, err := parser.Parse(data) + require.NoError(t, err) + addr, _, err := net.SplitHostPort(remote.String()) + if err != nil { + addr = remote.String() + } + for i := range m { + m[i].AddTag("source", addr) + } + acc.AddMetrics(m) + } + onError := func(err error) { + acc.AddError(err) + } + + // Start the listener + require.NoError(t, sock.Setup()) + sock.ListenConnection(onConnection, onError) + defer sock.Close() + + addr := sock.Address() + + // Create a noop client + // Server is async, so verify no errors at the end. + client, err := createClient(serviceAddress, addr, nil) + require.NoError(t, err) + require.NoError(t, client.Close()) + + // Setup the client for submitting data + client, err = createClient(serviceAddress, addr, nil) + require.NoError(t, err) + + // Conditionally add the source address to the expectation + expected := make([]telegraf.Metric, 0, len(expectedTemplates)) + for _, tmpl := range expectedTemplates { + m := tmpl.Copy() + laddr := client.LocalAddr().String() + addr, _, err := net.SplitHostPort(laddr) + if err != nil { + addr = laddr + } + m.AddTag("source", addr) + expected = append(expected, m) + } + + // Send the data + for i, msg := range messages { + _, err = client.Write(msg) + time.Sleep(100 * time.Millisecond) + require.NoErrorf(t, err, "sending msg %d failed", i) + } + client.Close() + + // Test the resulting metrics and compare against expected results + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics()) + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) +} + func createClient(endpoint string, addr net.Addr, tlsCfg *tls.Config) (net.Conn, error) { // Determine the protocol in a crude fashion parts := strings.SplitN(endpoint, "://", 2) diff --git a/plugins/common/socket/sample.conf b/plugins/common/socket/splitter.conf similarity index 57% rename from plugins/common/socket/sample.conf rename to plugins/common/socket/splitter.conf index a87b90ad0b0cf..d7af50183a98a 100644 --- a/plugins/common/socket/sample.conf +++ b/plugins/common/socket/splitter.conf @@ -1,41 +1,3 @@ - ## Permission for unix sockets (only available on unix sockets) - ## This setting may not be respected by some platforms. To safely restrict - ## permissions it is recommended to place the socket into a previously - ## created directory with the desired permissions. - ## ex: socket_mode = "777" - # socket_mode = "" - - ## Maximum number of concurrent connections (only available on stream sockets like TCP) - ## Zero means unlimited. - # max_connections = 0 - - ## Read timeout (only available on stream sockets like TCP) - ## Zero means unlimited. - # read_timeout = "0s" - - ## Optional TLS configuration (only available on stream sockets like TCP) - # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" - ## Enables client authentication if set. - # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - - ## Maximum socket buffer size (in bytes when no unit specified) - ## For stream sockets, once the buffer fills up, the sender will start - ## backing up. For datagram sockets, once the buffer fills up, metrics will - ## start dropping. Defaults to the OS default. - # read_buffer_size = "64KiB" - - ## Period between keep alive probes (only applies to TCP sockets) - ## Zero disables keep alive probes. Defaults to the OS configuration. - # keep_alive_period = "5m" - - ## Content encoding for message payloads - ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. - # content_encoding = "identity" - - ## Maximum size of decoded packet (in bytes when no unit specified) - # max_decompression_size = "500MB" - ## Message splitting strategy and corresponding settings for stream sockets ## (tcp, tcp4, tcp6, unix or unixpacket). The setting is ignored for packet ## listeners such as udp. diff --git a/plugins/common/socket/splitters.go b/plugins/common/socket/splitters.go index 278c9fc16385a..9949f225a3591 100644 --- a/plugins/common/socket/splitters.go +++ b/plugins/common/socket/splitters.go @@ -3,8 +3,94 @@ package socket import ( "bufio" "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "regexp" + "strings" ) +type lengthFieldSpec struct { + Offset int64 `toml:"offset"` + Bytes int64 `toml:"bytes"` + Endianness string `toml:"endianness"` + HeaderLength int64 `toml:"header_length"` + converter func([]byte) int +} + +type SplitConfig struct { + SplittingStrategy string `toml:"splitting_strategy"` + SplittingDelimiter string `toml:"splitting_delimiter"` + SplittingLength int `toml:"splitting_length"` + SplittingLengthField lengthFieldSpec `toml:"splitting_length_field"` +} + +func (cfg *SplitConfig) NewSplitter() (bufio.SplitFunc, error) { + switch cfg.SplittingStrategy { + case "", "newline": + return bufio.ScanLines, nil + case "null": + return scanNull, nil + case "delimiter": + re := regexp.MustCompile(`(\s*0?x)`) + d := re.ReplaceAllString(strings.ToLower(cfg.SplittingDelimiter), "") + delimiter, err := hex.DecodeString(d) + if err != nil { + return nil, fmt.Errorf("decoding delimiter failed: %w", err) + } + return createScanDelimiter(delimiter), nil + case "fixed length": + return createScanFixedLength(cfg.SplittingLength), nil + case "variable length": + // Create the converter function + var order binary.ByteOrder + switch strings.ToLower(cfg.SplittingLengthField.Endianness) { + case "", "be": + order = binary.BigEndian + case "le": + order = binary.LittleEndian + default: + return nil, fmt.Errorf("invalid 'endianness' %q", cfg.SplittingLengthField.Endianness) + } + + switch cfg.SplittingLengthField.Bytes { + case 1: + cfg.SplittingLengthField.converter = func(b []byte) int { + return int(b[0]) + } + case 2: + cfg.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint16(b)) + } + case 4: + cfg.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint32(b)) + } + case 8: + cfg.SplittingLengthField.converter = func(b []byte) int { + return int(order.Uint64(b)) + } + default: + cfg.SplittingLengthField.converter = func(b []byte) int { + buf := make([]byte, 8) + start := 0 + if order == binary.BigEndian { + start = 8 - len(b) + } + for i := 0; i < len(b); i++ { + buf[start+i] = b[i] + } + return int(order.Uint64(buf)) + } + } + + // Check if we have enough bytes in the header + return createScanVariableLength(cfg.SplittingLengthField), nil + } + + return nil, fmt.Errorf("unknown 'splitting_strategy' %q", cfg.SplittingStrategy) +} + func scanNull(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index 24cec1b6d4000..87753f53668dd 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -10,6 +10,8 @@ import ( "net" "net/url" "os" + "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -34,8 +36,6 @@ type streamListener struct { ReadTimeout config.Duration KeepAlivePeriod *config.Duration Splitter bufio.SplitFunc - OnData CallbackData - OnError CallbackError Log telegraf.Logger listener net.Listener @@ -57,20 +57,23 @@ func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error { } func (l *streamListener) setupUnix(u *url.URL, tlsCfg *tls.Config, socketMode string) error { - err := os.Remove(u.Path) - if err != nil && !errors.Is(err, os.ErrNotExist) { + l.path = filepath.FromSlash(u.Path) + if runtime.GOOS == "windows" && strings.Contains(l.path, ":") { + l.path = strings.TrimPrefix(l.path, `\`) + } + if err := os.Remove(l.path); err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("removing socket failed: %w", err) } + var err error if tlsCfg == nil { - l.listener, err = net.Listen(u.Scheme, u.Path) + l.listener, err = net.Listen(u.Scheme, l.path) } else { - l.listener, err = tls.Listen(u.Scheme, u.Path, tlsCfg) + l.listener, err = tls.Listen(u.Scheme, l.path, tlsCfg) } if err != nil { return err } - l.path = u.Path // Set permissions on socket if socketMode != "" { @@ -187,8 +190,11 @@ func (l *streamListener) close() error { l.wg.Wait() if l.path != "" { - err := os.Remove(l.path) - if err != nil && !errors.Is(err, os.ErrNotExist) { + fn := filepath.FromSlash(l.path) + if runtime.GOOS == "windows" && strings.Contains(fn, ":") { + fn = strings.TrimPrefix(fn, `\`) + } + if err := os.Remove(fn); err != nil && !errors.Is(err, os.ErrNotExist) { // Ignore file-not-exists errors when removing the socket return err } @@ -196,46 +202,96 @@ func (l *streamListener) close() error { return nil } -func (l *streamListener) listen() { +func (l *streamListener) listenData(onData CallbackData, onError CallbackError) { l.connections = make(map[net.Conn]bool) l.wg.Add(1) - defer l.wg.Done() + go func() { + defer l.wg.Done() - var wg sync.WaitGroup - for { - conn, err := l.listener.Accept() - if err != nil { - if !errors.Is(err, net.ErrClosed) && l.OnError != nil { - l.OnError(err) + var wg sync.WaitGroup + for { + conn, err := l.listener.Accept() + if err != nil { + if !errors.Is(err, net.ErrClosed) && onError != nil { + onError(err) + } + break } - break - } - if err := l.setupConnection(conn); err != nil && l.OnError != nil { - l.OnError(err) - continue - } + if err := l.setupConnection(conn); err != nil && onError != nil { + onError(err) + continue + } - wg.Add(1) - go func(c net.Conn) { - defer wg.Done() - if err := l.read(c); err != nil { - if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) { - if l.OnError != nil { - l.OnError(err) + wg.Add(1) + go func(c net.Conn) { + defer wg.Done() + defer func() { + l.Lock() + l.closeConnection(conn) + l.Unlock() + }() + + reader := l.read + if l.Splitter == nil { + reader = l.readAll + } + if err := reader(c, onData); err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) { + if onError != nil { + onError(err) + } } } + }(conn) + } + wg.Wait() + }() +} + +func (l *streamListener) listenConnection(onConnection CallbackConnection, onError CallbackError) { + l.connections = make(map[net.Conn]bool) + + l.wg.Add(1) + go func() { + defer l.wg.Done() + + var wg sync.WaitGroup + for { + conn, err := l.listener.Accept() + if err != nil { + if !errors.Is(err, net.ErrClosed) && onError != nil { + onError(err) + } + break } - l.Lock() - l.closeConnection(conn) - l.Unlock() - }(conn) - } - wg.Wait() + + if err := l.setupConnection(conn); err != nil && onError != nil { + onError(err) + continue + } + + wg.Add(1) + go func(c net.Conn) { + defer wg.Done() + if err := l.handleConnection(c, onConnection); err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) { + if onError != nil { + onError(err) + } + } + } + l.Lock() + l.closeConnection(conn) + l.Unlock() + }(conn) + } + wg.Wait() + }() } -func (l *streamListener) read(conn net.Conn) error { +func (l *streamListener) read(conn net.Conn, onData CallbackData) error { decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn) if err != nil { return fmt.Errorf("creating decoder failed: %w", err) @@ -261,8 +317,12 @@ func (l *streamListener) read(conn net.Conn) error { break } + src := conn.RemoteAddr() + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unix"} + } data := scanner.Bytes() - l.OnData(data) + onData(src, data) } if err := scanner.Err(); err != nil { @@ -279,3 +339,83 @@ func (l *streamListener) read(conn net.Conn) error { } return nil } + +func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error { + src := conn.RemoteAddr() + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unix"} + } + + decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn) + if err != nil { + return fmt.Errorf("creating decoder failed: %w", err) + } + + timeout := time.Duration(l.ReadTimeout) + // Set the read deadline, if any, then start reading. The read + // will accept the deadline and return if no or insufficient data + // arrived in time. We need to set the deadline in every cycle as + // it is an ABSOLUTE time and not a timeout. + if timeout > 0 { + deadline := time.Now().Add(timeout) + if err := conn.SetReadDeadline(deadline); err != nil { + return fmt.Errorf("setting read deadline failed: %w", err) + } + } + + buf, err := io.ReadAll(decoder) + if err != nil { + return fmt.Errorf("read on %s failed: %w", src, err) + } + onData(src, buf) + + return nil +} + +func (l *streamListener) handleConnection(conn net.Conn, onConnection CallbackConnection) error { + // Prepare the data decoder for the connection + decoder, err := internal.NewStreamContentDecoder(l.Encoding, conn) + if err != nil { + return fmt.Errorf("creating decoder failed: %w", err) + } + + // Get the remote address + src := conn.RemoteAddr() + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unix"} + } + + // Create a pipe and feed it to the callback + reader, writer := io.Pipe() + defer writer.Close() + go onConnection(src, reader) + + timeout := time.Duration(l.ReadTimeout) + buf := make([]byte, 4096) // 4kb + for { + // Set the read deadline, if any, then start reading. The read + // will accept the deadline and return if no or insufficient data + // arrived in time. We need to set the deadline in every cycle as + // it is an ABSOLUTE time and not a timeout. + if timeout > 0 { + deadline := time.Now().Add(timeout) + if err := conn.SetReadDeadline(deadline); err != nil { + return fmt.Errorf("setting read deadline failed: %w", err) + } + } + + // Copy the data + n, err := decoder.Read(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + if !errors.Is(err, os.ErrDeadlineExceeded) && errors.Is(err, net.ErrClosed) { + writer.CloseWithError(err) + } + } + return nil + } + if _, err := writer.Write(buf[:n]); err != nil { + return err + } + } +} diff --git a/plugins/inputs/socket_listener/sample.conf.in b/plugins/inputs/socket_listener/sample.conf.in index f8c1e19a97c87..5ade7a4f8ea0f 100644 --- a/plugins/inputs/socket_listener/sample.conf.in +++ b/plugins/inputs/socket_listener/sample.conf.in @@ -13,7 +13,9 @@ # service_address = "unixgram:///tmp/telegraf.sock" # service_address = "vsock://cid:port" -{{template "/plugins/common/socket/sample.conf"}} +{{template "/plugins/common/socket/socket.conf"}} + +{{template "/plugins/common/socket/splitter.conf"}} ## Data format to consume. ## Each data format has its own unique set of configuration options, read diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 6893d3f152b6b..7d1dc6b3b3b58 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -4,6 +4,7 @@ package socket_listener import ( _ "embed" + "net" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/socket" @@ -17,6 +18,7 @@ type SocketListener struct { ServiceAddress string `toml:"service_address"` Log telegraf.Logger `toml:"-"` socket.Config + socket.SplitConfig socket *socket.Socket parser telegraf.Parser @@ -27,7 +29,7 @@ func (*SocketListener) SampleConfig() string { } func (sl *SocketListener) Init() error { - sock, err := sl.Config.NewSocket(sl.ServiceAddress, sl.Log) + sock, err := sl.Config.NewSocket(sl.ServiceAddress, &sl.SplitConfig, sl.Log) if err != nil { return err } @@ -46,7 +48,7 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) { func (sl *SocketListener) Start(acc telegraf.Accumulator) error { // Create the callbacks for parsing the data and recording issues - onData := func(data []byte) { + onData := func(_ net.Addr, data []byte) { metrics, err := sl.parser.Parse(data) if err != nil { acc.AddError(err) @@ -61,9 +63,10 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { } // Start the listener - if err := sl.socket.Listen(onData, onError); err != nil { + if err := sl.socket.Setup(); err != nil { return err } + sl.socket.Listen(onData, onError) addr := sl.socket.Address() sl.Log.Infof("Listening on %s://%s", addr.Network(), addr.String()) diff --git a/plugins/inputs/syslog/README.md b/plugins/inputs/syslog/README.md index e5009b82915d8..3382510c8a816 100644 --- a/plugins/inputs/syslog/README.md +++ b/plugins/inputs/syslog/README.md @@ -43,32 +43,50 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## When using tcp, consider using 'tcp4' or 'tcp6' to force the usage of IPv4 ## or IPV6 respectively. There are cases, where when not specified, a system ## may force an IPv4 mapped IPv6 address. - server = "tcp://:6514" + server = "tcp://127.0.0.1:6514" - ## TLS Config - # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] - # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. + ## ex: socket_mode = "777" + # socket_mode = "" + + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 - ## Period between keep alive probes. - ## 0 disables keep alive probes. - ## Defaults to the OS configuration. - ## Only applies to stream sockets (e.g. TCP). + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" + + ## Optional TLS configuration (only available on stream sockets like TCP) + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. + # read_buffer_size = "64KiB" + + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. # keep_alive_period = "5m" - ## Maximum number of concurrent connections (default = 0). - ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). - # max_connections = 1024 + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. + # content_encoding = "identity" - ## Read timeout is the maximum time allowed for reading a single message (default = 5s). - ## 0 means unlimited. - # read_timeout = "5s" + ## Maximum size of decoded packet (in bytes when no unit specified) + # max_decompression_size = "500MB" - ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). - ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), - ## or the non-transparent framing technique (RFC6587#section-3.4.2). - ## Must be one of "octect-counting", "non-transparent". + ## Framing technique used for messages transport + ## Available settings are: + ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 + ## non-transparent -- see RFC6587#section-3.4.2 # framing = "octet-counting" ## The trailer to be expected in case of non-transparent framing (default = "LF"). diff --git a/plugins/inputs/syslog/sample.conf b/plugins/inputs/syslog/sample.conf index 5c73effcb72a8..777f16037a2e4 100644 --- a/plugins/inputs/syslog/sample.conf +++ b/plugins/inputs/syslog/sample.conf @@ -8,32 +8,50 @@ ## When using tcp, consider using 'tcp4' or 'tcp6' to force the usage of IPv4 ## or IPV6 respectively. There are cases, where when not specified, a system ## may force an IPv4 mapped IPv6 address. - server = "tcp://:6514" + server = "tcp://127.0.0.1:6514" - ## TLS Config - # tls_allowed_cacerts = ["/etc/telegraf/ca.pem"] + ## Permission for unix sockets (only available on unix sockets) + ## This setting may not be respected by some platforms. To safely restrict + ## permissions it is recommended to place the socket into a previously + ## created directory with the desired permissions. + ## ex: socket_mode = "777" + # socket_mode = "" + + ## Maximum number of concurrent connections (only available on stream sockets like TCP) + ## Zero means unlimited. + # max_connections = 0 + + ## Read timeout (only available on stream sockets like TCP) + ## Zero means unlimited. + # read_timeout = "0s" + + ## Optional TLS configuration (only available on stream sockets like TCP) # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Enables client authentication if set. + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Maximum socket buffer size (in bytes when no unit specified) + ## For stream sockets, once the buffer fills up, the sender will start + ## backing up. For datagram sockets, once the buffer fills up, metrics will + ## start dropping. Defaults to the OS default. + # read_buffer_size = "64KiB" - ## Period between keep alive probes. - ## 0 disables keep alive probes. - ## Defaults to the OS configuration. - ## Only applies to stream sockets (e.g. TCP). + ## Period between keep alive probes (only applies to TCP sockets) + ## Zero disables keep alive probes. Defaults to the OS configuration. # keep_alive_period = "5m" - ## Maximum number of concurrent connections (default = 0). - ## 0 means unlimited. - ## Only applies to stream sockets (e.g. TCP). - # max_connections = 1024 + ## Content encoding for message payloads + ## Can be set to "gzip" for compressed payloads or "identity" for no encoding. + # content_encoding = "identity" - ## Read timeout is the maximum time allowed for reading a single message (default = 5s). - ## 0 means unlimited. - # read_timeout = "5s" + ## Maximum size of decoded packet (in bytes when no unit specified) + # max_decompression_size = "500MB" - ## The framing technique with which it is expected that messages are transported (default = "octet-counting"). - ## Whether the messages come using the octect-counting (RFC5425#section-4.3.1, RFC6587#section-3.4.1), - ## or the non-transparent framing technique (RFC6587#section-3.4.2). - ## Must be one of "octect-counting", "non-transparent". + ## Framing technique used for messages transport + ## Available settings are: + ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 + ## non-transparent -- see RFC6587#section-3.4.2 # framing = "octet-counting" ## The trailer to be expected in case of non-transparent framing (default = "LF"). diff --git a/plugins/inputs/syslog/sample.conf.in b/plugins/inputs/syslog/sample.conf.in new file mode 100644 index 0000000000000..df58aed9024fb --- /dev/null +++ b/plugins/inputs/syslog/sample.conf.in @@ -0,0 +1,39 @@ +[[inputs.syslog]] + ## Protocol, address and port to host the syslog receiver. + ## If no host is specified, then localhost is used. + ## If no port is specified, 6514 is used (RFC5425#section-4.1). + ## ex: server = "tcp://localhost:6514" + ## server = "udp://:6514" + ## server = "unix:///var/run/telegraf-syslog.sock" + ## When using tcp, consider using 'tcp4' or 'tcp6' to force the usage of IPv4 + ## or IPV6 respectively. There are cases, where when not specified, a system + ## may force an IPv4 mapped IPv6 address. + server = "tcp://127.0.0.1:6514" + +{{template "/plugins/common/socket/socket.conf"}} + + ## Framing technique used for messages transport + ## Available settings are: + ## octet-counting -- see RFC5425#section-4.3.1 and RFC6587#section-3.4.1 + ## non-transparent -- see RFC6587#section-3.4.2 + # framing = "octet-counting" + + ## The trailer to be expected in case of non-transparent framing (default = "LF"). + ## Must be one of "LF", or "NUL". + # trailer = "LF" + + ## Whether to parse in best effort mode or not (default = false). + ## By default best effort parsing is off. + # best_effort = false + + ## The RFC standard to use for message parsing + ## By default RFC5424 is used. RFC3164 only supports UDP transport (no streaming support) + ## Must be one of "RFC5424", or "RFC3164". + # syslog_standard = "RFC5424" + + ## Character to prepend to SD-PARAMs (default = "_"). + ## A syslog message can contain multiple parameters and multiple identifiers within structured data section. + ## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"] + ## For each combination a field is created. + ## Its name is created concatenating identifier, sdparam_separator, and parameter name. + # sdparam_separator = "_" diff --git a/plugins/inputs/syslog/syslog.go b/plugins/inputs/syslog/syslog.go index 55f6843a04b92..5b8d63cced470 100644 --- a/plugins/inputs/syslog/syslog.go +++ b/plugins/inputs/syslog/syslog.go @@ -1,18 +1,15 @@ +//go:generate ../../../tools/config_includer/generator //go:generate ../../../tools/readme_config_includer/generator package syslog import ( - "crypto/tls" _ "embed" "fmt" "io" "net" "net/url" - "os" - "path/filepath" "strings" "sync" - "time" "unicode" "github.com/influxdata/go-syslog/v3" @@ -22,112 +19,129 @@ import ( "github.com/influxdata/go-syslog/v3/rfc5424" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" - framing "github.com/influxdata/telegraf/internal/syslog" - tlsConfig "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/common/socket" "github.com/influxdata/telegraf/plugins/inputs" ) //go:embed sample.conf var sampleConfig string -type syslogRFC string - -const defaultReadTimeout = time.Second * 5 -const ipMaxPacketSize = 64 * 1024 -const syslogRFC3164 = "RFC3164" -const syslogRFC5424 = "RFC5424" +const readTimeoutMsg = "Read timeout set! Connections, inactive for the set duration, will be closed!" // Syslog is a syslog plugin type Syslog struct { - tlsConfig.ServerConfig - Address string `toml:"server"` - KeepAlivePeriod *config.Duration - MaxConnections int - ReadTimeout *config.Duration - Framing framing.Framing - SyslogStandard syslogRFC - Trailer nontransparent.TrailerType - BestEffort bool - Separator string `toml:"sdparam_separator"` - - now func() time.Time - lastTime time.Time + Address string `toml:"server"` + Framing string `toml:"framing"` + SyslogStandard string `toml:"syslog_standard"` + Trailer nontransparent.TrailerType `toml:"trailer"` + BestEffort bool `toml:"best_effort"` + Separator string `toml:"sdparam_separator"` + Log telegraf.Logger `toml:"-"` + socket.Config mu sync.Mutex wg sync.WaitGroup - io.Closer - - isStream bool - tcpListener net.Listener - tlsConfig *tls.Config - connections map[string]net.Conn - connectionsMu sync.Mutex - udpListener net.PacketConn + url *url.URL + socket *socket.Socket } func (*Syslog) SampleConfig() string { return sampleConfig } -// Gather ... -func (s *Syslog) Gather(_ telegraf.Accumulator) error { - return nil -} +func (s *Syslog) Init() error { + // Check settings and set defaults + switch s.Framing { + case "": + s.Framing = "octet-counting" + case "octet-counting", "non-transparent": + default: + return fmt.Errorf("invalid 'framing' %q", s.Framing) + } -// Start starts the service. -func (s *Syslog) Start(acc telegraf.Accumulator) error { - s.mu.Lock() - defer s.mu.Unlock() + switch s.SyslogStandard { + case "": + s.SyslogStandard = "RFC5424" + case "RFC3164", "RFC5424": + default: + return fmt.Errorf("invalid 'syslog_standard' %q", s.SyslogStandard) + } + + if s.Separator == "" { + s.Separator = "_" + } + + // Check and parse address, set default if necessary + if s.Address == "" { + s.Address = "tcp://127.0.0.1:6514" + } + + if !strings.Contains(s.Address, "://") { + return fmt.Errorf("missing protocol within address %q", s.Address) + } - scheme, host, err := getAddressParts(s.Address) + u, err := url.Parse(s.Address) if err != nil { - return err + return fmt.Errorf("parsing address %q failed: %w", s.Address, err) } - s.Address = host - switch scheme { + // Check if we do have a port and add the default one if not + if u.Port() == "" { + u.Host += ":6514" + } + s.url = u + + switch s.url.Scheme { case "tcp", "tcp4", "tcp6", "unix", "unixpacket": - s.isStream = true + if s.ReadTimeout > 0 { + s.Log.Warn(readTimeoutMsg) + } case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": - s.isStream = false default: - return fmt.Errorf("unknown protocol %q in %q", scheme, s.Address) + return fmt.Errorf("unknown protocol %q in %q", u.Scheme, s.Address) } - if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { - os.Remove(s.Address) + // Create a socket + sock, err := s.Config.NewSocket(u.String(), nil, s.Log) + if err != nil { + return err } + s.socket = sock - if s.isStream { - l, err := net.Listen(scheme, s.Address) - if err != nil { - return err - } - s.Closer = l - s.tcpListener = l - s.tlsConfig, err = s.TLSConfig() - if err != nil { - return err - } + return nil +} - s.wg.Add(1) - go s.listenStream(acc) - } else { - l, err := net.ListenPacket(scheme, s.Address) - if err != nil { - return err - } - s.Closer = l - s.udpListener = l +// Gather ... +func (*Syslog) Gather(_ telegraf.Accumulator) error { + return nil +} - s.wg.Add(1) - go s.listenPacket(acc) +// Start starts the service. +func (s *Syslog) Start(acc telegraf.Accumulator) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Setup the listener + if err := s.socket.Setup(); err != nil { + return err } + addr := s.socket.Address() + s.Log.Infof("Listening on %s://%s", addr.Network(), addr.String()) - if scheme == "unix" || scheme == "unixpacket" || scheme == "unixgram" { - s.Closer = unixCloser{path: s.Address, closer: s.Closer} + // Setup the callbacks and start listening + onError := func(err error) { + acc.AddError(err) + } + switch s.url.Scheme { + case "tcp", "tcp4", "tcp6", "unix", "unixpacket": + onConnection := s.createStreamDataHandler(acc) + s.socket.ListenConnection(onConnection, onError) + case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram": + onData := s.createDatagramDataHandler(acc) + s.socket.Listen(onData, onError) + default: + return fmt.Errorf("unknown protocol %q in %q", s.url.Scheme, s.Address) } return nil @@ -138,312 +152,185 @@ func (s *Syslog) Stop() { s.mu.Lock() defer s.mu.Unlock() - if s.Closer != nil { - s.Close() - } + s.socket.Close() s.wg.Wait() } -// getAddressParts returns the address scheme and host -// it also sets defaults for them when missing -// when the input address does not specify the protocol it returns an error -func getAddressParts(a string) (scheme string, host string, err error) { - parts := strings.SplitN(a, "://", 2) - if len(parts) != 2 { - return "", "", fmt.Errorf("missing protocol within address %q", a) - } - - u, err := url.Parse(filepath.ToSlash(a)) //convert backslashes to slashes (to make Windows path a valid URL) - if err != nil { - return "", "", fmt.Errorf("could not parse address %q: %w", a, err) - } - switch u.Scheme { - case "unix", "unixpacket", "unixgram": - return parts[0], parts[1], nil - } - - if u.Hostname() != "" { - host = u.Hostname() - } - host += ":" - if u.Port() == "" { - host += "6514" - } else { - host += u.Port() +func (s *Syslog) createStreamDataHandler(acc telegraf.Accumulator) socket.CallbackConnection { + // Create parser options + var opts []syslog.ParserOption + if s.BestEffort { + opts = append(opts, syslog.WithBestEffort()) } - - return u.Scheme, host, nil -} - -func (s *Syslog) listenPacket(acc telegraf.Accumulator) { - defer s.wg.Done() - b := make([]byte, ipMaxPacketSize) - var p syslog.Machine - switch { - case !s.BestEffort && s.SyslogStandard == syslogRFC5424: - p = rfc5424.NewParser() - case s.BestEffort && s.SyslogStandard == syslogRFC5424: - p = rfc5424.NewParser(rfc5424.WithBestEffort()) - case !s.BestEffort && s.SyslogStandard == syslogRFC3164: - p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{})) - case s.BestEffort && s.SyslogStandard == syslogRFC3164: - p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}), rfc3164.WithBestEffort()) + if s.Framing == "non-transparent" { + opts = append(opts, nontransparent.WithTrailer(s.Trailer)) } - for { - n, sourceAddr, err := s.udpListener.ReadFrom(b) - if err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - acc.AddError(err) - } - break - } - message, err := p.Parse(b[:n]) - if message != nil { - acc.AddFields("syslog", fields(message, s), tags(message, sourceAddr), s.currentTime()) + return func(src net.Addr, reader io.ReadCloser) { + // Create the parser depending on transport framing and other settings + var parser syslog.Parser + switch s.Framing { + case "octet-counting": + parser = octetcounting.NewParser(opts...) + case "non-transparent": + parser = nontransparent.NewParser(opts...) } - if err != nil { - acc.AddError(err) - } - if err == nil && message == nil { - acc.AddError(fmt.Errorf("unable to parse message: %s", string(b[:n]))) - } - } -} - -func (s *Syslog) listenStream(acc telegraf.Accumulator) { - defer s.wg.Done() - s.connections = map[string]net.Conn{} - - for { - conn, err := s.tcpListener.Accept() - if err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - acc.AddError(err) + // Remove port from address + var addr string + if src.Network() != "unix" { + var err error + if addr, _, err = net.SplitHostPort(src.String()); err != nil { + addr = src.String() } - break - } - var tcpConn, _ = conn.(*net.TCPConn) - if s.tlsConfig != nil { - conn = tls.Server(conn, s.tlsConfig) } - s.connectionsMu.Lock() - if s.MaxConnections > 0 && len(s.connections) >= s.MaxConnections { - s.connectionsMu.Unlock() - if err := conn.Close(); err != nil { - acc.AddError(err) + parser.WithListener(func(r *syslog.Result) { + if r.Error != nil { + acc.AddError(r.Error) + } + if r.Message == nil { + return } - continue - } - s.connections[conn.RemoteAddr().String()] = conn - s.connectionsMu.Unlock() - - if err := s.setKeepAlive(tcpConn); err != nil { - acc.AddError(fmt.Errorf("unable to configure keep alive %q: %w", s.Address, err)) - } - - go s.handle(conn, acc) - } - s.connectionsMu.Lock() - for _, c := range s.connections { - if err := c.Close(); err != nil { - acc.AddError(err) - } + // Extract message information + acc.AddFields("syslog", fields(r.Message, s.Separator), tags(r.Message, addr)) + }) + parser.Parse(reader) } - s.connectionsMu.Unlock() } -func (s *Syslog) removeConnection(c net.Conn) { - s.connectionsMu.Lock() - delete(s.connections, c.RemoteAddr().String()) - s.connectionsMu.Unlock() -} - -func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) { - defer func() { - s.removeConnection(conn) - conn.Close() - }() - - var p syslog.Parser - - emit := func(r *syslog.Result) { - s.store(*r, conn.RemoteAddr(), acc) - if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 { - if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil { - acc.AddError(fmt.Errorf("setting read deadline failed: %w", err)) - } - } - } - - // Create parser options - opts := []syslog.ParserOption{ - syslog.WithListener(emit), +func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.CallbackData { + // Create the parser depending on syslog standard and other settings + var parser syslog.Machine + switch s.SyslogStandard { + case "RFC3164": + parser = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{})) + case "RFC5424": + parser = rfc5424.NewParser() } if s.BestEffort { - opts = append(opts, syslog.WithBestEffort()) + parser.WithBestEffort() } - // Select the parser to use depending on transport framing - if s.Framing == framing.OctetCounting { - // Octet counting transparent framing - p = octetcounting.NewParser(opts...) - } else { - // Non-transparent framing - opts = append(opts, nontransparent.WithTrailer(s.Trailer)) - p = nontransparent.NewParser(opts...) - } - - p.Parse(conn) + // Return the OnData function + return func(src net.Addr, data []byte) { + message, err := parser.Parse(data) + if err != nil { + acc.AddError(err) + } else if message == nil { + acc.AddError(fmt.Errorf("unable to parse message: %s", string(data))) + } + if message == nil { + return + } - if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 { - if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil { - acc.AddError(fmt.Errorf("setting read deadline failed: %w", err)) + // Extract message information + var addr string + if src.Network() != "unixgram" { + var err error + if addr, _, err = net.SplitHostPort(src.String()); err != nil { + addr = src.String() + } } + acc.AddFields("syslog", fields(message, s.Separator), tags(message, addr)) } } -func (s *Syslog) setKeepAlive(c *net.TCPConn) error { - if s.KeepAlivePeriod == nil { - return nil +func tags(msg syslog.Message, src string) map[string]string { + // Extract message information + tags := map[string]string{ + "severity": *msg.SeverityShortLevel(), + "facility": *msg.FacilityLevel(), } - if *s.KeepAlivePeriod == 0 { - return c.SetKeepAlive(false) + if src != "" { + tags["source"] = src } - if err := c.SetKeepAlive(true); err != nil { - return err - } - return c.SetKeepAlivePeriod(time.Duration(*s.KeepAlivePeriod)) -} - -func (s *Syslog) store(res syslog.Result, remoteAddr net.Addr, acc telegraf.Accumulator) { - if res.Error != nil { - acc.AddError(res.Error) - } - if res.Message != nil { - acc.AddFields("syslog", fields(res.Message, s), tags(res.Message, remoteAddr), s.currentTime()) - } -} - -func tags(msg syslog.Message, sourceAddr net.Addr) map[string]string { - ts := map[string]string{} - - // Not checking assuming a minimally valid message - ts["severity"] = *msg.SeverityShortLevel() - ts["facility"] = *msg.FacilityLevel() - switch m := msg.(type) { + switch msg := msg.(type) { case *rfc5424.SyslogMessage: - populateCommonTags(&m.Base, ts) + if msg.Hostname != nil { + tags["hostname"] = *msg.Hostname + } + if msg.Appname != nil { + tags["appname"] = *msg.Appname + } case *rfc3164.SyslogMessage: - populateCommonTags(&m.Base, ts) - } - - if sourceAddr != nil { - if source, _, err := net.SplitHostPort(sourceAddr.String()); err == nil { - ts["source"] = source + if msg.Hostname != nil { + tags["hostname"] = *msg.Hostname + } + if msg.Appname != nil { + tags["appname"] = *msg.Appname } } - return ts + return tags } -func fields(msg syslog.Message, s *Syslog) map[string]interface{} { - flds := map[string]interface{}{} - - switch m := msg.(type) { +func fields(msg syslog.Message, separator string) map[string]interface{} { + var fields map[string]interface{} + switch msg := msg.(type) { case *rfc5424.SyslogMessage: - populateCommonFields(&m.Base, flds) - // Not checking assuming a minimally valid message - flds["version"] = m.Version - - if m.StructuredData != nil { - for sdid, sdparams := range *m.StructuredData { + fields = map[string]interface{}{ + "facility_code": int(*msg.Facility), + "severity_code": int(*msg.Severity), + "version": msg.Version, + } + if msg.Timestamp != nil { + fields["timestamp"] = (*msg.Timestamp).UnixNano() + } + if msg.ProcID != nil { + fields["procid"] = *msg.ProcID + } + if msg.MsgID != nil { + fields["msgid"] = *msg.MsgID + } + if msg.Message != nil { + fields["message"] = strings.TrimRightFunc(*msg.Message, func(r rune) bool { + return unicode.IsSpace(r) + }) + } + if msg.StructuredData != nil { + for sdid, sdparams := range *msg.StructuredData { if len(sdparams) == 0 { // When SD-ID does not have params we indicate its presence with a bool - flds[sdid] = true + fields[sdid] = true continue } - for name, value := range sdparams { - // Using whitespace as separator since it is not allowed by the grammar within SDID - flds[sdid+s.Separator+name] = value + for k, v := range sdparams { + fields[sdid+separator+k] = v } } } case *rfc3164.SyslogMessage: - populateCommonFields(&m.Base, flds) - } - - return flds -} - -func populateCommonFields(msg *syslog.Base, flds map[string]interface{}) { - flds["facility_code"] = int(*msg.Facility) - flds["severity_code"] = int(*msg.Severity) - if msg.Timestamp != nil { - flds["timestamp"] = (*msg.Timestamp).UnixNano() - } - if msg.ProcID != nil { - flds["procid"] = *msg.ProcID - } - if msg.MsgID != nil { - flds["msgid"] = *msg.MsgID - } - if msg.Message != nil { - flds["message"] = strings.TrimRightFunc(*msg.Message, func(r rune) bool { - return unicode.IsSpace(r) - }) - } -} - -func populateCommonTags(msg *syslog.Base, ts map[string]string) { - if msg.Hostname != nil { - ts["hostname"] = *msg.Hostname - } - if msg.Appname != nil { - ts["appname"] = *msg.Appname - } -} - -type unixCloser struct { - path string - closer io.Closer -} - -func (uc unixCloser) Close() error { - err := uc.closer.Close() - os.Remove(uc.path) - return err -} - -func (s *Syslog) currentTime() time.Time { - t := s.now() - if t == s.lastTime { - t = t.Add(time.Nanosecond) + fields = map[string]interface{}{ + "facility_code": int(*msg.Facility), + "severity_code": int(*msg.Severity), + } + if msg.Timestamp != nil { + fields["timestamp"] = (*msg.Timestamp).UnixNano() + } + if msg.ProcID != nil { + fields["procid"] = *msg.ProcID + } + if msg.MsgID != nil { + fields["msgid"] = *msg.MsgID + } + if msg.Message != nil { + fields["message"] = strings.TrimRightFunc(*msg.Message, func(r rune) bool { + return unicode.IsSpace(r) + }) + } } - s.lastTime = t - return t -} -func getNanoNow() time.Time { - return time.Unix(0, time.Now().UnixNano()) + return fields } func init() { - defaultTimeout := config.Duration(defaultReadTimeout) inputs.Add("syslog", func() telegraf.Input { return &Syslog{ - Address: ":6514", - now: getNanoNow, - ReadTimeout: &defaultTimeout, - Framing: framing.OctetCounting, - SyslogStandard: syslogRFC5424, - Trailer: nontransparent.LF, - Separator: "_", + Trailer: nontransparent.LF, } }) } diff --git a/plugins/inputs/syslog/syslog_test.go b/plugins/inputs/syslog/syslog_test.go index 3ce47ae6afa2b..29b2d182158c8 100644 --- a/plugins/inputs/syslog/syslog_test.go +++ b/plugins/inputs/syslog/syslog_test.go @@ -3,10 +3,10 @@ package syslog import ( "crypto/tls" "net" - "net/url" "os" "path/filepath" "runtime" + "sort" "strings" "testing" "time" @@ -16,8 +16,8 @@ import ( "github.com/influxdata/go-syslog/v3/nontransparent" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/socket" "github.com/influxdata/telegraf/plugins/inputs" influx "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" "github.com/influxdata/telegraf/testutil" @@ -25,50 +25,61 @@ import ( var pki = testutil.NewPKI("../../../testutil/pki") -func TestInitFail(t *testing.T) { - tests := []struct { - name string - address string - expected string - }{ - { - name: "no address", - expected: "missing protocol within address", - }, - { - name: "missing protocol", - address: "localhost:6514", - expected: "missing protocol within address", - }, - { - name: "unknown protocol", - address: "unsupported://example.com:6514", - expected: "unknown protocol", - }, +func TestAddressMissingProtocol(t *testing.T) { + plugin := &Syslog{ + Address: "localhost:6514", + Log: testutil.Logger{}, } + require.ErrorContains(t, plugin.Init(), "missing protocol within address") +} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - plugin := &Syslog{ - Address: tt.address, - } - var acc testutil.Accumulator - require.ErrorContains(t, plugin.Start(&acc), tt.expected) - }) +func TestAddressUnknownProtocol(t *testing.T) { + plugin := &Syslog{ + Address: "unsupported://example.com:6514", + Log: testutil.Logger{}, } + require.ErrorContains(t, plugin.Init(), "unknown protocol") +} + +func TestAddressDefault(t *testing.T) { + plugin := &Syslog{Log: testutil.Logger{}} + require.NoError(t, plugin.Init()) + + require.Equal(t, "tcp://127.0.0.1:6514", plugin.url.String()) } func TestAddressDefaultPort(t *testing.T) { plugin := &Syslog{ Address: "tcp://localhost", + Log: testutil.Logger{}, } + require.NoError(t, plugin.Init()) + + // Default port is 6514 + require.Equal(t, "tcp://localhost:6514", plugin.url.String()) +} + +func TestReadTimeoutWarning(t *testing.T) { + logger := &testutil.CaptureLogger{} + plugin := &Syslog{ + Address: "tcp://localhost:6514", + Config: socket.Config{ + ReadTimeout: config.Duration(time.Second), + }, + Log: logger, + } + require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) - defer plugin.Stop() + plugin.Stop() - // Default port is 6514 - require.Equal(t, "localhost:6514", plugin.Address) + require.Eventually(t, func() bool { + return logger.NMessages() > 0 + }, 3*time.Second, 100*time.Millisecond) + + warnings := logger.Warnings() + require.Contains(t, warnings, "W! [] "+readTimeoutMsg) } func TestUnixgram(t *testing.T) { @@ -83,16 +94,12 @@ func TestUnixgram(t *testing.T) { defer f.Close() // Setup plugin and start it - timeout := config.Duration(defaultReadTimeout) plugin := &Syslog{ - Address: "unixgram://" + sock, - Framing: framing.OctetCounting, - ReadTimeout: &timeout, - Separator: "_", - SyslogStandard: "RFC5424", - Trailer: nontransparent.LF, - now: getNanoNow, + Address: "unixgram://" + sock, + Trailer: nontransparent.LF, + Log: testutil.Logger{}, } + require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) @@ -152,15 +159,9 @@ func TestCases(t *testing.T) { // Register the plugin inputs.Add("syslog", func() telegraf.Input { - defaultTimeout := config.Duration(defaultReadTimeout) return &Syslog{ - Address: ":6514", - now: getNanoNow, - ReadTimeout: &defaultTimeout, - Framing: framing.OctetCounting, - SyslogStandard: syslogRFC5424, - Trailer: nontransparent.LF, - Separator: "_", + Trailer: nontransparent.LF, + Log: testutil.Logger{}, } }) @@ -173,7 +174,7 @@ func TestCases(t *testing.T) { t.Run(f.Name(), func(t *testing.T) { testcasePath := filepath.Join("testcases", f.Name()) configFilename := filepath.Join(testcasePath, "telegraf.conf") - inputFilename := filepath.Join(testcasePath, "input.txt") + inputFilenamePattern := filepath.Join(testcasePath, "input*.txt") expectedFilename := filepath.Join(testcasePath, "expected.out") expectedErrorFilename := filepath.Join(testcasePath, "expected.err") @@ -182,8 +183,16 @@ func TestCases(t *testing.T) { require.NoError(t, parser.Init()) // Read the input data - inputData, err := os.ReadFile(inputFilename) + inputFiles, err := filepath.Glob(inputFilenamePattern) require.NoError(t, err) + require.NotEmpty(t, inputFiles) + sort.Strings(inputFiles) + messages := make([][]byte, 0, len(inputFiles)) + for _, fn := range inputFiles { + data, err := os.ReadFile(fn) + require.NoErrorf(t, err, "failed file: %s", fn) + messages = append(messages, data) + } // Read the expected output if any var expected []telegraf.Metric @@ -214,25 +223,22 @@ func TestCases(t *testing.T) { // Determine server properties. We need to parse the address before // calling Start() as it is modified in this function. - u, err := url.Parse(plugin.Address) - require.NoError(t, err) - if u.Scheme == "unix" { + if strings.HasPrefix(plugin.Address, "unix://") { // Use a random socket - sock := testutil.TempSocket(t) + sock := filepath.ToSlash(testutil.TempSocket(t)) + if !strings.HasPrefix(sock, "/") { + sock = "/" + sock + } plugin.Address = "unix://" + sock } + require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() // Get the address - var addr string - if plugin.isStream { - addr = plugin.tcpListener.Addr().String() - } else { - addr = plugin.udpListener.LocalAddr().String() - } + addr := plugin.socket.Address().String() // Create a fake sender var client net.Conn @@ -241,17 +247,19 @@ func TestCases(t *testing.T) { require.NoError(t, err) tlscfg.ServerName = "localhost" - client, err = tls.Dial(u.Scheme, addr, tlscfg) + client, err = tls.Dial(plugin.url.Scheme, addr, tlscfg) require.NoError(t, err) } else { - client, err = net.Dial(u.Scheme, addr) + client, err = net.Dial(plugin.url.Scheme, addr) require.NoError(t, err) } defer client.Close() // Send the data and afterwards stop client and plugin - _, err = client.Write(inputData) - require.NoError(t, err) + for i, msg := range messages { + _, err := client.Write(msg) + require.NoErrorf(t, err, "message %d failed with content %q", i, string(msg)) + } client.Close() // Check the metric nevertheless as we might get some metrics despite errors. @@ -277,3 +285,69 @@ func TestCases(t *testing.T) { }) } } + +func TestSocketClosed(t *testing.T) { + // Setup the plugin + plugin := &Syslog{ + Address: "tcp://127.0.0.1:0", + Config: socket.Config{}, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Get the address + addr := plugin.socket.Address().String() + + // Create a fake sender + client, err := net.Dial("tcp", addr) + require.NoError(t, err) + defer client.Close() + + // Send a message to check if the socket is really active + msg := []byte(`72 <13>1 2024-02-15T11:12:24.718151+01:00 Hugin sven - - [] Connection test`) + _, err = client.Write(msg) + require.NoError(t, err) + + // Stop the plugin and check if the socket is closed and unreachable + plugin.Stop() + + require.Eventually(t, func() bool { + _, err := client.Write(msg) + return err != nil + }, 3*time.Second, 100*time.Millisecond) +} + +func TestIssue10121(t *testing.T) { + // Setup the plugin + plugin := &Syslog{ + Address: "tcp://127.0.0.1:0", + Config: socket.Config{ + ReadTimeout: config.Duration(10 * time.Millisecond), + }, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Get the address + addr := plugin.socket.Address().String() + + // Create a fake sender + client, err := net.Dial("tcp", addr) + require.NoError(t, err) + defer client.Close() + + // Messages should eventually timeout + msg := []byte(`72 <13>1 2024-02-15T11:12:24.718151+01:00 Hugin sven - - [] Connection test`) + require.Eventually(t, func() bool { + _, err := client.Write(msg) + return err != nil + }, 3*time.Second, 250*time.Millisecond) +} diff --git a/plugins/outputs/syslog/syslog.go b/plugins/outputs/syslog/syslog.go index fbf574fade84d..cb2538d931873 100644 --- a/plugins/outputs/syslog/syslog.go +++ b/plugins/outputs/syslog/syslog.go @@ -16,7 +16,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - framing "github.com/influxdata/telegraf/internal/syslog" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -33,7 +32,7 @@ type Syslog struct { DefaultAppname string Sdids []string Separator string `toml:"sdparam_separator"` - Framing framing.Framing + Framing string `toml:"framing"` Trailer nontransparent.TrailerType Log telegraf.Logger `toml:"-"` net.Conn @@ -44,6 +43,17 @@ type Syslog struct { func (*Syslog) SampleConfig() string { return sampleConfig } +func (s *Syslog) Init() error { + // Check framing and set default + switch s.Framing { + case "": + s.Framing = "octet-counting" + case "octet-counting", "non-transparent": + default: + return fmt.Errorf("invalid 'framing' %q", s.Framing) + } + return nil +} func (s *Syslog) Connect() error { s.initializeSyslogMapper() @@ -141,7 +151,7 @@ func (s *Syslog) getSyslogMessageBytesWithFraming(msg *rfc5424.SyslogMessage) ([ } msgBytes := []byte(msgString) - if s.Framing == framing.OctetCounting { + if s.Framing == "octet-counting" { return append([]byte(strconv.Itoa(len(msgBytes))+" "), msgBytes...), nil } // Non-transparent framing @@ -167,7 +177,6 @@ func (s *Syslog) initializeSyslogMapper() { func newSyslog() *Syslog { return &Syslog{ - Framing: framing.OctetCounting, Trailer: nontransparent.LF, Separator: "_", DefaultSeverityCode: uint8(5), // notice diff --git a/plugins/outputs/syslog/syslog_test.go b/plugins/outputs/syslog/syslog_test.go index f4948b42171d4..8e5e76967c11d 100644 --- a/plugins/outputs/syslog/syslog_test.go +++ b/plugins/outputs/syslog/syslog_test.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/go-syslog/v3/nontransparent" "github.com/influxdata/telegraf" - framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" ) @@ -18,6 +17,7 @@ import ( func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { // Init plugin s := newSyslog() + require.NoError(t, s.Init()) s.initializeSyslogMapper() // Init metrics @@ -41,8 +41,9 @@ func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { // Init plugin s := newSyslog() + require.NoError(t, s.Init()) s.initializeSyslogMapper() - s.Framing = framing.NonTransparent + s.Framing = "non-transparent" // Init metrics m1 := metric.New( @@ -65,8 +66,9 @@ func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { func TestGetSyslogMessageWithFramingNonTransparentNul(t *testing.T) { // Init plugin s := newSyslog() + require.NoError(t, s.Init()) s.initializeSyslogMapper() - s.Framing = framing.NonTransparent + s.Framing = "non-transparent" s.Trailer = nontransparent.NUL // Init metrics @@ -92,6 +94,7 @@ func TestSyslogWriteWithTcp(t *testing.T) { require.NoError(t, err) s := newSyslog() + require.NoError(t, s.Init()) s.Address = "tcp://" + listener.Addr().String() err = s.Connect() @@ -108,6 +111,7 @@ func TestSyslogWriteWithUdp(t *testing.T) { require.NoError(t, err) s := newSyslog() + require.NoError(t, s.Init()) s.Address = "udp://" + listener.LocalAddr().String() err = s.Connect() @@ -140,7 +144,7 @@ func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) { } func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) { - s.Framing = framing.NonTransparent + s.Framing = "non-transparent" metrics := []telegraf.Metric{} m1 := metric.New( "testmetric", @@ -168,6 +172,7 @@ func TestSyslogWriteErr(t *testing.T) { require.NoError(t, err) s := newSyslog() + require.NoError(t, s.Init()) s.Address = "tcp://" + listener.Addr().String() err = s.Connect() @@ -199,6 +204,7 @@ func TestSyslogWriteReconnect(t *testing.T) { require.NoError(t, err) s := newSyslog() + require.NoError(t, s.Init()) s.Address = "tcp://" + listener.Addr().String() err = s.Connect() diff --git a/testutil/capturelog.go b/testutil/capturelog.go index 2c21885525301..174a2605f08a8 100644 --- a/testutil/capturelog.go +++ b/testutil/capturelog.go @@ -89,6 +89,12 @@ func (l *CaptureLogger) Info(args ...interface{}) { l.loga(LevelInfo, args...) } +func (l *CaptureLogger) NMessages() int { + l.Lock() + defer l.Unlock() + return len(l.messages) +} + func (l *CaptureLogger) Messages() []Entry { l.Lock() msgs := make([]Entry, len(l.messages))