Skip to content

Commit

Permalink
Re-check suspend status on start
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-jones committed Jul 13, 2024
1 parent 02cb83c commit 1592d5b
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 70 deletions.
41 changes: 20 additions & 21 deletions internal/auditlog/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func Tail(ctx context.Context, projectID, clusterName string, cb func(*audit.Aud
}

func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesClient, cb func(*audit.AuditLog) error) error {
slog.Info("reading logs")

for {
select {
case <-ctx.Done():
Expand All @@ -64,28 +62,29 @@ func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesC
case err != nil:
return fmt.Errorf("stream receive failed: %w", err)
default:
for _, entry := range resp.GetEntries() {
payload := entry.GetProtoPayload()
if payload == nil {
slog.Warn("unexpected payload type")
continue
}
}

for _, entry := range resp.GetEntries() {
payload := entry.GetProtoPayload()
if payload == nil {
slog.Warn("unexpected payload type")
continue
}

msg, err := payload.UnmarshalNew()
if err != nil {
slog.Warn("failed to unmarshal payload", slog.Any("error", err))
continue
}
msg, err := payload.UnmarshalNew()
if err != nil {
slog.Warn("failed to unmarshal payload", slog.Any("error", err))
continue
}

auditLog, ok := msg.(*audit.AuditLog)
if !ok {
slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%t", msg)))
continue
}
auditLog, ok := msg.(*audit.AuditLog)
if !ok {
slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%t", msg)))
continue
}

if err = cb(auditLog); err != nil {
return fmt.Errorf("callback failed: %w", err)
}
if err = cb(auditLog); err != nil {
return fmt.Errorf("callback failed: %w", err)
}
}
}
Expand Down
56 changes: 44 additions & 12 deletions internal/datastore/badgerdb.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package datastore

import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/dgraph-io/badger/v4"

Expand All @@ -15,6 +17,13 @@ type Store struct {
db *badger.DB
}

type Entry struct {
Resource k8s.Resource `json:"resource"`
Suspended bool `json:"suspended"`
UpdatedBy string `json:"updatedBy"`
UpdatedAt time.Time `json:"updatedAt"`
}

func NewBadgerStore(path string) (*Store, error) {
if path == "" {
return nil, errors.New("badger store path cannot be empty")
Expand All @@ -28,8 +37,8 @@ func NewBadgerStore(path string) (*Store, error) {
}, nil
}

func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
var suspended bool
func (s *Store) GetEntry(resource k8s.Resource) (Entry, error) {
var entry Entry
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(buildKey(resource))
if err != nil {
Expand All @@ -42,29 +51,52 @@ func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
if err != nil {
return fmt.Errorf("failed to get value: %w", err)
}
if len(val) != 1 {
return fmt.Errorf("expected value length of 1, got %d", len(val))
if err = json.Unmarshal(val, &entry); err != nil {
return fmt.Errorf("failed to unmarshal entry: %w", err)
}
suspended = val[0] == 1
return nil
})
return suspended, err
return entry, err
}

func (s *Store) SetSuspended(resource k8s.Resource, suspended bool) error {
func (s *Store) SaveEntry(entry Entry) error {
return s.db.Update(func(txn *badger.Txn) error {
var b byte
if suspended {
b = 1
data, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("failed ot marshal entry: %w", err)
}
return txn.Set(buildKey(entry.Resource), data)
})
}

func (s *Store) AllEntries() ([]Entry, error) {
entries := make([]Entry, 0)
err := s.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()

prefix := []byte("resource:")
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
val, err := item.ValueCopy(nil)
if err != nil {
return fmt.Errorf("failed to get value: %w", err)
}
var entry Entry
if err = json.Unmarshal(val, &entry); err != nil {
return fmt.Errorf("failed to unmarshal entry: %w", err)
}
entries = append(entries, entry)
}
return txn.Set(buildKey(resource), []byte{b})
return nil
})
return entries, err
}

func (s *Store) Close() error {
return s.db.Close()
}

func buildKey(resource k8s.Resource) []byte {
return []byte(fmt.Sprintf("%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
return []byte(fmt.Sprintf("resource:%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
}
2 changes: 2 additions & 0 deletions internal/k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Resource struct {
Namespace string
Kind string
Name string
Path string
}

func ResourceFromPath(path string) (Resource, error) {
Expand All @@ -20,5 +21,6 @@ func ResourceFromPath(path string) (Resource, error) {
Namespace: parts[3],
Kind: strings.TrimSuffix(parts[4], "s"),
Name: parts[5],
Path: path,
}, nil
}
105 changes: 68 additions & 37 deletions internal/watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"time"

"google.golang.org/genproto/googleapis/cloud/audit"

Expand Down Expand Up @@ -43,15 +44,31 @@ type k8sClient interface {
}

type store interface {
IsSuspended(resource k8s.Resource) (bool, error)
SetSuspended(resource k8s.Resource, suspended bool) error
GetEntry(k8s.Resource) (datastore.Entry, error)
SaveEntry(datastore.Entry) error
AllEntries() ([]datastore.Entry, error)
}

type notifier interface {
Notify(context.Context, notification.Notification) error
}

func (w *Watcher) Watch(ctx context.Context) error {
// Re-check resource suspension states, in case any have been modified since the process was last running
slog.Info("re-checking resource suspension states")
entries, err := w.store.AllEntries()
if err != nil {
return fmt.Errorf("failed to fetch all entries: %w", err)
}
for _, entry := range entries {
if err = w.checkSuspensionStatus(ctx, entry.Resource, "<unknown>"); err != nil {
// We don't return an error here, as CRD versions are liable to change as fluxcd is upgraded
slog.Warn("failed to re-check suspension status", slog.Any("error", err))
}
}

// Watch for new modifications
slog.Info("watching for resource modifications")
return auditlog.Tail(ctx, w.googleCloudProjectID, w.gkeClusterName, func(logEntry *audit.AuditLog) error {
if code := logEntry.GetStatus().GetCode(); code != 0 {
slog.Warn("operation appeared to fail", slog.Int("code", int(code)))
Expand All @@ -66,48 +83,62 @@ func (w *Watcher) Watch(ctx context.Context) error {
return err
}

res, err := w.k8sClient.GetRawResource(ctx, resourceName)
if err != nil {
return fmt.Errorf("failed to get raw resource: %w", err)
if err = w.checkSuspensionStatus(ctx, resource, email); err != nil {
return fmt.Errorf("failed to re-check suspension status: %w", err)
}

spec, ok := res["spec"].(map[string]any)
if !ok {
return errors.New("unexpected response payload")
}
isSuspended, _ := spec["suspend"].(bool)
return nil
})
}

var modified bool
wasSuspended, err := w.store.IsSuspended(resource)
if err != nil {
if !errors.Is(err, datastore.ErrNotFound) {
return err
}
modified = true
} else {
modified = isSuspended != wasSuspended
}
func (w *Watcher) checkSuspensionStatus(ctx context.Context, resource k8s.Resource, updatedBy string) error {
res, err := w.k8sClient.GetRawResource(ctx, resource.Path)
if err != nil {
return fmt.Errorf("failed to get raw resource: %w", err)
}

if !modified {
return nil // Probably something else about the resource modified
}
spec, ok := res["spec"].(map[string]any)
if !ok {
return errors.New("unexpected response payload")
}
suspended, _ := spec["suspend"].(bool)

if err = w.store.SetSuspended(resource, isSuspended); err != nil {
var updated bool
entry, err := w.store.GetEntry(resource)
if err != nil {
if !errors.Is(err, datastore.ErrNotFound) {
return err
}
updated = true
entry = datastore.Entry{}
} else {
updated = suspended != entry.Suspended
}

if !updated {
return nil // Probably something else about the resource modified
}

entry.Resource = resource
entry.Suspended = suspended
entry.UpdatedBy = updatedBy
entry.UpdatedAt = time.Now().UTC()

if err = w.store.SaveEntry(entry); err != nil {
return err
}

slog.Info(
"suspension status modified",
slog.String("resource", resourceName),
slog.String("user", email),
slog.Bool("suspended", isSuspended),
)

return w.notifier.Notify(ctx, notification.Notification{
Resource: resource,
Suspended: isSuspended,
Email: email,
GoogleCloudProjectID: w.googleCloudProjectID,
})
slog.Info(
"suspension status updated",
slog.String("resource", resource.Path),
slog.String("user", updatedBy),
slog.Bool("suspended", suspended),
)

return w.notifier.Notify(ctx, notification.Notification{
Resource: resource,
Suspended: suspended,
Email: updatedBy,
GoogleCloudProjectID: w.googleCloudProjectID,
})
}

0 comments on commit 1592d5b

Please sign in to comment.