Skip to content

Commit

Permalink
chore(inputs.syslog): Use common/socket implementation (#14837)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Feb 23, 2024
1 parent f0656a4 commit 0e6b4f0
Show file tree
Hide file tree
Showing 19 changed files with 1,268 additions and 765 deletions.
63 changes: 0 additions & 63 deletions internal/syslog/framing.go

This file was deleted.

44 changes: 0 additions & 44 deletions internal/syslog/framing_test.go

This file was deleted.

107 changes: 84 additions & 23 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package socket
import (
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -18,48 +22,101 @@ type packetListener struct {
MaxDecompressionSize int64
SocketMode string
ReadBufferSize int
OnData CallbackData
OnError CallbackError
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
}

func (l *packetListener) listen() {
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, _, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if l.OnError != nil {
l.OnError(err)
func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
l.wg.Add(1)

go func() {
defer l.wg.Done()

buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, src, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
onError(err)
}
}
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && l.OnError != nil {
l.OnError(fmt.Errorf("unable to decode incoming packet: %w", err))
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)
}
}()
}

l.OnData(body)
}
func (l *packetListener) listenConnection(onConnection CallbackConnection, onError CallbackError) {
l.wg.Add(1)
go func() {
defer l.wg.Done()
defer l.conn.Close()

buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
// Wait for packets and read them
n, src, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
onError(err)
}
}
break
}

// Decode the contents depending on the given encoding
body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
}
}()
}

func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
err := os.Remove(u.Path)
if err != nil && !errors.Is(err, os.ErrNotExist) {
l.path = filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(l.path, ":") {
l.path = strings.TrimPrefix(l.path, `\`)
}
if err := os.Remove(l.path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}

conn, err := net.ListenPacket(u.Scheme, u.Path)
conn, err := net.ListenPacket(u.Scheme, l.path)
if err != nil {
return fmt.Errorf("listening (unixgram) failed: %w", err)
}
l.path = u.Path
l.conn = conn

// Set permissions on socket
Expand Down Expand Up @@ -167,10 +224,14 @@ func (l *packetListener) close() error {
if err := l.conn.Close(); err != nil {
return err
}
l.wg.Wait()

if l.path != "" {
err := os.Remove(l.path)
if err != nil && !errors.Is(err, os.ErrNotExist) {
fn := filepath.FromSlash(l.path)
if runtime.GOOS == "windows" && strings.Contains(fn, ":") {
fn = strings.TrimPrefix(fn, `\`)
}
if err := os.Remove(fn); err != nil && !errors.Is(err, os.ErrNotExist) {
// Ignore file-not-exists errors when removing the socket
return err
}
Expand Down
37 changes: 37 additions & 0 deletions plugins/common/socket/socket.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## Permission for unix sockets (only available on unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Maximum number of concurrent connections (only available on stream sockets like TCP)
## Zero means unlimited.
# max_connections = 0

## Read timeout (only available on stream sockets like TCP)
## Zero means unlimited.
# read_timeout = "0s"

## Optional TLS configuration (only available on stream sockets like TCP)
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

## Maximum socket buffer size (in bytes when no unit specified)
## For stream sockets, once the buffer fills up, the sender will start
## backing up. For datagram sockets, once the buffer fills up, metrics will
## start dropping. Defaults to the OS default.
# read_buffer_size = "64KiB"

## Period between keep alive probes (only applies to TCP sockets)
## Zero disables keep alive probes. Defaults to the OS configuration.
# keep_alive_period = "5m"

## Content encoding for message payloads
## Can be set to "gzip" for compressed payloads or "identity" for no encoding.
# content_encoding = "identity"

## Maximum size of decoded packet (in bytes when no unit specified)
# max_decompression_size = "500MB"
Loading

0 comments on commit 0e6b4f0

Please sign in to comment.