Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collector/filesystem: s/MNT_NOWAIT/MNT_WAIT #2960

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions collector/filesystem_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package collector

import (
"errors"
"time"
"unsafe"

"github.com/go-kit/log/level"
Expand All @@ -38,17 +39,28 @@ const (
readOnly = 0x1 // MNT_RDONLY
)

// Expose filesystem fullness.
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mntbuf *C.struct_statfs
count := C.getmntinfo(&mntbuf, C.MNT_NOWAIT)
if count == 0 {
return nil, errors.New("getmntinfo() failed")
// `getmntinfo` relies on `getfsstat` in some variants, and is blocking in general.
count := 0
countCh := make(chan int, 1)
var mountBuf *C.struct_statfs
go func(mountBuf **C.struct_statfs) {
countCh <- int(C.getmntinfo(mountBuf, C.MNT_WAIT))
close(countCh)
}(&mountBuf)
select {
case count = <-countCh:
if count <= 0 {
return nil, errors.New("getmntinfo failed")
}
case <-time.After(*mountTimeout):
return nil, errors.New("getmntinfo timed out")
}

mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mntbuf))
mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mountBuf))
stats = []filesystemStats{}
for i := 0; i < int(count); i++ {
for i := 0; i < count; i++ {
mountpoint := C.GoString(&mnt[i].f_mntonname[0])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", mountpoint)
Expand Down
3 changes: 3 additions & 0 deletions collector/filesystem_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ var (
"collector.filesystem.ignored-mount-points",
"Regexp of mount points to ignore for filesystem collector.",
).Hidden().String()
mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()

fsTypesExcludeSet bool
fsTypesExclude = kingpin.Flag(
Expand Down
43 changes: 36 additions & 7 deletions collector/filesystem_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package collector

import (
"errors"
"time"

"github.com/go-kit/log/level"
"golang.org/x/sys/unix"
)
Expand All @@ -28,16 +31,42 @@ const (

// Expose filesystem fullness.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
n, err := unix.Getfsstat(nil, unix.MNT_NOWAIT)
if err != nil {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var err error
var n int
n, err = unix.Getfsstat(nil, unix.MNT_WAIT)
if err != nil {
errChan <- err
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case err := <-errChan:
return nil, err
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
buf := make([]unix.Statfs_t, n)
_, err = unix.Getfsstat(buf, unix.MNT_NOWAIT)
if err != nil {
return nil, err

buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
_, err := unix.Getfsstat(buf, unix.MNT_WAIT)
errChan <- err
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
stats := []filesystemStats{}

var stats []filesystemStats
for _, fs := range buf {
mountpoint := unix.ByteSliceToString(fs.Mntonname[:])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
Expand Down
138 changes: 58 additions & 80 deletions collector/filesystem_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,114 +37,115 @@ const (
defFSTypesExcluded = "^(autofs|binfmt_misc|bpf|cgroup2?|configfs|debugfs|devpts|devtmpfs|fusectl|hugetlbfs|iso9660|mqueue|nsfs|overlay|proc|procfs|pstore|rpc_pipefs|securityfs|selinuxfs|squashfs|sysfs|tracefs)$"
)

var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
"how many stat calls to process simultaneously").
Hidden().Default("4").Int()
var stuckMounts = make(map[string]struct{})
var stuckMountsMtx = &sync.Mutex{}
var stuckMountsMap = make(map[string]struct{})
var stuckMountsMutex = &sync.Mutex{}

// GetStats returns filesystem stats.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
mps, err := mountPointDetails(c.logger)
fsLabels, err := mountPointDetails(c.logger)
if err != nil {
return nil, err
}
stats := []filesystemStats{}
labelChan := make(chan filesystemLabels)
statChan := make(chan filesystemStats)
var fsStats []filesystemStats
fsLabelChan := make(chan filesystemLabels)
fsStatChan := make(chan filesystemStats)
wg := sync.WaitGroup{}

workerCount := *statWorkerCount
if workerCount < 1 {
workerCount = 1
}

for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for labels := range labelChan {
statChan <- c.processStat(labels)
for fsLabel := range fsLabelChan {
fsStatChan <- c.processStat(fsLabel)
}
}()
}

go func() {
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
for _, fsLabel := range fsLabels {
if c.excludedMountPointsPattern.MatchString(fsLabel.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", fsLabel.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}

stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
labels.deviceError = "mountpoint timeout"
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
stuckMountsMtx.Unlock()
if c.excludedFSTypesPattern.MatchString(fsLabel.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", fsLabel.fsType)
continue
}

stuckMountsMtx.Unlock()
labelChan <- labels
fsLabelChan <- fsLabel
}
close(labelChan)
close(fsLabelChan)
wg.Wait()
close(statChan)
close(fsStatChan)
}()

for stat := range statChan {
stats = append(stats, stat)
for fsStat := range fsStatChan {
fsStats = append(fsStats, fsStat)
}
return stats, nil
return fsStats, nil
}

func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
func (c *filesystemCollector) processStat(fsLabel filesystemLabels) filesystemStats {
var ro float64
for _, option := range strings.Split(labels.options, ",") {
for _, option := range strings.Split(fsLabel.options, ",") {
if option == "ro" {
ro = 1
break
}
}

success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)

// If the mount point is stuck, mark it as such and return early.
// This is done to avoid blocking the stat call indefinitely.
// NOTE: For instance, this can happen when an NFS mount is unreachable.
buf := new(unix.Statfs_t)
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)

// If the mount has been marked as stuck, unmark it and log it's recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
statFsErrChan := make(chan error, 1)
go func(buf *unix.Statfs_t) {
statFsErrChan <- unix.Statfs(rootfsFilePath(fsLabel.mountPoint), buf)
close(statFsErrChan)
}(buf)

select {
case err := <-statFsErrChan:
if err != nil {
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(fsLabel.mountPoint), "err", err)
fsLabel.deviceError = err.Error()
}
case <-time.After(*mountTimeout):
stuckMountsMutex.Lock()
if _, ok := stuckMountsMap[fsLabel.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", fsLabel.mountPoint)
stuckMountsMap[fsLabel.mountPoint] = struct{}{}
fsLabel.deviceError = "mountpoint timeout"
}
stuckMountsMutex.Unlock()
}
stuckMountsMtx.Unlock()

if err != nil {
labels.deviceError = err.Error()
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
// Check if the mount point has recovered and remove it from the stuck map.
if _, isOpen := <-statFsErrChan; !isOpen {
stuckMountsMutex.Lock()
if _, ok := stuckMountsMap[fsLabel.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", fsLabel.mountPoint)
delete(stuckMountsMap, fsLabel.mountPoint)
}
stuckMountsMutex.Unlock()
}

// If the mount point is stuck or statfs errored, mark it as such and return.
if fsLabel.deviceError != "" {
return filesystemStats{
labels: labels,
labels: fsLabel,
deviceError: 1,
ro: ro,
}
}

return filesystemStats{
labels: labels,
labels: fsLabel,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
Expand All @@ -154,29 +155,6 @@ func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemSta
}
}

// stuckMountWatcher listens on the given success channel and if the channel closes
// then the watcher does nothing. If instead the timeout is reached, the
// mount point that is being watched is marked as stuck.
func stuckMountWatcher(mountPoint string, success chan struct{}, logger log.Logger) {
mountCheckTimer := time.NewTimer(*mountTimeout)
defer mountCheckTimer.Stop()
select {
case <-success:
// Success
case <-mountCheckTimer.C:
// Timed out, mark mount as stuck
stuckMountsMtx.Lock()
select {
case <-success:
// Success came in just after the timeout was reached, don't label the mount as stuck
default:
level.Debug(logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", mountPoint)
stuckMounts[mountPoint] = struct{}{}
}
stuckMountsMtx.Unlock()
}
}

func mountPointDetails(logger log.Logger) ([]filesystemLabels, error) {
file, err := os.Open(procFilePath("1/mounts"))
if errors.Is(err, os.ErrNotExist) {
Expand Down
49 changes: 38 additions & 11 deletions collector/filesystem_openbsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package collector

import (
"errors"
"time"

"github.com/go-kit/log/level"
"golang.org/x/sys/unix"
)
Expand All @@ -26,21 +29,45 @@ const (
defFSTypesExcluded = "^devfs$"
)

// Expose filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mnt []unix.Statfs_t
size, err := unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, fsstatErr error) {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var statErr error
var n int
n, statErr = unix.Getfsstat(nil, unix.MNT_WAIT)
if statErr != nil {
errChan <- statErr
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case statErr := <-errChan:
return nil, statErr
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
mnt = make([]unix.Statfs_t, size)
_, err = unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err

buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
_, fsstatErr = unix.Getfsstat(buf, unix.MNT_WAIT)
errChan <- fsstatErr
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}

stats = []filesystemStats{}
for _, v := range mnt {
for _, v := range buf {
mountpoint := unix.ByteSliceToString(v.F_mntonname[:])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", mountpoint)
Expand Down
Loading