diff --git a/internal/auditlog/tail.go b/internal/auditlog/tail.go
index 892196c..c6f9f82 100644
--- a/internal/auditlog/tail.go
+++ b/internal/auditlog/tail.go
@@ -50,8 +50,6 @@ func Tail(ctx context.Context, projectID, clusterName string, cb func(*audit.Aud
 }
 
 func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesClient, cb func(*audit.AuditLog) error) error {
-	slog.Info("reading logs")
-
 	for {
 		select {
 		case <-ctx.Done():
@@ -64,28 +62,29 @@ func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesC
 		case err != nil:
 			return fmt.Errorf("stream receive failed: %w", err)
 		default:
-			for _, entry := range resp.GetEntries() {
-				payload := entry.GetProtoPayload()
-				if payload == nil {
-					slog.Warn("unexpected payload type")
-					continue
-				}
+		}
+
+		for _, entry := range resp.GetEntries() {
+			payload := entry.GetProtoPayload()
+			if payload == nil {
+				slog.Warn("unexpected payload type")
+				continue
+			}
 
-				msg, err := payload.UnmarshalNew()
-				if err != nil {
-					slog.Warn("failed to unmarshal payload", slog.Any("error", err))
-					continue
-				}
+			msg, err := payload.UnmarshalNew()
+			if err != nil {
+				slog.Warn("failed to unmarshal payload", slog.Any("error", err))
+				continue
+			}
 
-				auditLog, ok := msg.(*audit.AuditLog)
-				if !ok {
-					slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%t", msg)))
-					continue
-				}
+			auditLog, ok := msg.(*audit.AuditLog)
+			if !ok {
+				slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%T", msg)))
+				continue
+			}
 
-				if err = cb(auditLog); err != nil {
-					return fmt.Errorf("callback failed: %w", err)
-				}
+			if err = cb(auditLog); err != nil {
+				return fmt.Errorf("callback failed: %w", err)
 			}
 		}
 	}
diff --git a/internal/datastore/badgerdb.go b/internal/datastore/badgerdb.go
index 73169be..dcc930d 100644
--- a/internal/datastore/badgerdb.go
+++ b/internal/datastore/badgerdb.go
@@ -1,8 +1,10 @@
 package datastore
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/dgraph-io/badger/v4"
 
@@ -15,6 +17,15 @@ type Store struct {
 	db *badger.DB
 }
 
+// Entry is the record stored for a resource; SaveEntry writes it as JSON under
+// the key derived from Resource.
+type Entry struct {
+	Resource  k8s.Resource `json:"resource"`
+	Suspended bool         `json:"suspended"`
+	UpdatedBy string       `json:"updatedBy"`
+	UpdatedAt time.Time    `json:"updatedAt"`
+}
+
 func NewBadgerStore(path string) (*Store, error) {
 	if path == "" {
 		return nil, errors.New("badger store path cannot be empty")
@@ -28,8 +39,10 @@ func NewBadgerStore(path string) (*Store, error) {
 	}, nil
 }
 
-func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
-	var suspended bool
+// GetEntry reads and JSON-decodes the Entry stored for resource. On failure
+// the returned Entry must be ignored.
+func (s *Store) GetEntry(resource k8s.Resource) (Entry, error) {
+	var entry Entry
 	err := s.db.View(func(txn *badger.Txn) error {
 		item, err := txn.Get(buildKey(resource))
 		if err != nil {
@@ -42,23 +55,49 @@ func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
 		if err != nil {
 			return fmt.Errorf("failed to get value: %w", err)
 		}
-		if len(val) != 1 {
-			return fmt.Errorf("expected value length of 1, got %d", len(val))
+		if err = json.Unmarshal(val, &entry); err != nil {
+			return fmt.Errorf("failed to unmarshal entry: %w", err)
 		}
-		suspended = val[0] == 1
 		return nil
 	})
-	return suspended, err
+	return entry, err
 }
 
-func (s *Store) SetSuspended(resource k8s.Resource, suspended bool) error {
+// SaveEntry JSON-encodes entry and stores it under the key built from entry.Resource.
+func (s *Store) SaveEntry(entry Entry) error {
 	return s.db.Update(func(txn *badger.Txn) error {
-		var b byte
-		if suspended {
-			b = 1
+		data, err := json.Marshal(entry)
+		if err != nil {
+			return fmt.Errorf("failed to marshal entry: %w", err)
+		}
+		return txn.Set(buildKey(entry.Resource), data)
+	})
+}
+
+// AllEntries decodes and returns every value stored under the "resource:" key
+// prefix. It returns an empty, non-nil slice when nothing matches.
+func (s *Store) AllEntries() ([]Entry, error) {
+	entries := make([]Entry, 0)
+	err := s.db.View(func(txn *badger.Txn) error {
+		it := txn.NewIterator(badger.DefaultIteratorOptions)
+		defer it.Close()
+
+		prefix := []byte("resource:")
+		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+			item := it.Item()
+			val, err := item.ValueCopy(nil)
+			if err != nil {
+				return fmt.Errorf("failed to get value: %w", err)
+			}
+			var entry Entry
+			if err = json.Unmarshal(val, &entry); err != nil {
+				return fmt.Errorf("failed to unmarshal entry: %w", err)
+			}
+			entries = append(entries, entry)
 		}
-		return txn.Set(buildKey(resource), []byte{b})
+		return nil
 	})
+	return entries, err
 }
 
 func (s *Store) Close() error {
@@ -66,5 +105,5 @@ func (s *Store) Close() error {
 }
 
 func buildKey(resource k8s.Resource) []byte {
-	return []byte(fmt.Sprintf("%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
+	return []byte(fmt.Sprintf("resource:%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
 }
diff --git a/internal/k8s/resource.go b/internal/k8s/resource.go
index af4f1a2..02d9715 100644
--- a/internal/k8s/resource.go
+++ b/internal/k8s/resource.go
@@ -9,6 +9,7 @@ type Resource struct {
 	Namespace string
 	Kind      string
 	Name      string
+	Path      string
 }
 
 func ResourceFromPath(path string) (Resource, error) {
@@ -20,5 +21,6 @@ func ResourceFromPath(path string) (Resource, error) {
 		Namespace: parts[3],
 		Kind:      strings.TrimSuffix(parts[4], "s"),
 		Name:      parts[5],
+		Path:      path,
 	}, nil
 }
diff --git a/internal/watch/watcher.go b/internal/watch/watcher.go
index 9a245ee..e336862 100644
--- a/internal/watch/watcher.go
+++ b/internal/watch/watcher.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"time"
 
 	"google.golang.org/genproto/googleapis/cloud/audit"
 
@@ -43,8 +44,9 @@ type k8sClient interface {
 }
 
 type store interface {
-	IsSuspended(resource k8s.Resource) (bool, error)
-	SetSuspended(resource k8s.Resource, suspended bool) error
+	GetEntry(k8s.Resource) (datastore.Entry, error)
+	SaveEntry(datastore.Entry) error
+	AllEntries() ([]datastore.Entry, error)
 }
 
 type notifier interface {
@@ -52,6 +54,21 @@ type notifier interface {
 }
 
 func (w *Watcher) Watch(ctx context.Context) error {
+	// Re-check resource suspension states, in case any have been modified since the process was last running
+	slog.Info("re-checking resource suspension states")
+	entries, err := w.store.AllEntries()
+	if err != nil {
+		return fmt.Errorf("failed to fetch all entries: %w", err)
+	}
+	for _, entry := range entries {
+		if err = w.checkSuspensionStatus(ctx, entry.Resource, ""); err != nil {
+			// We don't return an error here, as CRD versions are liable to change as fluxcd is upgraded
+			slog.Warn("failed to re-check suspension status", slog.Any("error", err))
+		}
+	}
+
+	// Watch for new modifications
+	slog.Info("watching for resource modifications")
 	return auditlog.Tail(ctx, w.googleCloudProjectID, w.gkeClusterName, func(logEntry *audit.AuditLog) error {
 		if code := logEntry.GetStatus().GetCode(); code != 0 {
 			slog.Warn("operation appeared to fail", slog.Int("code", int(code)))
@@ -66,48 +83,65 @@ func (w *Watcher) Watch(ctx context.Context) error {
 			return err
 		}
 
-		res, err := w.k8sClient.GetRawResource(ctx, resourceName)
-		if err != nil {
-			return fmt.Errorf("failed to get raw resource: %w", err)
+		if err = w.checkSuspensionStatus(ctx, resource, email); err != nil {
+			return fmt.Errorf("failed to re-check suspension status: %w", err)
 		}
 
-		spec, ok := res["spec"].(map[string]any)
-		if !ok {
-			return errors.New("unexpected response payload")
-		}
-		isSuspended, _ := spec["suspend"].(bool)
+		return nil
+	})
+}
 
-		var modified bool
-		wasSuspended, err := w.store.IsSuspended(resource)
-		if err != nil {
-			if !errors.Is(err, datastore.ErrNotFound) {
-				return err
-			}
-			modified = true
-		} else {
-			modified = isSuspended != wasSuspended
-		}
+// checkSuspensionStatus fetches the resource at resource.Path, compares its
+// spec.suspend flag with the stored entry, and on a change (or when no entry
+// exists yet) saves the new state and sends a notification.
+func (w *Watcher) checkSuspensionStatus(ctx context.Context, resource k8s.Resource, updatedBy string) error {
+	res, err := w.k8sClient.GetRawResource(ctx, resource.Path)
+	if err != nil {
+		return fmt.Errorf("failed to get raw resource: %w", err)
+	}
 
-		if !modified {
-			return nil // Probably something else about the resource modified
-		}
+	spec, ok := res["spec"].(map[string]any)
+	if !ok {
+		return errors.New("unexpected response payload")
+	}
+	suspended, _ := spec["suspend"].(bool)
 
-		if err = w.store.SetSuspended(resource, isSuspended); err != nil {
+	var updated bool
+	entry, err := w.store.GetEntry(resource)
+	if err != nil {
+		if !errors.Is(err, datastore.ErrNotFound) {
 			return err
 		}
+		updated = true
+		entry = datastore.Entry{}
+	} else {
+		updated = suspended != entry.Suspended
+	}
+
+	if !updated {
+		return nil // Probably something else about the resource modified
+	}
+
+	entry.Resource = resource
+	entry.Suspended = suspended
+	entry.UpdatedBy = updatedBy
+	entry.UpdatedAt = time.Now().UTC()
+
+	if err = w.store.SaveEntry(entry); err != nil {
+		return err
+	}
 
-		slog.Info(
-			"suspension status modified",
-			slog.String("resource", resourceName),
-			slog.String("user", email),
-			slog.Bool("suspended", isSuspended),
-		)
-
-		return w.notifier.Notify(ctx, notification.Notification{
-			Resource:             resource,
-			Suspended:            isSuspended,
-			Email:                email,
-			GoogleCloudProjectID: w.googleCloudProjectID,
-		})
+	slog.Info(
+		"suspension status updated",
+		slog.String("resource", resource.Path),
+		slog.String("user", updatedBy),
+		slog.Bool("suspended", suspended),
+	)
+
+	return w.notifier.Notify(ctx, notification.Notification{
+		Resource:             resource,
+		Suspended:            suspended,
+		Email:                updatedBy,
+		GoogleCloudProjectID: w.googleCloudProjectID,
 	})
 }