Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(inputs.syslog): Use common/socket implementation #14837

Merged
merged 20 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 0 additions & 63 deletions internal/syslog/framing.go

This file was deleted.

44 changes: 0 additions & 44 deletions internal/syslog/framing_test.go

This file was deleted.

107 changes: 84 additions & 23 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package socket
import (
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -18,48 +22,101 @@ type packetListener struct {
MaxDecompressionSize int64
SocketMode string
ReadBufferSize int
OnData CallbackData
OnError CallbackError
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
}

func (l *packetListener) listen() {
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, _, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if l.OnError != nil {
l.OnError(err)
func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
l.wg.Add(1)

go func() {
defer l.wg.Done()

buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, src, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
onError(err)
}
}
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && l.OnError != nil {
l.OnError(fmt.Errorf("unable to decode incoming packet: %w", err))
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)
}
}()
}

l.OnData(body)
}
func (l *packetListener) listenConnection(onConnection CallbackConnection, onError CallbackError) {
l.wg.Add(1)
go func() {
defer l.wg.Done()
defer l.conn.Close()

buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
// Wait for packets and read them
n, src, err := l.conn.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
onError(err)
}
}
break
}

// Decode the contents depending on the given encoding
body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
}
}()
}

func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
err := os.Remove(u.Path)
if err != nil && !errors.Is(err, os.ErrNotExist) {
l.path = filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(l.path, ":") {
l.path = strings.TrimPrefix(l.path, `\`)
}
if err := os.Remove(l.path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}

conn, err := net.ListenPacket(u.Scheme, u.Path)
conn, err := net.ListenPacket(u.Scheme, l.path)
if err != nil {
return fmt.Errorf("listening (unixgram) failed: %w", err)
}
l.path = u.Path
l.conn = conn

// Set permissions on socket
Expand Down Expand Up @@ -167,10 +224,14 @@ func (l *packetListener) close() error {
if err := l.conn.Close(); err != nil {
return err
}
l.wg.Wait()

if l.path != "" {
err := os.Remove(l.path)
if err != nil && !errors.Is(err, os.ErrNotExist) {
fn := filepath.FromSlash(l.path)
if runtime.GOOS == "windows" && strings.Contains(fn, ":") {
fn = strings.TrimPrefix(fn, `\`)
}
if err := os.Remove(fn); err != nil && !errors.Is(err, os.ErrNotExist) {
// Ignore file-not-exists errors when removing the socket
return err
}
Expand Down
37 changes: 37 additions & 0 deletions plugins/common/socket/socket.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
## Permission for unix sockets (only available on unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Maximum number of concurrent connections (only available on stream sockets like TCP)
## Zero means unlimited.
# max_connections = 0

## Read timeout (only available on stream sockets like TCP)
## Zero means unlimited.
# read_timeout = "0s"

## Optional TLS configuration (only available on stream sockets like TCP)
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

## Maximum socket buffer size (in bytes when no unit specified)
## For stream sockets, once the buffer fills up, the sender will start
## backing up. For datagram sockets, once the buffer fills up, metrics will
## start dropping. Defaults to the OS default.
# read_buffer_size = "64KiB"

## Period between keep alive probes (only applies to TCP sockets)
## Zero disables keep alive probes. Defaults to the OS configuration.
# keep_alive_period = "5m"

## Content encoding for message payloads
## Can be set to "gzip" for compressed payloads or "identity" for no encoding.
# content_encoding = "identity"

## Maximum size of decoded packet (in bytes when no unit specified)
# max_decompression_size = "500MB"
Loading
Loading