diff --git a/cmd/crane/cmd/serve.go b/cmd/crane/cmd/serve.go index 5b11153eb..4af54d38c 100644 --- a/cmd/crane/cmd/serve.go +++ b/cmd/crane/cmd/serve.go @@ -37,7 +37,8 @@ func newCmdRegistry() *cobra.Command { } func newCmdServe() *cobra.Command { - return &cobra.Command{ + var disk bool + cmd := &cobra.Command{ Use: "serve", Short: "Serve an in-memory registry implementation", Long: `This sub-command serves an in-memory registry implementation on port :8080 (or $PORT) @@ -60,9 +61,16 @@ Contents are only stored in memory, and when the process exits, pushed data is l porti := listener.Addr().(*net.TCPAddr).Port port = fmt.Sprintf("%d", porti) + bh := registry.NewInMemoryBlobHandler() + if disk { + tmp := os.TempDir() + log.Printf("storing blobs in %s", tmp) + bh = registry.NewDiskBlobHandler(tmp) + } + s := &http.Server{ ReadHeaderTimeout: 5 * time.Second, // prevent slowloris, quiet linter - Handler: registry.New(), + Handler: registry.New(registry.WithBlobHandler(bh)), } log.Printf("serving on port %s", port) @@ -81,4 +89,7 @@ Contents are only stored in memory, and when the process exits, pushed data is l return nil }, } + cmd.Flags().BoolVar(&disk, "blobs-to-disk", false, "Store blobs on disk") + cmd.Flags().MarkHidden("blobs-to-disk") + return cmd } diff --git a/pkg/registry/blobs.go b/pkg/registry/blobs.go index 4bf2c65e5..8386ffdf9 100644 --- a/pkg/registry/blobs.go +++ b/pkg/registry/blobs.go @@ -48,24 +48,24 @@ func isBlob(req *http.Request) bool { elem[len(elem)-2] == "uploads") } -// blobHandler represents a minimal blob storage backend, capable of serving +// BlobHandler represents a minimal blob storage backend, capable of serving // blob contents. -type blobHandler interface { +type BlobHandler interface { // Get gets the blob contents, or errNotFound if the blob wasn't found. Get(ctx context.Context, repo string, h v1.Hash) (io.ReadCloser, error) } -// blobStatHandler is an extension interface representing a blob storage +// BlobStatHandler is an extension interface representing a blob storage // backend that can serve metadata about blobs. -type blobStatHandler interface { +type BlobStatHandler interface { // Stat returns the size of the blob, or errNotFound if the blob wasn't // found, or redirectError if the blob can be found elsewhere. Stat(ctx context.Context, repo string, h v1.Hash) (int64, error) } -// blobPutHandler is an extension interface representing a blob storage backend +// BlobPutHandler is an extension interface representing a blob storage backend // that can write blob contents. -type blobPutHandler interface { +type BlobPutHandler interface { // Put puts the blob contents. // // The contents will be verified against the expected size and digest @@ -75,9 +75,9 @@ type blobPutHandler interface { Put(ctx context.Context, repo string, h v1.Hash, rc io.ReadCloser) error } -// blobDeleteHandler is an extension interface representing a blob storage +// BlobDeleteHandler is an extension interface representing a blob storage // backend that can delete blob contents. -type blobDeleteHandler interface { +type BlobDeleteHandler interface { // Delete the blob contents. Delete(ctx context.Context, repo string, h v1.Hash) error } @@ -103,6 +103,8 @@ type memHandler struct { lock sync.Mutex } +func NewInMemoryBlobHandler() BlobHandler { return &memHandler{m: map[string][]byte{}} } + func (m *memHandler) Stat(_ context.Context, _ string, h v1.Hash) (int64, error) { m.lock.Lock() defer m.lock.Unlock() @@ -149,7 +151,7 @@ func (m *memHandler) Delete(_ context.Context, _ string, h v1.Hash) error { // blobs type blobs struct { - blobHandler blobHandler + blobHandler BlobHandler // Each upload gets a unique id that writes occur to until finalized. uploads map[string][]byte @@ -190,7 +192,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { } var size int64 - if bsh, ok := b.blobHandler.(blobStatHandler); ok { + if bsh, ok := b.blobHandler.(BlobStatHandler); ok { size, err = bsh.Stat(req.Context(), repo, h) if errors.Is(err, errNotFound) { return regErrBlobUnknown @@ -238,7 +240,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { var size int64 var r io.Reader - if bsh, ok := b.blobHandler.(blobStatHandler); ok { + if bsh, ok := b.blobHandler.(BlobStatHandler); ok { size, err = bsh.Stat(req.Context(), repo, h) if errors.Is(err, errNotFound) { return regErrBlobUnknown @@ -292,7 +294,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { return nil case http.MethodPost: - bph, ok := b.blobHandler.(blobPutHandler) + bph, ok := b.blobHandler.(BlobPutHandler) if !ok { return regErrUnsupported } @@ -393,7 +395,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { return nil case http.MethodPut: - bph, ok := b.blobHandler.(blobPutHandler) + bph, ok := b.blobHandler.(BlobPutHandler) if !ok { return regErrUnsupported } @@ -454,7 +456,7 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError { return nil case http.MethodDelete: - bdh, ok := b.blobHandler.(blobDeleteHandler) + bdh, ok := b.blobHandler.(BlobDeleteHandler) if !ok { return regErrUnsupported } diff --git a/pkg/registry/blobs_disk.go b/pkg/registry/blobs_disk.go new file mode 100644 index 000000000..c7bc47bff --- /dev/null +++ b/pkg/registry/blobs_disk.go @@ -0,0 +1,73 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "context" + "errors" + "io" + "os" + "path/filepath" + "sync" + + v1 "github.com/google/go-containerregistry/pkg/v1" +) + +type diskHandler struct { + dir string + lock sync.Mutex +} + +func NewDiskBlobHandler(dir string) BlobHandler { return &diskHandler{dir: dir} } + +func (m *diskHandler) Stat(_ context.Context, _ string, h v1.Hash) (int64, error) { + m.lock.Lock() + defer m.lock.Unlock() + + fi, err := os.Stat(filepath.Join(m.dir, h.String())) + if errors.Is(err, os.ErrNotExist) { + return 0, errNotFound + } else if err != nil { + return 0, err + } + return fi.Size(), nil +} +func (m *diskHandler) Get(_ context.Context, _ string, h v1.Hash) (io.ReadCloser, error) { + m.lock.Lock() + defer m.lock.Unlock() + + return os.Open(filepath.Join(m.dir, h.String())) +} +func (m *diskHandler) Put(_ context.Context, _ string, h v1.Hash, rc io.ReadCloser) error { + m.lock.Lock() + defer m.lock.Unlock() + + f, err := os.Create(filepath.Join(m.dir, h.String())) + if err != nil { + return err + } + defer f.Close() + + if _, err := io.Copy(f, rc); err != nil { + return err + } + return nil +} +func (m *diskHandler) Delete(_ context.Context, _ string, h v1.Hash) error { + m.lock.Lock() + defer m.lock.Unlock() + + return os.Remove(filepath.Join(m.dir, h.String())) +} diff --git a/pkg/registry/blobs_disk_test.go b/pkg/registry/blobs_disk_test.go new file mode 100644 index 000000000..df29318f0 --- /dev/null +++ b/pkg/registry/blobs_disk_test.go @@ -0,0 +1,83 @@ +// Copyright 2023 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry_test + +import ( + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/registry" + "github.com/google/go-containerregistry/pkg/v1/random" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/v1/validate" +) + +func TestDiskPush(t *testing.T) { + dir := t.TempDir() + reg := registry.New(registry.WithBlobHandler(registry.NewDiskBlobHandler(dir))) + srv := httptest.NewServer(reg) + defer srv.Close() + + ref, err := name.ParseReference(strings.TrimPrefix(srv.URL, "http://") + "/foo/bar:latest") + if err != nil { + t.Fatal(err) + } + img, err := random.Image(1024, 5) + if err != nil { + t.Fatal(err) + } + if err := remote.Write(ref, img); err != nil { + t.Fatalf("remote.Write: %v", err) + } + + // Test we can read and validate the image. + if _, err := remote.Image(ref); err != nil { + t.Fatalf("remote.Image: %v", err) + } + if err := validate.Image(img); err != nil { + t.Fatalf("validate.Image: %v", err) + } + + // Collect the layer SHAs we expect to find. + want := map[string]bool{} + if h, err := img.ConfigName(); err != nil { + t.Fatal(err) + } else { + want[h.String()] = true + } + ls, err := img.Layers() + if err != nil { + t.Fatal(err) + } + for _, l := range ls { + if h, err := l.Digest(); err != nil { + t.Fatal(err) + } else { + want[h.String()] = true + } + } + + // Test the blobs are there on disk. + for dig := range want { + if _, err := os.Stat(filepath.Join(dir, dig)); err != nil { + t.Fatalf("os.Stat(%s): %v", dig, err) + } + t.Logf("Found %s", dig) + } +} diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index a98d77a93..2f8fd1127 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -136,3 +136,9 @@ func WithWarning(prob float64, msg string) Option { r.warnings[prob] = msg } } + +func WithBlobHandler(h BlobHandler) Option { + return func(r *registry) { + r.blobs.blobHandler = h + } +}