Skip to content

Commit

Permalink
improve: split updates destination from list grants
Browse files Browse the repository at this point in the history
So that we can block on list grants
  • Loading branch information
dnephin committed Oct 6, 2022
1 parent e797b71 commit 82ee11b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
35 changes: 23 additions & 12 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,19 @@ func Run(ctx context.Context, options Options) error {
if err := syncWithServer(con); err != nil {
return err
}
wait(ctx, 30*time.Second)
if err := ctx.Err(); err != nil {
return nil
if err := wait(ctx, 30*time.Second); err != nil {
return err
}
}
})
group.Go(func() error {
for {
if err := syncDestination(con); err != nil {
logging.Errorf("failed to update destination in infra: %v", err)
}
// TODO: how long should this wait?
if err := wait(ctx, 30*time.Second); err != nil {
return err
}
}
})
Expand Down Expand Up @@ -280,7 +290,11 @@ func Run(ctx context.Context, options Options) error {
}

// wait for goroutines to shutdown
return group.Wait()
err = group.Wait()
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

func httpTransportFromOptions(opts ServerOptions) *http.Transport {
Expand Down Expand Up @@ -390,11 +404,6 @@ func getEndpointHostPort(k8s *kubernetes.Kubernetes, opts Options) (types.HostPo
}

func syncWithServer(con connector) error {
if err := syncDestination(con); err != nil {
logging.Errorf("failed to update destination in infra: %v", err)
return nil
}

grants, err := con.client.ListGrants(api.ListGrantsRequest{Resource: con.destination.Name})
if err != nil {
logging.Errorf("error listing grants: %v", err)
Expand Down Expand Up @@ -568,13 +577,15 @@ func slicesEqual(s1, s2 []string) bool {
}

// wait blocks for the duration of delay, or until the context is done.
func wait(ctx context.Context, delay time.Duration) {
// Returns the context error when the context is done, and returns nil if
// the timer waited the full duration.
func wait(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
select {
case <-timer.C:
return
return nil
case <-ctx.Done():
timer.Stop()
return
return ctx.Err()
}
}
7 changes: 7 additions & 0 deletions internal/server/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"fmt"
"time"

"github.com/gin-gonic/gin"

Expand Down Expand Up @@ -44,6 +45,12 @@ func (a *API) CreateDestination(c *gin.Context, r *api.CreateDestinationRequest)
Version: r.Version,
}

// set LastSeenAt if this request came from a connector. The middleware
// can't do this update in the case where the destination did not exist yet
if c.Request.Header.Get(headerInfraDestination) == r.UniqueID {
destination.LastSeenAt = time.Now()
}

err := access.CreateDestination(c, destination)
if err != nil {
return nil, fmt.Errorf("create destination: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion internal/server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func authenticateRequest(c *gin.Context, route routeSettings, srv *Server) (acce
}

if authned.User != nil {
if uniqueID := c.Request.Header.Get("Infra-Destination"); uniqueID != "" {
if uniqueID := c.Request.Header.Get(headerInfraDestination); uniqueID != "" {
tx = tx.WithOrgID(authned.Organization.ID)
rCtx := access.RequestContext{DBTxn: tx, Authenticated: authned}
if err := handleInfraDestinationHeader(rCtx, uniqueID); err != nil {
Expand All @@ -126,6 +126,8 @@ func authenticateRequest(c *gin.Context, route routeSettings, srv *Server) (acce
return authned, err
}

const headerInfraDestination = "Infra-Destination"

// validateOrgMatchesRequest checks that if both the accessKeyOrg and the org
// from the request are set they have the same ID. If only one is set no
// error is returned.
Expand Down

0 comments on commit 82ee11b

Please sign in to comment.