Skip to content

Commit

Permalink
Merge pull request #126 from bojand/poll_watcher
Browse files Browse the repository at this point in the history
Add support for polling watcher
  • Loading branch information
Benjamin Huo committed Aug 19, 2021
2 parents 5a4a5f6 + 3fb4118 commit 5694f2a
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 21 deletions.
2 changes: 2 additions & 0 deletions api/fluentbitoperator/v1alpha2/fluentbit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
type FluentBitSpec struct {
// Fluent Bit image.
Image string `json:"image,omitempty"`
// Fluent Bit Watcher command line arguments.
Args []string `json:"args,omitempty"`
// Fluent Bit image pull policy.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// Fluent Bit image pull secret
Expand Down
15 changes: 6 additions & 9 deletions cmd/fluent-bit-watcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
FROM golang:1.13.6-alpine3.11 as buildergo
RUN mkdir -p /fluent-bit
COPY cmd/fluent-bit-watcher/main.go go.mod /fluent-bit/
WORKDIR /fluent-bit
RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o fluent-bit main.go
RUN mkdir -p /code
COPY . /code/
WORKDIR /code
RUN echo $(ls -al /code)
RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o /fluent-bit/fluent-bit /code/cmd/fluent-bit-watcher/main.go

# FROM gcr.io/distroless/cc-debian10
FROM fluent/fluent-bit:1.8.3
MAINTAINER KubeSphere <kubesphere@yunify.com>
LABEL Description="Fluent Bit docker image" Vendor="KubeSphere" Version="1.0"

COPY conf/fluent-bit.conf conf/parsers.conf /fluent-bit/etc/
COPY --from=buildergo /fluent-bit/fluent-bit /fluent-bit/bin/fluent-bit-watcher

#
EXPOSE 2020

# Entry point
CMD ["/fluent-bit/bin/fluent-bit-watcher", "-c", "/fluent-bit/etc/fluent-bit.conf"]
ENTRYPOINT ["/fluent-bit/bin/fluent-bit-watcher"]
58 changes: 47 additions & 11 deletions cmd/fluent-bit-watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"math"
"os"
"os/exec"
Expand All @@ -14,14 +15,17 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"kubesphere.io/fluentbit-operator/pkg/filenotify"
)

const (
binPath = "/fluent-bit/bin/fluent-bit"
cfgPath = "/fluent-bit/etc/fluent-bit.conf"
watchDir = "/fluent-bit/config"
MaxDelayTime = time.Minute * 5
ResetTime = time.Minute * 10
defaultBinPath = "/fluent-bit/bin/fluent-bit"
defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf"
defaultWatchDir = "/fluent-bit/config"
defaultPollInterval = 1 * time.Second

MaxDelayTime = 5 * time.Minute
ResetTime = 10 * time.Minute
)

var (
Expand All @@ -33,7 +37,22 @@ var (
timerCancel context.CancelFunc
)

var configPath string
var binPath string
var watchPath string
var poll bool
var pollInterval time.Duration

func main() {

flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.")
flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.")
flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.")
flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.")
flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.")

flag.Parse()

logger = log.NewLogfmtLogger(os.Stdout)

timerCtx, timerCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -77,14 +96,14 @@ func main() {
}
{
// Watch the config file, if the config file changed, stop Fluent bit.
watcher, err := fsnotify.NewWatcher()
watcher, err := newWatcher(poll, pollInterval)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
}

// Start watcher.
err = watcher.Add(watchDir)
err = watcher.Add(watchPath)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
Expand All @@ -98,7 +117,7 @@ func main() {
select {
case <-cancel:
return nil
case event := <-watcher.Events:
case event := <-watcher.Events():
if !isValidEvent(event) {
continue
}
Expand All @@ -110,7 +129,7 @@ func main() {
stop()
resetTimer()
_ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit")
case <-watcher.Errors:
case <-watcher.Errors():
_ = level.Error(logger).Log("msg", "Watcher stopped")
return nil
}
Expand All @@ -130,9 +149,26 @@ func main() {
_ = level.Info(logger).Log("msg", "See you next time!")
}

func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) {
var err error
var watcher filenotify.FileWatcher

if poll {
watcher = filenotify.NewPollingWatcher(interval)
} else {
watcher, err = filenotify.New(interval)
}

if err != nil {
return nil, err
}

return watcher, nil
}

// Inspired by https://github.com/jimmidyson/configmap-reload
func isValidEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Create == fsnotify.Create
return event.Op == fsnotify.Create || event.Op == fsnotify.Write
}

func start() {
Expand All @@ -144,7 +180,7 @@ func start() {
return
}

cmd = exec.Command(binPath, "-c", cfgPath)
cmd = exec.Command(binPath, "-c", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/logging.kubesphere.io_fluentbits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down
5 changes: 5 additions & 0 deletions manifests/setup/fluentbit-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down
8 changes: 7 additions & 1 deletion manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down Expand Up @@ -3269,7 +3274,8 @@ spec:
- command:
- /bin/sh
- -c
- set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}') > /fluentbit-operator/fluent-bit.env
- set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}')
> /fluentbit-operator/fluent-bit.env
image: docker:19.03
name: setenv
volumeMounts:
Expand Down
53 changes: 53 additions & 0 deletions pkg/filenotify/filenotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Adapted from https://github.com/gohugoio/hugo
// Apache License 2.0
// Copyright Hugo Authors
//
// Package filenotify provides a mechanism for watching file(s) for changes.
// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support.
// These are wrapped up in a common interface so that either can be used interchangeably in your code.
//
// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
)

// FileWatcher is an interface for implementing file notification watchers
type FileWatcher interface {
Events() <-chan fsnotify.Event
Errors() <-chan error
Add(name string) error
Remove(name string) error
Close() error
}

// New tries to use an fs-event watcher, and falls back to the poller if there is an error
func New(interval time.Duration) (FileWatcher, error) {
if watcher, err := NewEventWatcher(); err == nil {
return watcher, nil
}
return NewPollingWatcher(interval), nil
}

// NewPollingWatcher returns a poll-based file watcher
func NewPollingWatcher(interval time.Duration) FileWatcher {
return &filePoller{
interval: interval,
done: make(chan struct{}),
events: make(chan fsnotify.Event),
errors: make(chan error),
}
}

// NewEventWatcher returns an fs-event based file watcher
func NewEventWatcher() (FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &fsNotifyWatcher{watcher}, nil
}
24 changes: 24 additions & 0 deletions pkg/filenotify/fsnotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Adapted from https://github.com/gohugoio/hugo
// Apache License 2.0
// Copyright Hugo Authors
//
// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import "github.com/fsnotify/fsnotify"

// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface
type fsNotifyWatcher struct {
*fsnotify.Watcher
}

// Events returns the fsnotify event channel receiver
func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event {
return w.Watcher.Events
}

// Errors returns the fsnotify error channel receiver
func (w *fsNotifyWatcher) Errors() <-chan error {
return w.Watcher.Errors
}
Loading

0 comments on commit 5694f2a

Please sign in to comment.