From 82ee11bd42c955b7e99b6344512f5c06ec1f10be Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 4 Oct 2022 17:32:56 -0400 Subject: [PATCH] improve: split updates destination from list grants So that we can block on list grants --- internal/connector/connector.go | 35 ++++++++++++++++++++++----------- internal/server/destinations.go | 7 +++++++ internal/server/middleware.go | 4 +++- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/internal/connector/connector.go b/internal/connector/connector.go index b1cd58dac3..379a9b362f 100644 --- a/internal/connector/connector.go +++ b/internal/connector/connector.go @@ -192,9 +192,19 @@ func Run(ctx context.Context, options Options) error { if err := syncWithServer(con); err != nil { return err } - wait(ctx, 30*time.Second) - if err := ctx.Err(); err != nil { - return nil + if err := wait(ctx, 30*time.Second); err != nil { + return err + } + } + }) + group.Go(func() error { + for { + if err := syncDestination(con); err != nil { + logging.Errorf("failed to update destination in infra: %v", err) + } + // TODO: how long should this wait? + if err := wait(ctx, 30*time.Second); err != nil { + return err } } }) @@ -280,7 +290,11 @@ func Run(ctx context.Context, options Options) error { } // wait for goroutines to shutdown - return group.Wait() + err = group.Wait() + if errors.Is(err, context.Canceled) { + return nil + } + return err } func httpTransportFromOptions(opts ServerOptions) *http.Transport { @@ -390,11 +404,6 @@ func getEndpointHostPort(k8s *kubernetes.Kubernetes, opts Options) (types.HostPo } func syncWithServer(con connector) error { - if err := syncDestination(con); err != nil { - logging.Errorf("failed to update destination in infra: %v", err) - return nil - } - grants, err := con.client.ListGrants(api.ListGrantsRequest{Resource: con.destination.Name}) if err != nil { logging.Errorf("error listing grants: %v", err) @@ -568,13 +577,15 @@ func slicesEqual(s1, s2 []string) bool { } // wait blocks for the duration of delay, or until the context is done. -func wait(ctx context.Context, delay time.Duration) { +// Returns the context error when the context is done, and returns nil if +// the timer waited the full duration. +func wait(ctx context.Context, delay time.Duration) error { timer := time.NewTimer(delay) select { case <-timer.C: - return + return nil case <-ctx.Done(): timer.Stop() - return + return ctx.Err() } } diff --git a/internal/server/destinations.go b/internal/server/destinations.go index 9dc750b224..82965b6d1b 100644 --- a/internal/server/destinations.go +++ b/internal/server/destinations.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "time" "github.com/gin-gonic/gin" @@ -44,6 +45,12 @@ func (a *API) CreateDestination(c *gin.Context, r *api.CreateDestinationRequest) Version: r.Version, } + // set LastSeenAt if this request came from a connector. The middleware + // can't do this update in the case where the destination did not exist yet + if c.Request.Header.Get(headerInfraDestination) == r.UniqueID { + destination.LastSeenAt = time.Now() + } + err := access.CreateDestination(c, destination) if err != nil { return nil, fmt.Errorf("create destination: %w", err) diff --git a/internal/server/middleware.go b/internal/server/middleware.go index c3747e4566..2f43d7d79e 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -113,7 +113,7 @@ func authenticateRequest(c *gin.Context, route routeSettings, srv *Server) (acce } if authned.User != nil { - if uniqueID := c.Request.Header.Get("Infra-Destination"); uniqueID != "" { + if uniqueID := c.Request.Header.Get(headerInfraDestination); uniqueID != "" { tx = tx.WithOrgID(authned.Organization.ID) rCtx := access.RequestContext{DBTxn: tx, Authenticated: authned} if err := handleInfraDestinationHeader(rCtx, uniqueID); err != nil { @@ -126,6 +126,8 @@ func authenticateRequest(c *gin.Context, route routeSettings, srv *Server) (acce return authned, err } +const headerInfraDestination = "Infra-Destination" + // validateOrgMatchesRequest checks that if both the accessKeyOrg and the org // from the request are set they have the same ID. If only one is set no // error is returned.