Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluggable blob storage for pkg/registry #1209

Merged
merged 7 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions internal/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ type verifyReader struct {
gotSize, wantSize int64
}

// Error provides information about the failed hash verification.
type Error struct {
// got is the hex-encoded checksum actually computed over the bytes read.
got string
// want is the expected hash (algorithm + hex digest) the content was
// verified against.
want v1.Hash
// gotSize is the number of bytes that had been read when the mismatch
// was detected.
gotSize int64
}

// Error implements the error interface, describing the checksum mismatch
// including the algorithm, bytes read, and the got/want digests.
func (v Error) Error() string {
return fmt.Sprintf("error verifying %s checksum after reading %d bytes; got %q, want %q",
v.want.Algorithm, v.gotSize, v.got, v.want)
}

// Read implements io.Reader
func (vc *verifyReader) Read(b []byte) (int, error) {
n, err := vc.inner.Read(b)
Expand All @@ -48,8 +60,11 @@ func (vc *verifyReader) Read(b []byte) (int, error) {
}
got := hex.EncodeToString(vc.hasher.Sum(make([]byte, 0, vc.hasher.Size())))
if want := vc.expected.Hex; got != want {
return n, fmt.Errorf("error verifying %s checksum after reading %d bytes; got %q, want %q",
vc.expected.Algorithm, vc.gotSize, got, want)
return n, Error{
got: got,
want: vc.expected,
gotSize: vc.gotSize,
}
}
}
return n, err
Expand Down
228 changes: 191 additions & 37 deletions pkg/registry/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package registry

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"path"
"strings"
"sync"

"github.com/google/go-containerregistry/internal/verify"
v1 "github.com/google/go-containerregistry/pkg/v1"
)

// Returns whether this url should be handled by the blob handler
Expand All @@ -44,10 +48,90 @@ func isBlob(req *http.Request) bool {
elem[len(elem)-2] == "uploads")
}

// blobHandler represents a blob storage backend.
//
// For all methods, repo is a string that can be passed to
// pkg/name.NewRepository to validate the repository.
type blobHandler interface {
imjasonh marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this must be public to be useful.

I think one of the things I realized in my fork was that I wanted the default implementation to be public as well, so that I could compose my own BlobHandler with the default to layer functionality on.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's what I have in my fork:

// BlobHandler is the interface for the storage layer underneath this registry.
type BlobHandler interface {
	// Stat returns the size of the blob whose hash is specified,
	// if it exists. If not, it returns (0, error).
	Stat(repo name.Repository, h v1.Hash) (int64, error)

	// Get returns true and a reader for consuming the blob specified with the hash,
	// if it exists.  It now, it returns (nil, error).
	Get(repo name.Repository, h v1.Hash) (io.ReadCloser, error)

	// Store stores the stream of content with the given hash, or returns the error
	// encountered doing so.
	Store(repo name.Repository, h v1.Hash, content io.ReadCloser) error
}

I think plumbing context.Context through makes sense, and am on board with that.

For completeness, this is what I have for manifests:

// ManifestHandler is the interface for the metadata storage layer underneath
// this registry.
type ManifestHandler interface {
	// ListRepositories enumerates up to some count of repositories starting at
	// a specified offset.  This is used to implement the "catalog" handler.
	ListRepositories(offset, count int) ([]name.Repository, error)

	// GetRepository fetches a handler for interacting with a collection of
	// manifests rooted under a particular repository.
	GetRepository(repo name.Repository) (RepositoryHandler, error)
}

// RepositoryHandler is the interface for accessing manifest data under a
// particular repository.
type RepositoryHandler interface {
	// GetDigest fetches the raw bytes of the manifest, and its media type,
	// or returns an error indicating why it cannot.
	GetDigest(v1.Hash) ([]byte, types.MediaType, error)
	// GetTag fetches the raw bytes of the manifest currently labeled by the
	// provided tag, and its media type, or returns an error indicating why
	// it cannot.
	GetTag(string) ([]byte, types.MediaType, error)

	// DeleteDigest removes the manifest with the given hash, it returns an
	// error when the digest does not exist.
	DeleteDigest(v1.Hash) error
	// DeleteTag removes the tag with the given name, the digest is left untouched,
	// it returns an error when the digest does not exist.
	DeleteTag(string) error

	// PutDigest adds the provided manifest content, with the associated media type
	// to the store under the provided hash name.  It returns an error if this cannot
	// be completed.
	PutDigest(v1.Hash, []byte, types.MediaType) error
	// PutTag applies the provided tag to the given hash.  It returns an error if
	// this is not possible.
	PutTag(string, v1.Hash) error

	// ListTags enumerates up to some count of tags starting at
	// a specified offset.  This is used to implement the "tags" handler.
	ListTags(offset, count int) ([]string, error)
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the intention is that we add support for manifests and uploads and then export it when we agree we like the shape. This gives us maximum flexibility to change things willy nilly.

// Stat returns the size of the blob, or errNotFound if the blob wasn't found.
Stat(ctx context.Context, repo string, h v1.Hash) (int64, error)

// Get gets the blob contents, or errNotFound if the blob wasn't found.
Get(ctx context.Context, repo string, h v1.Hash) (io.ReadCloser, error)

// Put puts the blob contents.
//
// The contents will be verified against the expected size and digest
// as the contents are read, and an error will be returned if these
// don't match. Implementations should return that error, or a wrapper
// around that error, to return the correct error when these don't match.
Put(ctx context.Context, repo string, h v1.Hash, rc io.ReadCloser) error
}

// redirectError represents a signal that the blob handler doesn't have the blob
// contents, but that those contents are at another location which registry
// clients should redirect to.
type redirectError struct {
Comment on lines +79 to +82
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda regret letting Scott use errors for non-error cases like this in genreconciler, but that ship sailed there. Perhaps we can have the methods return a Result type or something, which can be an error, but don't have to be an error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I can justify this as a valid error scenario: "oops, I don't have it, but I know who does"

I could collapse this with errNotFound, so the same error type expresses "oops not found[, but I know who does]" if Location is set.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4xx and 5xx are clear error cases, but 3xx feels like a reach.

I'm not gonna die on this hill, it just feels like a funny API.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree, it's weird. Luckily everything's still internal so we can change it to something we like better if we find it.

// Location is the location to find the contents.
Location string

// Code is the HTTP redirect status code to return to clients.
Code int
}

func (e redirectError) Error() string { return fmt.Sprintf("redirecting (%d): %s", e.Code, e.Location) }

// errNotFound represents an error locating the blob.
var errNotFound = errors.New("not found")

// errTODO wraps msg in a generic 500-level regError. It is used as a
// catch-all for handler errors that don't (yet) map to a more specific
// registry error code.
func errTODO(msg string) *regError {
return &regError{
Status: http.StatusInternalServerError,
Code: "INTERNAL_SERVER_ERROR",
Message: msg,
}
}

// memHandler is an in-memory blobHandler implementation backed by a map.
// All methods acquire lock, so it is safe for concurrent use.
type memHandler struct {
// m maps a digest string (v1.Hash.String()) to the blob's contents.
m map[string][]byte
// lock guards m.
lock sync.Mutex
}

// Stat returns the size in bytes of the stored blob with digest h, or
// errNotFound if no such blob has been stored. The repo argument is
// ignored: blobs are stored globally by digest.
func (m *memHandler) Stat(_ context.Context, _ string, h v1.Hash) (int64, error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if b, found := m.m[h.String()]; found {
		return int64(len(b)), nil
	}
	return 0, errNotFound
}
// Get returns a reader over the contents of the stored blob with digest h,
// or errNotFound if no such blob has been stored. The repo argument is
// ignored: blobs are stored globally by digest.
func (m *memHandler) Get(_ context.Context, _ string, h v1.Hash) (io.ReadCloser, error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if b, found := m.m[h.String()]; found {
		return ioutil.NopCloser(bytes.NewReader(b)), nil
	}
	return nil, errNotFound
}
// Put stores the contents read from rc under digest h. The repo argument is
// ignored: blobs are stored globally by digest.
//
// The contents are read fully before the map is updated, so a read error
// leaves the map unchanged. Digest verification is assumed to have been
// applied by the caller (the registry wraps rc in a verify.ReadCloser
// before calling Put).
func (m *memHandler) Put(_ context.Context, _ string, h v1.Hash, rc io.ReadCloser) error {
	defer rc.Close()

	// Read outside the lock so a slow client can't block other blob ops.
	contents, err := ioutil.ReadAll(rc)
	if err != nil {
		// Previously the io.Copy error was silently ignored, which could
		// store partial/corrupt content. Surface the error instead.
		return err
	}

	m.lock.Lock()
	defer m.lock.Unlock()
	m.m[h.String()] = contents
	return nil
}

// blobs
type blobs struct {
// Blobs are content addresses. we store them globally underneath their sha and make no distinctions per image.
contents map[string][]byte
blobHandler blobHandler

// Each upload gets a unique id that writes occur to until finalized.
uploads map[string][]byte
lock sync.Mutex
Expand All @@ -72,40 +156,78 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
digest := req.URL.Query().Get("digest")
contentRange := req.Header.Get("Content-Range")

repo := req.URL.Host + path.Join(elem[1:len(elem)-2]...)

switch req.Method {
case http.MethodHead:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
if !ok {
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: "invalid digest",
}
}

size, err := b.blobHandler.Stat(req.Context(), repo, h)
if errors.Is(err, errNotFound) {
imjasonh marked this conversation as resolved.
Show resolved Hide resolved
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
}
} else if err != nil {
return errTODO(err.Error())
}

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
resp.Header().Set("Docker-Content-Digest", target)
resp.Header().Set("Content-Length", fmt.Sprint(size))
resp.Header().Set("Docker-Content-Digest", h.String())
resp.WriteHeader(http.StatusOK)
return nil

case http.MethodGet:
b.lock.Lock()
defer b.lock.Unlock()
b, ok := b.contents[target]
if !ok {
h, err := v1.NewHash(target)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "NAME_INVALID",
Message: "invalid digest",
}
}

size, err := b.blobHandler.Stat(req.Context(), repo, h)
if errors.Is(err, errNotFound) {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
}
} else if err != nil {
return errTODO(err.Error())
}

resp.Header().Set("Content-Length", fmt.Sprint(len(b)))
resp.Header().Set("Docker-Content-Digest", target)
rc, err := b.blobHandler.Get(req.Context(), repo, h)
if errors.Is(err, errNotFound) {
return &regError{
Status: http.StatusNotFound,
Code: "BLOB_UNKNOWN",
Message: "Unknown blob",
}
} else if err != nil {
var rerr redirectError
if errors.As(err, &rerr) {
http.Redirect(resp, req, rerr.Location, rerr.Code)
return nil
}

return errTODO(err.Error())
}
defer rc.Close()

resp.Header().Set("Content-Length", fmt.Sprint(size))
resp.Header().Set("Docker-Content-Digest", h.String())
resp.WriteHeader(http.StatusOK)
io.Copy(resp, bytes.NewReader(b))
io.Copy(resp, rc)
return nil

case http.MethodPost:
Expand All @@ -120,22 +242,32 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {
}

if digest != "" {
l := &bytes.Buffer{}
io.Copy(l, req.Body)
rd := sha256.Sum256(l.Bytes())
d := "sha256:" + hex.EncodeToString(rd[:])
if d != digest {
h, err := v1.NewHash(digest)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "DIGEST_INVALID",
Message: "digest does not match contents",
Code: "NAME_INVALID",
Message: "invalid digest",
}
}

b.lock.Lock()
defer b.lock.Unlock()
b.contents[d] = l.Bytes()
resp.Header().Set("Docker-Content-Digest", d)
vrc, err := verify.ReadCloser(req.Body, req.ContentLength, h)
if err != nil {
return errTODO(err.Error())
}
defer vrc.Close()

if err := b.blobHandler.Put(req.Context(), repo, h, vrc); err != nil {
if errors.As(err, &verify.Error{}) {
return &regError{
Status: http.StatusBadRequest,
Code: "DIGEST_INVALID",
Message: "digest does not match contents",
}
}
return errTODO(err.Error())
}
resp.Header().Set("Docker-Content-Digest", h.String())
resp.WriteHeader(http.StatusCreated)
return nil
}
Expand Down Expand Up @@ -220,21 +352,43 @@ func (b *blobs) handle(resp http.ResponseWriter, req *http.Request) *regError {

b.lock.Lock()
defer b.lock.Unlock()
l := bytes.NewBuffer(b.uploads[target])
io.Copy(l, req.Body)
rd := sha256.Sum256(l.Bytes())
d := "sha256:" + hex.EncodeToString(rd[:])
if d != digest {

h, err := v1.NewHash(digest)
if err != nil {
return &regError{
Status: http.StatusBadRequest,
Code: "DIGEST_INVALID",
Message: "digest does not match contents",
Code: "NAME_INVALID",
Message: "invalid digest",
}
}

defer req.Body.Close()
in := ioutil.NopCloser(io.MultiReader(bytes.NewBuffer(b.uploads[target]), req.Body))

size := int64(verify.SizeUnknown)
if req.ContentLength > 0 {
size = int64(len(b.uploads[target])) + req.ContentLength
}

vrc, err := verify.ReadCloser(in, size, h)
if err != nil {
return errTODO(err.Error())
}
defer vrc.Close()
imjasonh marked this conversation as resolved.
Show resolved Hide resolved

if err := b.blobHandler.Put(req.Context(), repo, h, vrc); err != nil {
if errors.As(err, &verify.Error{}) {
return &regError{
Status: http.StatusBadRequest,
Code: "DIGEST_INVALID",
Message: "digest does not match contents",
}
}
return errTODO(err.Error())
}

b.contents[d] = l.Bytes()
delete(b.uploads, target)
resp.Header().Set("Docker-Content-Digest", d)
resp.Header().Set("Docker-Content-Digest", h.String())
resp.WriteHeader(http.StatusCreated)
return nil

Expand Down
3 changes: 3 additions & 0 deletions pkg/registry/depcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func TestDeps(t *testing.T) {
"github.com/google/go-containerregistry/internal/httptest",
"github.com/google/go-containerregistry/pkg/v1",
"github.com/google/go-containerregistry/pkg/v1/types",

"github.com/google/go-containerregistry/internal/verify",
"github.com/google/go-containerregistry/internal/and",
),
})
}
30 changes: 0 additions & 30 deletions pkg/registry/example_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func New(opts ...Option) http.Handler {
r := &registry{
log: log.New(os.Stderr, "", log.LstdFlags),
blobs: blobs{
contents: map[string][]byte{},
uploads: map[string][]byte{},
blobHandler: &memHandler{m: map[string][]byte{}},
uploads: map[string][]byte{},
},
manifests: manifests{
manifests: map[string]map[string]manifest{},
Expand Down
Loading