Skip to content

Commit

Permalink
feat: purge cache daemon first blood
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrokiefer authored and cezarsa committed Jan 11, 2021
1 parent e62693e commit 3bd526e
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 0 deletions.
36 changes: 36 additions & 0 deletions cmd/purger/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"log"

"github.com/google/gops/agent"
"github.com/tsuru/rpaas-operator/internal/purge"
)

func main() {
if err := agent.Listen(agent.Options{}); err != nil {
log.Fatalf("could not initialize gops agent: %v", err)
}
defer agent.Close()

k, err := purge.NewK8S()
if err != nil {
log.Fatalf("could not create RPaaS API: %v", err)
}

w, err := purge.NewWithClient(k)
if err != nil {
log.Fatalf("could not create RPaaS API: %v", err)
}

w.Watch()

a, err := purge.New(w)
if err != nil {
log.Fatalf("could not create RPaaS API: %v", err)
}

if err := a.Start(); err != nil {
log.Fatalf("could not start the RPaaS API server: %v", err)
}
}
145 changes: 145 additions & 0 deletions internal/purge/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package purge

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

echoPrometheus "github.com/globocom/echo-prometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tsuru/rpaas-operator/internal/pkg/rpaas/nginx"
"golang.org/x/net/http2"
)

var metricsMiddleware = echoPrometheus.MetricsMiddleware()

type purge struct {
sync.Mutex

watcher *Watcher
cacheManager nginx.NginxManager

Address string

ShutdownTimeout time.Duration

started bool
e *echo.Echo
shutdown chan struct{}
}

func New(w *Watcher) (*purge, error) {
p := &purge{
watcher: w,
cacheManager: nginx.NewNginxManager(),
Address: `:9990`,
ShutdownTimeout: 30 * time.Second,
e: echo.New(),
shutdown: make(chan struct{}),
}
p.setupEcho(p.e)
return p, nil
}

func (p *purge) startServer() error {
return p.e.StartH2CServer(p.Address, &http2.Server{})
}

// Start runs the web server.
func (p *purge) Start() error {
p.Lock()
p.started = true
p.Unlock()
go p.handleSignals()
if err := p.startServer(); err != http.ErrServerClosed {
fmt.Printf("problem to start the webserver: %+v", err)
return err
}
fmt.Println("Shutting down the webserver...")
return nil
}

// Stop shut down the web server.
func (p *purge) Stop() error {
p.Lock()
defer p.Unlock()
if !p.started {
return fmt.Errorf("web server is already down")
}
if p.shutdown == nil {
return fmt.Errorf("shutdown channel is not defined")
}
close(p.shutdown)
ctx, cancel := context.WithTimeout(context.Background(), p.ShutdownTimeout)
defer cancel()
return p.e.Shutdown(ctx)
}

func (p *purge) handleSignals() {
quit := make(chan os.Signal, 2)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
select {
case <-quit:
p.Stop()
case <-p.shutdown:
}
}

func (p *purge) setupEcho(e *echo.Echo) {
e.HideBanner = true
e.HTTPErrorHandler = func(err error, c echo.Context) {
var (
code = http.StatusInternalServerError
msg interface{}
)

if he, ok := err.(*echo.HTTPError); ok {
code = he.Code
msg = he.Message
if he.Internal != nil {
msg = fmt.Sprintf("%v, %v", err, he.Internal)
}
} else {
msg = err.Error()
}
if _, ok := msg.(string); ok {
msg = echo.Map{"message": msg}
}

e.Logger.Error(err)

if !c.Response().Committed {
if c.Request().Method == http.MethodHead {
err = c.NoContent(code)
} else {
err = c.JSON(code, msg)
}
if err != nil {
e.Logger.Error(err)
}
}
}

e.Use(middleware.Recover())
e.Use(middleware.Logger())
e.Use(metricsMiddleware)
//e.Use(OpenTracingMiddleware)
//e.Use(errorMiddleware)

e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
e.GET("/healthcheck", healthcheck)

e.POST("/resource/:instance/purge", p.cachePurge)
e.POST("/resource/:instance/purge/bulk", p.cachePurgeBulk)
}

func healthcheck(c echo.Context) error {
return c.String(http.StatusOK, "OK")
}
88 changes: 88 additions & 0 deletions internal/purge/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package purge

import (
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/labstack/echo/v4"

"github.com/tsuru/rpaas-operator/internal/pkg/rpaas"
)

func (p *purge) cachePurge(c echo.Context) error {
if c.Request().ContentLength == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Request body can't be empty")
}
ctx := c.Request().Context()
var args rpaas.PurgeCacheArgs
err := c.Bind(&args)
if err != nil {
return err
}
name := c.Param("instance")
if len(name) == 0 {
return c.String(http.StatusBadRequest, "instance is required")
}

count, err := p.PurgeCache(ctx, name, args)
if err != nil {
return err
}
return c.String(http.StatusOK, fmt.Sprintf("Object purged on %d servers", count))
}

func (p *purge) cachePurgeBulk(c echo.Context) error {
if c.Request().ContentLength == 0 {
return echo.NewHTTPError(http.StatusBadRequest, "Request body can't be empty")
}
ctx := c.Request().Context()

name := c.Param("instance")
if len(name) == 0 {
return c.String(http.StatusBadRequest, "instance is required")
}

var argsList []rpaas.PurgeCacheArgs
if err := json.NewDecoder(c.Request().Body).Decode(&argsList); err != nil {
return c.String(http.StatusBadRequest, err.Error())
}

status := http.StatusOK
var results []rpaas.PurgeCacheBulkResult
for _, args := range argsList {
var r rpaas.PurgeCacheBulkResult
count, err := p.PurgeCache(ctx, name, args)
if err != nil {
status = http.StatusInternalServerError
r = rpaas.PurgeCacheBulkResult{Path: args.Path, Error: err.Error()}
} else {
r = rpaas.PurgeCacheBulkResult{Path: args.Path, InstancesPurged: count}
}
results = append(results, r)
}

return c.JSON(status, results)
}

func (p *purge) PurgeCache(ctx context.Context, name string, args rpaas.PurgeCacheArgs) (int, error) {
if args.Path == "" {
return 0, rpaas.ValidationError{Msg: "path is required"}
}
pods, port, err := p.watcher.ListPods(name)
if err != nil {
return 0, rpaas.NotFoundError{Msg: fmt.Sprintf("Failed to find pods: %w", err)}
}
purgeCount := 0
for _, pod := range pods {
if !pod.Running {
continue
}
if err = p.cacheManager.PurgeCache(pod.Address, args.Path, port, args.PreservePath); err != nil {
continue
}
purgeCount++
}
return purgeCount, nil
}
21 changes: 21 additions & 0 deletions internal/purge/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package purge

import (
"os"

"github.com/tsuru/rpaas-operator/pkg/observability"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

func NewK8S() (kubernetes.Interface, error) {
kubeconfig := os.Getenv("KUBECONFIG")

config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
config.WrapTransport = observability.OpentracingTransport

return kubernetes.NewForConfig(config)
}
82 changes: 82 additions & 0 deletions internal/purge/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package purge

import (
"time"

"github.com/tsuru/rpaas-operator/internal/pkg/rpaas"
"github.com/tsuru/rpaas-operator/internal/pkg/rpaas/nginx"
"github.com/tsuru/rpaas-operator/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
)

// Should be exported from rpaas/k8s.go
const (
defaultNamespace = "rpaasv2"
defaultKeyLabelPrefix = "rpaas.extensions.tsuru.io"

nginxContainerName = "nginx"
)

type Watcher struct {
Client kubernetes.Interface
Informer v1informers.PodInformer

stopCh chan struct{}
}

func NewWithClient(c kubernetes.Interface) (*Watcher, error) {
return &Watcher{
Client: c,
stopCh: make(chan struct{}),
}, nil
}

func (w *Watcher) Watch() {
informerFactory := informers.NewFilteredSharedInformerFactory(w.Client, time.Minute, metav1.NamespaceAll, nil)
informerFactory.Start(w.stopCh)

w.Informer = informerFactory.Core().V1().Pods()
w.Informer.Informer()
}

func (w *Watcher) ListPods(instance string) ([]rpaas.PodStatus, int32, error) {
labelSet := labels.Set{
"rpaas.extensions.tsuru.io/instance-name": instance,
}
sel := labels.SelectorFromSet(labelSet)

pods := []rpaas.PodStatus{}

list, err := w.Informer.Lister().List(sel)
if err != nil {
// Todo
return pods, -1, err
}

port := util.PortByName(list[0].Spec.Containers[0].Ports, nginx.PortNameManagement)

for _, p := range list {
ps, err := w.podStatus(p)
if err != nil {
continue
}
pods = append(pods, ps)
}
return pods, port, nil
}

func (w *Watcher) podStatus(pod *v1.Pod) (rpaas.PodStatus, error) {
allRunning := true
for _, cs := range pod.Status.ContainerStatuses {
allRunning = allRunning && cs.Ready
}
return rpaas.PodStatus{
Address: pod.Status.PodIP,
Running: allRunning,
}, nil
}

0 comments on commit 3bd526e

Please sign in to comment.