Skip to content

Commit

Permalink
chore: improve concurrency and context cancelation
Browse files Browse the repository at this point in the history
Addressing comments from @alberto-miranda #16 (comment)
  • Loading branch information
fntlnz committed Aug 16, 2024
1 parent 2d1e0eb commit 3d58b00
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 43 deletions.
37 changes: 22 additions & 15 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package cmd

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"log/slog"
Expand All @@ -26,6 +30,7 @@ import (
"github.com/seqeralabs/staticreg/pkg/server"
"github.com/seqeralabs/staticreg/pkg/server/staticreg"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -40,6 +45,9 @@ var serveCmd = &cobra.Command{
Short: "Serves a webserver with an HTML listing of all images and tags in a v2 registry",
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()
ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stop()

log := logger.FromContext(ctx)
log.Info("starting server",
slog.Duration("cache-duration", cacheDuration),
Expand All @@ -50,6 +58,7 @@ var serveCmd = &cobra.Command{

client := registry.New(rootCfg)
asyncClient := async.New(client, refreshInterval)
defer asyncClient.Stop(context.Background())

filler := filler.New(asyncClient, rootCfg.RegistryHostname, "/")

Expand All @@ -60,24 +69,22 @@ var serveCmd = &cobra.Command{
return
}

errCh := make(chan error, 1)
go func() {
errCh <- srv.Start()
}()
g, ctx := errgroup.WithContext(ctx)

go func() {
errCh <- asyncClient.Start(ctx)
}()
g.Go(func() error {
return srv.Start(ctx)
})

select {
case <-ctx.Done():
return
case err := <-errCh:
if err == nil {
slog.Error("operations exited unexpectedly")
return
g.Go(func() error {
return asyncClient.Start(ctx)
})

if err := g.Wait(); err != nil {
if ctx.Err() != nil {
log.Info("context cancelled, shutting down")
} else {
slog.Error("unexpected error", logger.ErrAttr(err))
}
slog.Error("unexpected error", logger.ErrAttr(err))
return
}
},
Expand Down
77 changes: 52 additions & 25 deletions pkg/registry/async/async_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ package async

import (
"context"
"fmt"
"errors"
"log/slog"
"time"

v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/sync/errgroup"

"github.com/cenkalti/backoff/v4"
"github.com/seqeralabs/staticreg/pkg/observability/logger"
Expand All @@ -31,22 +32,35 @@ import (
const imageInfoRequestsBufSize = 10
const tagRequestBufferSize = 10

var (
ErrNoTagsFound = errors.New("no tags found")
ErrImageInfoNotFound = errors.New("image info not found")
)

// Async is a struct that wraps an underlying registry.Client
// to provide asynchronous methods for interacting with a container registry.
// It continuously syncs data from the registry in a separate goroutine.
type Async struct {
// underlying is the actual registry client that does the registry operations, remember this is just a wrapper!
underlying registry.Client

// refreshInterval represents the time to wait to synchronize repositories again after a successful synchronization
refreshInterval time.Duration

// repos is an in memory list of all the repository names in the registry
repos []string

// repositoryTags represents the list of tags for each repository
repositoryTags *xsync.MapOf[string, []string]

// imageInfo contains the image information indexed by repo name and tag
imageInfo *xsync.MapOf[imageInfoKey, imageInfo]

// repositoryRequestBuffer generates requests for the `handleRepositoryRequest`
// handler that is responsible for retrieving the tags for a given image and
// scheduling new jobs on `imageInfoRequestsBuffer`
repositoryRequestBuffer chan repositoryRequest
// imageInfoRequestsBuffer is responsible for feeding `handleImageInfoRequest`
// so that image info is retrieved for each <repo,tag> combination
imageInfoRequestsBuffer chan imageInfoRequest
}

Expand All @@ -68,52 +82,51 @@ type imageInfo struct {
reference string
}

func (c *Async) Stop(ctx context.Context) {
close(c.imageInfoRequestsBuffer)
close(c.repositoryRequestBuffer)
}

func (c *Async) Start(ctx context.Context) error {
// TODO(fntlnz): maybe instead of errCh use a backoff and retry ops
errCh := make(chan error, 1)
g, ctx := errgroup.WithContext(ctx)

go func() {
g.Go(func() error {
for {
err := backoff.Retry(func() error {
return c.synchronizeRepositories(ctx)
}, backoff.WithContext(newExponentialBackoff(), ctx))

if err != nil {
errCh <- err
return err
}

time.Sleep(c.refreshInterval)
}
}()
})

go func() {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case req := <-c.repositoryRequestBuffer:
c.handleRepositoryRequest(ctx, req)
}
}
}()
})

go func() {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case req := <-c.imageInfoRequestsBuffer:
c.handleImageInfoRequest(ctx, req)
}
}
}()
})

select {
case <-ctx.Done():
return nil
case err := <-errCh:
return err
}
return g.Wait()
}

func (c *Async) synchronizeRepositories(ctx context.Context) error {
Expand All @@ -126,7 +139,14 @@ func (c *Async) synchronizeRepositories(ctx context.Context) error {
c.repos = repos

for _, r := range repos {
c.repositoryRequestBuffer <- repositoryRequest{repo: r}
if err := ctx.Err(); err != nil {
return nil
}
select {
case c.repositoryRequestBuffer <- repositoryRequest{repo: r}:
default:
return nil
}
}

return nil
Expand All @@ -146,13 +166,19 @@ func (c *Async) handleRepositoryRequest(ctx context.Context, req repositoryReque
c.repositoryTags.Store(req.repo, tags)

for _, t := range tags {
c.imageInfoRequestsBuffer <- imageInfoRequest{
if ctx.Err() != nil {
return
}

select {
case c.imageInfoRequestsBuffer <- imageInfoRequest{
repo: req.repo,
tag: t,
}:
default:
return
}
}

return
}

func (c *Async) handleImageInfoRequest(ctx context.Context, req imageInfoRequest) {
Expand All @@ -176,10 +202,11 @@ func (c *Async) RepoList(ctx context.Context) ([]string, error) {
return c.repos, nil
}

// TagList contains
func (c *Async) TagList(ctx context.Context, repo string) ([]string, error) {
tags, ok := c.repositoryTags.Load(repo)
if !ok {
return nil, fmt.Errorf("no tags found") // TODO(fntlnz): make an error var
return nil, ErrNoTagsFound
}
return tags, nil
}
Expand All @@ -191,7 +218,7 @@ func (c *Async) ImageInfo(ctx context.Context, repo string, tag string) (image v
}
info, ok := c.imageInfo.Load(key)
if !ok {
return nil, "", fmt.Errorf("image info not found") // TODO(fntlnz): make an error var
return nil, "", ErrImageInfoNotFound
}
return info.image, info.reference, nil
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"log/slog"
"net/http"
"strings"
Expand All @@ -9,8 +10,8 @@ import (
cache "github.com/chenyahui/gin-cache"
"github.com/chenyahui/gin-cache/persist"
sloggin "github.com/samber/slog-gin"
"golang.org/x/sync/errgroup"

// "github.com/chenyahui/gin-cache/persist"
"github.com/gin-gonic/gin"
)

Expand Down Expand Up @@ -78,8 +79,14 @@ func New(
}, nil
}

func (s *Server) Start() error {
return s.server.ListenAndServe()
func (s *Server) Start(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(s.server.ListenAndServe)
g.Go(func() error {
<-ctx.Done()
return s.server.Shutdown(context.Background())
})
return g.Wait()
}

func injectLoggerMiddleware(log *slog.Logger) gin.HandlerFunc {
Expand Down

0 comments on commit 3d58b00

Please sign in to comment.