Commit
registry comments
Signed-off-by: Liang Zheng <zhengliang0901@gmail.com>
microyahoo committed Jan 21, 2024
1 parent c377191 commit 1d6638e
Showing 25 changed files with 162 additions and 98 deletions.
27 changes: 23 additions & 4 deletions cmd/registry/config-dev.yml
@@ -9,13 +9,32 @@ storage:
enabled: true
cache:
blobdescriptor: inmemory
filesystem:
rootdirectory: /mnt/ceph/registry
s3:
accesskey: testy
secretkey: testy
region: us-west-1
regionendpoint: http://10.9.8.73:80
# forcepathstyle: true
# accelerate: false
bucket: test2
# encrypt: true
# chunksize: 5242880
multipartcopychunksize: 33554432
multipartcopymaxconcurrency: 100
multipartcopythresholdsize: 33554432
# rootdirectory: /s3/object/name/prefix
# usedualstack: false
loglevel: debug
redirect:
disable: true
# filesystem:
# rootdirectory: /mnt/ceph/registry
maintenance:
uploadpurging:
enabled: false
http:
addr: :5000
addr: :19088
# addr: :5000
debug:
addr: :5001
prometheus:
@@ -25,6 +44,6 @@ http:
X-Content-Type-Options: [nosniff]
health:
storagedriver:
enabled: true
enabled: false
interval: 10s
threshold: 3
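
The hunk above switches the dev config from the filesystem driver to the S3 driver (the regionendpoint looks like a Ceph RGW gateway), moves the HTTP listener from :5000 to :19088, disables blob redirects, and turns off the storage-driver health check. A quick way to confirm the relocated listener is the Distribution API version check against /v2/. A minimal sketch, assuming a registry built from this config is running locally; only the port is taken from the diff:

// versioncheck.go — smoke test for the relocated listener.
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// GET /v2/ is the API version check; a healthy registry answers
	// 200 (or 401 when authentication is enabled).
	resp, err := http.Get("http://127.0.0.1:19088/v2/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("registry /v2/ status:", resp.StatusCode)
}
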
4 changes: 4 additions & 0 deletions internal/client/blob_writer.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
)

type httpBlobUpload struct {
@@ -45,6 +46,7 @@ func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {

req.Header.Set("Content-Type", "application/octet-stream")

dcontext.GetLogger(hbu.ctx).Debugf("****httpBlobUpload.ReadFrom: request: %+v, header: %s, hbu.location=%s, hbu.offset=%d", req, req.Header, hbu.location, hbu.offset)
resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
@@ -81,6 +83,8 @@ func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
req.Header.Set("Content-Type", "application/octet-stream")

dcontext.GetLogger(hbu.ctx).Debugf("****httpBlobUpload: request: %+v, header: %s, len(data)=%d, hbu.location=%s, hbu.offset=%d", req, req.Header, len(p), hbu.location, hbu.offset)

resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
4 changes: 3 additions & 1 deletion internal/client/repository.go
@@ -16,6 +16,7 @@ import (

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/client/transport"
"github.com/distribution/distribution/v3/internal/dcontext"
v2 "github.com/distribution/distribution/v3/registry/api/v2"
"github.com/distribution/distribution/v3/registry/storage/cache"
"github.com/distribution/distribution/v3/registry/storage/cache/memory"
@@ -770,7 +771,7 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO
values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}})
}

u, err := bs.ub.BuildBlobUploadURL(bs.name, values...)
u, err := bs.ub.BuildBlobUploadURL(bs.name, values...) // build blob upload url
if err != nil {
return nil, err
}
@@ -779,6 +780,7 @@ func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateO
if err != nil {
return nil, err
}
dcontext.GetLogger(ctx).Debugf("****blobs.Create: url: %+v, request: %+v", u, req)

resp, err := bs.client.Do(req)
if err != nil {
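
The logging added in blob_writer.go and repository.go traces the client side of a blob push: blobs.Create builds the upload URL and issues the initial POST, and the returned BlobWriter then streams the data and commits it. A hedged sketch of that flow using only the interfaces visible in this diff; pushBlob, repo, and data are hypothetical names, and error handling is kept minimal:

package example

import (
	"bytes"
	"context"

	"github.com/distribution/distribution/v3"
	"github.com/opencontainers/go-digest"
)

func pushBlob(ctx context.Context, repo distribution.Repository, data []byte) error {
	blobs := repo.Blobs(ctx)

	// POST /v2/<name>/blobs/uploads/ — blobs.Create (and the new Debugf) runs here.
	bw, err := blobs.Create(ctx)
	if err != nil {
		return err
	}
	defer bw.Close()

	// PATCH the payload; httpBlobUpload.ReadFrom issues the chunk request.
	if _, err := bw.ReadFrom(bytes.NewReader(data)); err != nil {
		return err
	}

	// PUT ?digest=<dgst> asks the registry to verify and commit the blob.
	_, err = bw.Commit(ctx, distribution.Descriptor{Digest: digest.FromBytes(data)})
	return err
}
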
2 changes: 1 addition & 1 deletion manifest/ocischema/manifest.go
@@ -48,7 +48,7 @@ type Manifest struct {
manifest.Versioned

// Config references the image configuration as a blob.
Config distribution.Descriptor `json:"config"`
Config distribution.Descriptor `json:"config"` // config descriptor stored in the manifest

// Layers lists descriptors for the layers referenced by the
// configuration.
2 changes: 1 addition & 1 deletion notifications/bridge.go
@@ -184,7 +184,7 @@ func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, de
return err
}

return b.sink.Write(*event)
return b.sink.Write(*event) // write the event to the channel
}

func (b *bridge) createBlobEvent(action string, repo reference.Named, desc distribution.Descriptor) (*Event, error) {
2 changes: 1 addition & 1 deletion registry/api/v2/routes.go
@@ -45,7 +45,7 @@ func RouterWithPrefix(prefix string) *mux.Router {
router.StrictSlash(true)

for _, descriptor := range routeDescriptors {
router.Path(descriptor.Path).Name(descriptor.Name)
router.Path(descriptor.Path).Name(descriptor.Name) // register the route
}

return rootRouter
6 changes: 3 additions & 3 deletions registry/api/v2/urls.go
@@ -164,7 +164,7 @@ func (ub *URLBuilder) BuildManifestURL(ref reference.Named) (string, error) {

// BuildBlobURL constructs the url for the blob identified by name and dgst.
func (ub *URLBuilder) BuildBlobURL(ref reference.Canonical) (string, error) {
route := ub.cloneRoute(RouteNameBlob)
route := ub.cloneRoute(RouteNameBlob) // /v2/{name}/blobs/{digest}

layerURL, err := route.URL("name", ref.Name(), "digest", ref.Digest().String())
if err != nil {
@@ -192,14 +192,14 @@ func (ub *URLBuilder) BuildBlobUploadURL(name reference.Named, values ...url.Val
// this url is provided by server implementations during the blob upload
// process.
func (ub *URLBuilder) BuildBlobUploadChunkURL(name reference.Named, uuid string, values ...url.Values) (string, error) {
route := ub.cloneRoute(RouteNameBlobUploadChunk)
route := ub.cloneRoute(RouteNameBlobUploadChunk) // /v2/{name:" + reference.NameRegexp.String() + "}/blobs/uploads/{uuid:[a-zA-Z0-9-_.=]+}

uploadURL, err := route.URL("name", name.Name(), "uuid", uuid)
if err != nil {
return "", err
}

return appendValuesURL(uploadURL, values...).String(), nil
return appendValuesURL(uploadURL, values...).String(), nil // append the query values
}

// clondedRoute returns a clone of the named route from the router. Routes
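
The comments above annotate the route templates and the final query-append step. appendValuesURL itself is unexported, so the sketch below only illustrates the same idea with net/url: merging url.Values — such as the _state token built later in blobupload.go — into an upload URL's query string. The host, path, and token are placeholders:

package main

import (
	"fmt"
	"net/url"
)

// appendValues merges extra query values into u, mirroring what
// appendValuesURL does for upload URLs.
func appendValues(u *url.URL, values ...url.Values) *url.URL {
	merged := u.Query()
	for _, vals := range values {
		for k, v := range vals {
			merged[k] = append(merged[k], v...)
		}
	}
	u.RawQuery = merged.Encode()
	return u
}

func main() {
	u, err := url.Parse("http://registry.example/v2/test2/blobs/uploads/uuid-123")
	if err != nil {
		panic(err)
	}
	u = appendValues(u, url.Values{"_state": []string{"opaque-token"}})
	fmt.Println(u.String()) // ...?_state=opaque-token
}
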
16 changes: 8 additions & 8 deletions registry/handlers/app.go
@@ -105,7 +105,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
app.register(v2.RouteNameCatalog, catalogDispatcher)
app.register(v2.RouteNameTags, tagsDispatcher)
app.register(v2.RouteNameBlob, blobDispatcher)
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
app.register(v2.RouteNameBlobUpload, blobUploadDispatcher) // register the blobUpload route
app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)

// override the storage driver's UA string for registry outbound HTTP requests
@@ -118,7 +118,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
}

var err error
app.driver, err = factory.Create(app, config.Storage.Type(), storageParams)
app.driver, err = factory.Create(app, config.Storage.Type(), storageParams) // create the storage driver
if err != nil {
// TODO(stevvooe): Move the creation of a service into a protected
// method, where this is created lazily. Its status can be queried via
@@ -239,7 +239,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {
}

// configure storage caches
if cc, ok := config.Storage["cache"]; ok {
if cc, ok := config.Storage["cache"]; ok { // if a cache is configured
v, ok := cc["blobdescriptor"]
if !ok {
// Backwards compatible: "layerinfo" == "blobdescriptor"
@@ -274,7 +274,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App {

cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider(blobDescriptorSize)
localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) // create the registry, using the in-memory blob descriptor cache here
if err != nil {
panic("could not create registry: " + err.Error())
}
@@ -344,7 +344,7 @@ func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) {
healthRegistry = healthRegistries[0]
}

if app.Config.Health.StorageDriver.Enabled {
if app.Config.Health.StorageDriver.Enabled { // storage driver health check
interval := app.Config.Health.StorageDriver.Interval
if interval == 0 {
interval = defaultCheckInterval
@@ -459,7 +459,7 @@ func (app *App) configureEvents(configuration *configuration.Configuration) {
Ignore: endpoint.Ignore,
})

sinks = append(sinks, endpoint)
sinks = append(sinks, endpoint) // notification endpoints
}

// NOTE(stevvooe): Moving to a new queuing implementation is as easy as
@@ -669,10 +669,10 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
}

// assign and decorate the authorized repository with an event bridge.
context.Repository, context.RepositoryRemover = notifications.Listen(
context.Repository, context.RepositoryRemover = notifications.Listen( // repositoryListener
repository,
context.App.repoRemover,
app.eventBridge(context, r))
app.eventBridge(context, r)) // create the event bridge

context.Repository, err = applyRepoMiddleware(app, context.Repository, app.Config.Middleware["repository"])
if err != nil {
4 changes: 2 additions & 2 deletions registry/handlers/blob.go
@@ -33,7 +33,7 @@ func blobDispatcher(ctx *Context, r *http.Request) http.Handler {

mhandler := handlers.MethodHandler{
http.MethodGet: http.HandlerFunc(blobHandler.GetBlob),
http.MethodHead: http.HandlerFunc(blobHandler.GetBlob),
http.MethodHead: http.HandlerFunc(blobHandler.GetBlob), // head blob
}

if !ctx.readOnly {
@@ -55,7 +55,7 @@ type blobHandler struct {
func (bh *blobHandler) GetBlob(w http.ResponseWriter, r *http.Request) {
dcontext.GetLogger(bh).Debug("GetBlob")
blobs := bh.Repository.Blobs(bh)
desc, err := blobs.Stat(bh, bh.Digest)
desc, err := blobs.Stat(bh, bh.Digest) // check the blobStore for data with the given digest
if err != nil {
if err == distribution.ErrBlobUnknown {
bh.Errors = append(bh.Errors, errcode.ErrorCodeBlobUnknown.WithDetail(bh.Digest))
33 changes: 18 additions & 15 deletions registry/handlers/blobupload.go
@@ -18,6 +18,9 @@ import (
// blobUploadDispatcher constructs and returns the blob upload handler for the
// given request context.
func blobUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
if r.Method == http.MethodPatch {
dcontext.GetLogger(ctx).Debugf("***blobUploadDispatcher: request: %+v, header: %s", r, r.Header)
}
buh := &blobUploadHandler{
Context: ctx,
UUID: getUploadUUID(ctx),
@@ -30,8 +33,8 @@ func blobUploadDispatcher(ctx *Context, r *http.Request) http.Handler {

if !ctx.readOnly { //not readonly
handler[http.MethodPost] = http.HandlerFunc(buh.StartBlobUpload)
handler[http.MethodPatch] = http.HandlerFunc(buh.PatchBlobData)
handler[http.MethodPut] = http.HandlerFunc(buh.PutBlobUploadComplete)
handler[http.MethodPatch] = http.HandlerFunc(buh.PatchBlobData) // patch blob data
handler[http.MethodPut] = http.HandlerFunc(buh.PutBlobUploadComplete) // complete the blob upload
handler[http.MethodDelete] = http.HandlerFunc(buh.CancelBlobUpload)
}

@@ -76,8 +79,8 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
}
}

blobs := buh.Repository.Blobs(buh)
upload, err := blobs.Create(buh, options...)
blobs := buh.Repository.Blobs(buh) // returns the linkedBlobStore (registry/storage/registry.go)
upload, err := blobs.Create(buh, options...) // create the blobWriter
if err != nil {
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil {
@@ -91,14 +94,14 @@ func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Req
return
}

buh.Upload = upload
buh.Upload = upload // the handler's Upload (the BlobWriter) is assigned here

if err := buh.blobUploadResponse(w, r); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}

w.Header().Set("Docker-Upload-UUID", buh.Upload.ID())
w.Header().Set("Docker-Upload-UUID", buh.Upload.ID()) // 返回头里携带 upload UUID
w.WriteHeader(http.StatusAccepted)
}

@@ -165,7 +168,7 @@ func (buh *blobUploadHandler) PatchBlobData(w http.ResponseWriter, r *http.Reque
}
}

if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); err != nil {
if err := copyFullPayload(buh, w, r, buh.Upload, -1, "blob PATCH"); err != nil { // blob patch
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err.Error()))
return
}
@@ -190,7 +193,7 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
}
defer buh.Upload.Close()

dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters!
dgstStr := r.FormValue("digest") // TODO(stevvooe): Support multiple digest parameters! 读取 PUT 请求中的 digest 信息

if dgstStr == "" {
// no digest? return error, but allow retry.
@@ -210,7 +213,7 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
return
}

desc, err := buh.Upload.Commit(buh, distribution.Descriptor{
desc, err := buh.Upload.Commit(buh, distribution.Descriptor{ // blobWriter commit
Digest: dgst,

// TODO(stevvooe): This isn't wildly important yet, but we should
@@ -270,7 +273,7 @@ func (buh *blobUploadHandler) CancelBlobUpload(w http.ResponseWriter, r *http.Re
}

func (buh *blobUploadHandler) ResumeBlobUpload(ctx *Context, r *http.Request) http.Handler {
state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state"))
state, err := hmacKey(ctx.Config.HTTP.Secret).unpackUploadState(r.FormValue("_state")) // recover the upload state from the request
if err != nil {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
dcontext.GetLogger(ctx).Infof("error resolving upload: %v", err)
@@ -325,11 +328,11 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
// TODO(stevvooe): Need a better way to manage the upload state automatically.
buh.State.Name = buh.Repository.Named().Name()
buh.State.UUID = buh.Upload.ID()
buh.Upload.Close()
buh.Upload.Close() // call the blobWriter's Close; if it has not committed yet, the current offset is saved
buh.State.Offset = buh.Upload.Size()
buh.State.StartedAt = buh.Upload.StartedAt()

token, err := hmacKey(buh.Config.HTTP.Secret).packUploadState(buh.State)
token, err := hmacKey(buh.Config.HTTP.Secret).packUploadState(buh.State) // pack the upload state into an HMAC-protected base64 token
if err != nil {
dcontext.GetLogger(buh).Infof("error building upload state token: %s", err)
return err
@@ -338,7 +341,7 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
uploadURL, err := buh.urlBuilder.BuildBlobUploadChunkURL(
buh.Repository.Named(), buh.Upload.ID(),
url.Values{
"_state": []string{token},
"_state": []string{token}, // 加密过的 upload state 信息
})
if err != nil {
dcontext.GetLogger(buh).Infof("error building upload url: %s", err)
@@ -351,10 +354,10 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
}

w.Header().Set("Docker-Upload-UUID", buh.UUID)
w.Header().Set("Location", uploadURL)
w.Header().Set("Location", uploadURL) // upload url

w.Header().Set("Content-Length", "0")
w.Header().Set("Range", fmt.Sprintf("0-%d", endRange))
w.Header().Set("Range", fmt.Sprintf("0-%d", endRange)) // 设置 range

return nil
}
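
Several of the comments above concern the _state mechanism: blobUploadResponse packs the upload state (repository name, upload UUID, offset, start time) into an HMAC-protected base64 token that rides in the Location header's _state query parameter, and ResumeBlobUpload verifies and unpacks it when the next chunk arrives. The sketch below shows only the general shape of such a pack/unpack pair with the standard library; the field set, encoding, and key handling are assumptions, not the handlers' actual wire format:

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

type uploadState struct {
	Name      string
	UUID      string
	Offset    int64
	StartedAt time.Time
}

// pack serializes the state, prepends an HMAC-SHA256 tag keyed with the
// HTTP secret, and base64url-encodes the result.
func pack(key string, s uploadState) (string, error) {
	payload, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	mac := hmac.New(sha256.New, []byte(key))
	mac.Write(payload)
	return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), payload...)), nil
}

// unpack verifies the tag before trusting any of the state fields.
func unpack(key, token string) (uploadState, error) {
	var s uploadState
	raw, err := base64.URLEncoding.DecodeString(token)
	if err != nil || len(raw) < sha256.Size {
		return s, errors.New("malformed state token")
	}
	mac := hmac.New(sha256.New, []byte(key))
	mac.Write(raw[sha256.Size:])
	if !hmac.Equal(mac.Sum(nil), raw[:sha256.Size]) {
		return s, errors.New("state token failed HMAC check")
	}
	return s, json.Unmarshal(raw[sha256.Size:], &s)
}

func main() {
	tok, _ := pack("http-secret", uploadState{Name: "test2", UUID: "uuid-123", Offset: 512, StartedAt: time.Now()})
	s, err := unpack("http-secret", tok)
	fmt.Println(s.UUID, s.Offset, err)
}
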
6 changes: 3 additions & 3 deletions registry/handlers/helpers.go
@@ -29,7 +29,7 @@ func closeResources(handler http.Handler, closers ...io.Closer) http.Handler {
// upload, it avoids sending a 400 error to keep the logs cleaner.
//
// The copy will be limited to `limit` bytes, if limit is greater than zero.
func copyFullPayload(ctx context.Context, responseWriter http.ResponseWriter, r *http.Request, destWriter io.Writer, limit int64, action string) error {
func copyFullPayload(ctx context.Context, responseWriter http.ResponseWriter, r *http.Request, destWriter io.Writer, limit int64, action string) error { // destWriter is the blobWriter returned by linkedBlobStore.Create() (registry/storage/linkedblobstore.go)
// Get a channel that tells us if the client disconnects
clientClosed := r.Context().Done()
body := r.Body
@@ -39,10 +39,10 @@ func copyFullPayload(ctx context.Context, responseWriter http.ResponseWriter, r

// Read in the data, if any.
start := time.Now()
copied, err := io.CopyBuffer(destWriter, body, make([]byte, 4<<20))
copied, err := io.CopyBuffer(destWriter, body, make([]byte, 4<<20)) // read from the body and write into the blobWriter, which implements ReadFrom
_, ok1 := destWriter.(io.WriterTo)
_, ok2 := body.(io.ReaderFrom)
dcontext.GetLogger(ctx).Infof("****The duration of io.Copy(size: %d) from reader(%T, %t) to writer(%T, %t): %v", copied, body, ok2, destWriter, ok1, time.Since(start))
dcontext.GetLogger(ctx).Infof("****The action duration of io.Copy(action: %s, size: %d) from reader(%T, %t) to writer(%T, %t): %v", action, copied, body, ok2, destWriter, ok1, time.Since(start))
if clientClosed != nil && (err != nil || (r.ContentLength > 0 && copied < r.ContentLength)) {
// Didn't receive as much content as expected. Did the client
// disconnect during the request? If so, avoid returning a 400
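
The reworded log line reports which copy path io.CopyBuffer took. Like io.Copy, io.CopyBuffer skips the supplied buffer when the source implements io.WriterTo or the destination implements io.ReaderFrom, which is why the handler type-asserts both sides before logging. A minimal, self-contained demonstration of the ReaderFrom fast path (all names here are illustrative):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// countingWriter implements io.ReaderFrom, so io.CopyBuffer hands it the
// source directly instead of staging through the 4 MiB buffer.
type countingWriter struct {
	bytes.Buffer
	readFromCalls int
}

func (w *countingWriter) ReadFrom(r io.Reader) (int64, error) {
	w.readFromCalls++
	return w.Buffer.ReadFrom(r)
}

func main() {
	dst := &countingWriter{}
	// LimitReader hides strings.Reader's WriterTo so the ReaderFrom path is used.
	src := io.LimitReader(strings.NewReader("payload"), 1<<20)
	n, err := io.CopyBuffer(dst, src, make([]byte, 4<<20))
	fmt.Println(n, err, "ReadFrom calls:", dst.readFromCalls)
}
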