From 43df788546956a2504a448ce4f760789978e1b92 Mon Sep 17 00:00:00 2001 From: Bojan Date: Tue, 17 Aug 2021 22:29:04 -0300 Subject: [PATCH 1/2] Add filenotify package that supports polling watcher. Add command line arguments to fluent-bit-watcher. Add polling watcher support to fluent-bit-watcher. Signed-off-by: Bojan --- .../v1alpha2/fluentbit_types.go | 2 + cmd/fluent-bit-watcher/Dockerfile | 15 +- cmd/fluent-bit-watcher/main.go | 58 ++- .../logging.kubesphere.io_fluentbits.yaml | 5 + manifests/setup/fluentbit-operator-crd.yaml | 5 + manifests/setup/setup.yaml | 8 +- pkg/filenotify/filenotify.go | 53 +++ pkg/filenotify/fsnotify.go | 24 ++ pkg/filenotify/poller.go | 330 ++++++++++++++++++ pkg/operator/daemonset.go | 1 + 10 files changed, 480 insertions(+), 21 deletions(-) create mode 100644 pkg/filenotify/filenotify.go create mode 100644 pkg/filenotify/fsnotify.go create mode 100644 pkg/filenotify/poller.go diff --git a/api/fluentbitoperator/v1alpha2/fluentbit_types.go b/api/fluentbitoperator/v1alpha2/fluentbit_types.go index ffb1c7504..9e1f99406 100644 --- a/api/fluentbitoperator/v1alpha2/fluentbit_types.go +++ b/api/fluentbitoperator/v1alpha2/fluentbit_types.go @@ -28,6 +28,8 @@ import ( type FluentBitSpec struct { // Fluent Bit image. Image string `json:"image,omitempty"` + // Fluent Bit Watcher command line arguments. + Args []string `json:"args,omitempty"` // Fluent Bit image pull policy. ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` // Fluent Bit image pull secret diff --git a/cmd/fluent-bit-watcher/Dockerfile b/cmd/fluent-bit-watcher/Dockerfile index fb7e7a845..68ba13378 100644 --- a/cmd/fluent-bit-watcher/Dockerfile +++ b/cmd/fluent-bit-watcher/Dockerfile @@ -1,19 +1,16 @@ FROM golang:1.13.6-alpine3.11 as buildergo RUN mkdir -p /fluent-bit -COPY cmd/fluent-bit-watcher/main.go go.mod /fluent-bit/ -WORKDIR /fluent-bit -RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o fluent-bit main.go +RUN mkdir -p /code +COPY . /code/ +WORKDIR /code +RUN echo $(ls -al /code) +RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o /fluent-bit/fluent-bit /code/cmd/fluent-bit-watcher/main.go -# FROM gcr.io/distroless/cc-debian10 FROM fluent/fluent-bit:1.8.3 -MAINTAINER KubeSphere LABEL Description="Fluent Bit docker image" Vendor="KubeSphere" Version="1.0" COPY conf/fluent-bit.conf conf/parsers.conf /fluent-bit/etc/ COPY --from=buildergo /fluent-bit/fluent-bit /fluent-bit/bin/fluent-bit-watcher -# -EXPOSE 2020 - # Entry point -CMD ["/fluent-bit/bin/fluent-bit-watcher", "-c", "/fluent-bit/etc/fluent-bit.conf"] +CMD ["/fluent-bit/bin/fluent-bit-watcher"] diff --git a/cmd/fluent-bit-watcher/main.go b/cmd/fluent-bit-watcher/main.go index f2733f153..09ef375ea 100644 --- a/cmd/fluent-bit-watcher/main.go +++ b/cmd/fluent-bit-watcher/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "math" "os" "os/exec" @@ -14,14 +15,17 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" + "kubesphere.io/fluentbit-operator/pkg/filenotify" ) const ( - binPath = "/fluent-bit/bin/fluent-bit" - cfgPath = "/fluent-bit/etc/fluent-bit.conf" - watchDir = "/fluent-bit/config" - MaxDelayTime = time.Minute * 5 - ResetTime = time.Minute * 10 + defaultBinPath = "/fluent-bit/bin/fluent-bit" + defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf" + defaultWatchDir = "/fluent-bit/config" + defaultPollInterval = 1 * time.Second + + MaxDelayTime = 5 * time.Minute + ResetTime = 10 * time.Minute ) var ( @@ -33,7 +37,22 @@ var ( timerCancel context.CancelFunc ) +var configPath string +var binPath string +var watchPath string +var poll bool +var pollInterval time.Duration + func main() { + + flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.") + flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.") + flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.") + flag.BoolVar(&poll, "poll", true, "Use poll watcher instead of ionotify.") + flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.") + + flag.Parse() + logger = log.NewLogfmtLogger(os.Stdout) timerCtx, timerCancel = context.WithCancel(context.Background()) @@ -77,14 +96,14 @@ func main() { } { // Watch the config file, if the config file changed, stop Fluent bit. - watcher, err := fsnotify.NewWatcher() + watcher, err := newWatcher(poll, pollInterval) if err != nil { _ = level.Error(logger).Log("err", err) return } // Start watcher. - err = watcher.Add(watchDir) + err = watcher.Add(watchPath) if err != nil { _ = level.Error(logger).Log("err", err) return @@ -98,7 +117,7 @@ func main() { select { case <-cancel: return nil - case event := <-watcher.Events: + case event := <-watcher.Events(): if !isValidEvent(event) { continue } @@ -110,7 +129,7 @@ func main() { stop() resetTimer() _ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit") - case <-watcher.Errors: + case <-watcher.Errors(): _ = level.Error(logger).Log("msg", "Watcher stopped") return nil } @@ -130,9 +149,26 @@ func main() { _ = level.Info(logger).Log("msg", "See you next time!") } +func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) { + var err error + var watcher filenotify.FileWatcher + + if poll { + watcher = filenotify.NewPollingWatcher(interval) + } else { + watcher, err = filenotify.New(interval) + } + + if err != nil { + return nil, err + } + + return watcher, nil +} + // Inspired by https://github.com/jimmidyson/configmap-reload func isValidEvent(event fsnotify.Event) bool { - return event.Op&fsnotify.Create == fsnotify.Create + return event.Op == fsnotify.Create || event.Op == fsnotify.Write } func start() { @@ -144,7 +180,7 @@ func start() { return } - cmd = exec.Command(binPath, "-c", cfgPath) + cmd = exec.Command(binPath, "-c", configPath) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { diff --git a/config/crd/bases/logging.kubesphere.io_fluentbits.yaml b/config/crd/bases/logging.kubesphere.io_fluentbits.yaml index 62caaf8c5..e88cb4c81 100644 --- a/config/crd/bases/logging.kubesphere.io_fluentbits.yaml +++ b/config/crd/bases/logging.kubesphere.io_fluentbits.yaml @@ -629,6 +629,11 @@ spec: type: array type: object type: object + args: + description: Fluent Bit Watcher command line arguments. + items: + type: string + type: array containerLogRealPath: description: Container log path type: string diff --git a/manifests/setup/fluentbit-operator-crd.yaml b/manifests/setup/fluentbit-operator-crd.yaml index 28fcac19e..7a0ac37d3 100644 --- a/manifests/setup/fluentbit-operator-crd.yaml +++ b/manifests/setup/fluentbit-operator-crd.yaml @@ -1335,6 +1335,11 @@ spec: type: array type: object type: object + args: + description: Fluent Bit Watcher command line arguments. + items: + type: string + type: array containerLogRealPath: description: Container log path type: string diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index fda996d56..42598c17b 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -1340,6 +1340,11 @@ spec: type: array type: object type: object + args: + description: Fluent Bit Watcher command line arguments. + items: + type: string + type: array containerLogRealPath: description: Container log path type: string @@ -4351,7 +4356,8 @@ spec: - command: - /bin/sh - -c - - set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}') > /fluentbit-operator/fluent-bit.env + - set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}') + > /fluentbit-operator/fluent-bit.env image: docker:19.03 name: setenv volumeMounts: diff --git a/pkg/filenotify/filenotify.go b/pkg/filenotify/filenotify.go new file mode 100644 index 000000000..d87d7fc4c --- /dev/null +++ b/pkg/filenotify/filenotify.go @@ -0,0 +1,53 @@ +// Adapted from https://github.com/gohugoio/hugo +// Apache License 2.0 +// Copyright Hugo Authors +// +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangeably in your code. +// +// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "time" + + "github.com/fsnotify/fsnotify" +) + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +// New tries to use an fs-event watcher, and falls back to the poller if there is an error +func New(interval time.Duration) (FileWatcher, error) { + if watcher, err := NewEventWatcher(); err == nil { + return watcher, nil + } + return NewPollingWatcher(interval), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher(interval time.Duration) FileWatcher { + return &filePoller{ + interval: interval, + done: make(chan struct{}), + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns an fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/pkg/filenotify/fsnotify.go b/pkg/filenotify/fsnotify.go new file mode 100644 index 000000000..84f0393f2 --- /dev/null +++ b/pkg/filenotify/fsnotify.go @@ -0,0 +1,24 @@ +// Adapted from https://github.com/gohugoio/hugo +// Apache License 2.0 +// Copyright Hugo Authors +// +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import "github.com/fsnotify/fsnotify" + +// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// Events returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// Errors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/pkg/filenotify/poller.go b/pkg/filenotify/poller.go new file mode 100644 index 000000000..ea138ee26 --- /dev/null +++ b/pkg/filenotify/poller.go @@ -0,0 +1,330 @@ +// Adapted from https://github.com/gohugoio/hugo +// Apache License 2.0 +// Copyright Hugo Authors +// +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchWatch is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("watch does not exist") +) + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satisfies the FileWatcher interface +type filePoller struct { + // the duration between polls. + interval time.Duration + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]struct{} + // Will be closed when done. + done chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return errPollerClosed + } + + item, err := newItemToWatch(name) + if err != nil { + return err + } + if item.left.FileInfo == nil { + return os.ErrNotExist + } + + if w.watches == nil { + w.watches = make(map[string]struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + w.watches[name] = struct{}{} + + go w.watch(item) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed { + return errPollerClosed + } + + _, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + w.closed = true + close(w.done) + for name := range w.watches { + w.remove(name) + } + + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event) error { + select { + case w.events <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error) error { + select { + case w.errors <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// watch watches item for changes until done is closed. +func (w *filePoller) watch(item *itemToWatch) { + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-w.done: + return + } + + evs, err := item.checkForChanges() + if err != nil { + if err := w.sendErr(err); err != nil { + return + } + } + + item.left, item.right = item.right, item.left + + for _, ev := range evs { + if err := w.sendEvent(ev); err != nil { + return + } + } + + } +} + +// recording records the state of a file or a dir. +type recording struct { + os.FileInfo + + // Set if FileInfo is a dir. + entries map[string]os.FileInfo +} + +func (r *recording) clear() { + r.FileInfo = nil + if r.entries != nil { + for k := range r.entries { + delete(r.entries, k) + } + } +} + +func (r *recording) record(filename string) error { + r.clear() + + fi, err := os.Stat(filename) + if err != nil && !os.IsNotExist(err) { + return err + } + + if fi == nil { + return nil + } + + r.FileInfo = fi + + // If fi is a dir, we watch the files inside that directory (not recursively). + // This matches the behaviour of fsnotity. + if fi.IsDir() { + f, err := os.Open(filename) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + fis, err := f.Readdir(-1) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + for _, fi := range fis { + r.entries[fi.Name()] = fi + } + } + + return nil +} + +// itemToWatch may be a file or a dir. +type itemToWatch struct { + // Full path to the filename. + filename string + + // Snapshots of the stat state of this file or dir. + left *recording + right *recording +} + +func newItemToWatch(filename string) (*itemToWatch, error) { + r := &recording{ + entries: make(map[string]os.FileInfo), + } + err := r.record(filename) + if err != nil { + return nil, err + } + + return &itemToWatch{filename: filename, left: r}, nil + +} + +func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) { + if item.right == nil { + item.right = &recording{ + entries: make(map[string]os.FileInfo), + } + } + + err := item.right.record(item.filename) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + dirOp := checkChange(item.left.FileInfo, item.right.FileInfo) + + if dirOp != 0 { + evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}} + return evs, nil + } + + if item.left.FileInfo == nil || !item.left.IsDir() { + // Done. + return nil, nil + } + + leftIsIn := false + left, right := item.left.entries, item.right.entries + if len(right) > len(left) { + left, right = right, left + leftIsIn = true + } + + var evs []fsnotify.Event + + for name, fi1 := range left { + fi2 := right[name] + fil, fir := fi1, fi2 + if leftIsIn { + fil, fir = fir, fil + } + op := checkChange(fil, fir) + if op != 0 { + evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)}) + } + + } + + return evs, nil + +} + +func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op { + if fi1 == nil && fi2 != nil { + return fsnotify.Create + } + if fi1 != nil && fi2 == nil { + return fsnotify.Remove + } + if fi1 == nil && fi2 == nil { + return 0 + } + if fi1.IsDir() || fi2.IsDir() { + return 0 + } + if fi1.Mode() != fi2.Mode() { + return fsnotify.Chmod + } + if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() { + return fsnotify.Write + } + + return 0 +} diff --git a/pkg/operator/daemonset.go b/pkg/operator/daemonset.go index d6c5ce2be..d5430b7cd 100644 --- a/pkg/operator/daemonset.go +++ b/pkg/operator/daemonset.go @@ -112,6 +112,7 @@ func MakeDaemonSet(fb v1alpha2.FluentBit, logPath string) appsv1.DaemonSet { { Name: "fluent-bit", Image: fb.Spec.Image, + Args: fb.Spec.Args, ImagePullPolicy: fb.Spec.ImagePullPolicy, Ports: []corev1.ContainerPort{ { From 3fb4118d88c48fbbfa8fdcb6cb0aaed378e34c4c Mon Sep 17 00:00:00 2001 From: Bojan Date: Wed, 18 Aug 2021 14:25:03 -0300 Subject: [PATCH 2/2] set poll to default false. set watcher dockerfile entrypoint Signed-off-by: Bojan --- cmd/fluent-bit-watcher/Dockerfile | 2 +- cmd/fluent-bit-watcher/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/fluent-bit-watcher/Dockerfile b/cmd/fluent-bit-watcher/Dockerfile index 68ba13378..2eb50860a 100644 --- a/cmd/fluent-bit-watcher/Dockerfile +++ b/cmd/fluent-bit-watcher/Dockerfile @@ -13,4 +13,4 @@ COPY conf/fluent-bit.conf conf/parsers.conf /fluent-bit/etc/ COPY --from=buildergo /fluent-bit/fluent-bit /fluent-bit/bin/fluent-bit-watcher # Entry point -CMD ["/fluent-bit/bin/fluent-bit-watcher"] +ENTRYPOINT ["/fluent-bit/bin/fluent-bit-watcher"] diff --git a/cmd/fluent-bit-watcher/main.go b/cmd/fluent-bit-watcher/main.go index 09ef375ea..cfe7b12f2 100644 --- a/cmd/fluent-bit-watcher/main.go +++ b/cmd/fluent-bit-watcher/main.go @@ -48,7 +48,7 @@ func main() { flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.") flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.") flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.") - flag.BoolVar(&poll, "poll", true, "Use poll watcher instead of ionotify.") + flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.") flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.") flag.Parse()