Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
wxiaoguang committed May 19, 2023
1 parent bf8db10 commit 3cc1f10
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 45 deletions.
4 changes: 4 additions & 0 deletions modules/web/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var argTypeProvider = map[reflect.Type]func(req *http.Request) ResponseStatusPro
reflect.TypeOf(&context.PrivateContext{}): func(req *http.Request) ResponseStatusProvider { return context.GetPrivateContext(req) },
}

func RegisterHandleTypeProvider[T any](fn func(req *http.Request) ResponseStatusProvider) {
argTypeProvider[reflect.TypeOf((*T)(nil)).Elem()] = fn
}

// responseWriter is a wrapper of http.ResponseWriter, to check whether the response has been written
type responseWriter struct {
respWriter http.ResponseWriter
Expand Down
99 changes: 55 additions & 44 deletions routers/api/actions/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package actions

// Github Actions Artifacts API Simple Description
// GitHub Actions Artifacts API Simple Description
//
// 1. Upload artifact
// 1.1. Post upload url
Expand Down Expand Up @@ -63,7 +63,6 @@ package actions

import (
"compress/gzip"
gocontext "context"
"crypto/md5"
"encoding/base64"
"errors"
Expand Down Expand Up @@ -92,9 +91,25 @@ const (

const artifactRouteBase = "/_apis/pipelines/workflows/{run_id}/artifacts"

func ArtifactsRoutes(goctx gocontext.Context, prefix string) *web.Route {
type artifactContextKeyType struct{}

var artifactContextKey = artifactContextKeyType{}

type ArtifactContext struct {
*context.Base

ActionTask *actions.ActionTask
}

func init() {
web.RegisterHandleTypeProvider[*ArtifactContext](func(req *http.Request) web.ResponseStatusProvider {
return req.Context().Value(artifactContextKey).(*ArtifactContext)
})
}

func ArtifactsRoutes(prefix string) *web.Route {
m := web.NewRoute()
m.Use(withContexter(goctx))
m.Use(ArtifactContexter())

r := artifactRoutes{
prefix: prefix,
Expand All @@ -115,15 +130,14 @@ func ArtifactsRoutes(goctx gocontext.Context, prefix string) *web.Route {
return m
}

// withContexter initializes a package context for a request.
func withContexter(goctx gocontext.Context) func(next http.Handler) http.Handler {
func ArtifactContexter() func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
ctx := context.Context{
Resp: context.NewResponse(resp),
Data: map[string]interface{}{},
}
defer ctx.Close()
base, baseCleanUp := context.NewBaseContext(resp, req)
defer baseCleanUp()

ctx := &ArtifactContext{Base: base}
ctx.AppendContextValue(artifactContextKey, ctx)

// action task call server api with Bearer ACTIONS_RUNTIME_TOKEN
// we should verify the ACTIONS_RUNTIME_TOKEN
Expand All @@ -132,23 +146,22 @@ func withContexter(goctx gocontext.Context) func(next http.Handler) http.Handler
ctx.Error(http.StatusUnauthorized, "Bad authorization header")
return
}

authToken := strings.TrimPrefix(authHeader, "Bearer ")
task, err := actions.GetRunningTaskByToken(req.Context(), authToken)
if err != nil {
log.Error("Error runner api getting task: %v", err)
ctx.Error(http.StatusInternalServerError, "Error runner api getting task")
return
}
ctx.Data["task"] = task

if err := task.LoadJob(goctx); err != nil {
if err := task.LoadJob(req.Context()); err != nil {
log.Error("Error runner api getting job: %v", err)
ctx.Error(http.StatusInternalServerError, "Error runner api getting job")
return
}

ctx.Req = context.WithContext(req, &ctx)

ctx.ActionTask = task
next.ServeHTTP(ctx.Resp, ctx.Req)
})
}
Expand All @@ -175,13 +188,8 @@ type getUploadArtifactResponse struct {
FileContainerResourceURL string `json:"fileContainerResourceUrl"`
}

func (ar artifactRoutes) validateRunID(ctx *context.Context) (*actions.ActionTask, int64, bool) {
task, ok := ctx.Data["task"].(*actions.ActionTask)
if !ok {
log.Error("Error getting task in context")
ctx.Error(http.StatusInternalServerError, "Error getting task in context")
return nil, 0, false
}
func (ar artifactRoutes) validateRunID(ctx *ArtifactContext) (*actions.ActionTask, int64, bool) {
task := ctx.ActionTask
runID := ctx.ParamsInt64("run_id")
if task.Job.RunID != runID {
log.Error("Error runID not match")
Expand All @@ -192,7 +200,7 @@ func (ar artifactRoutes) validateRunID(ctx *context.Context) (*actions.ActionTas
}

// getUploadArtifactURL generates a URL for uploading an artifact
func (ar artifactRoutes) getUploadArtifactURL(ctx *context.Context) {
func (ar artifactRoutes) getUploadArtifactURL(ctx *ArtifactContext) {
task, runID, ok := ar.validateRunID(ctx)
if !ok {
return
Expand Down Expand Up @@ -220,7 +228,7 @@ func (ar artifactRoutes) getUploadArtifactURL(ctx *context.Context) {

// getUploadFileSize returns the size of the file to be uploaded.
// The raw size is the size of the file as reported by the header X-TFS-FileLength.
func (ar artifactRoutes) getUploadFileSize(ctx *context.Context) (int64, int64, error) {
func (ar artifactRoutes) getUploadFileSize(ctx *ArtifactContext) (int64, int64, error) {
contentLength := ctx.Req.ContentLength
xTfsLength, _ := strconv.ParseInt(ctx.Req.Header.Get(artifactXTfsFileLengthHeader), 10, 64)
if xTfsLength > 0 {
Expand All @@ -229,7 +237,7 @@ func (ar artifactRoutes) getUploadFileSize(ctx *context.Context) (int64, int64,
return contentLength, contentLength, nil
}

func (ar artifactRoutes) saveUploadChunk(ctx *context.Context,
func (ar artifactRoutes) saveUploadChunk(ctx *ArtifactContext,
artifact *actions.ActionArtifact,
contentSize, runID int64,
) (int64, error) {
Expand Down Expand Up @@ -273,7 +281,7 @@ func (ar artifactRoutes) saveUploadChunk(ctx *context.Context,
// The rules are from https://github.com/actions/toolkit/blob/main/packages/artifact/src/internal/path-and-artifact-name-validation.ts#L32
var invalidArtifactNameChars = strings.Join([]string{"\\", "/", "\"", ":", "<", ">", "|", "*", "?", "\r", "\n"}, "")

func (ar artifactRoutes) uploadArtifact(ctx *context.Context) {
func (ar artifactRoutes) uploadArtifact(ctx *ArtifactContext) {
_, runID, ok := ar.validateRunID(ctx)
if !ok {
return
Expand Down Expand Up @@ -341,7 +349,7 @@ func (ar artifactRoutes) uploadArtifact(ctx *context.Context) {

// comfirmUploadArtifact comfirm upload artifact.
// if all chunks are uploaded, merge them to one file.
func (ar artifactRoutes) comfirmUploadArtifact(ctx *context.Context) {
func (ar artifactRoutes) comfirmUploadArtifact(ctx *ArtifactContext) {
_, runID, ok := ar.validateRunID(ctx)
if !ok {
return
Expand All @@ -364,7 +372,7 @@ type chunkItem struct {
Path string
}

func (ar artifactRoutes) mergeArtifactChunks(ctx *context.Context, runID int64) error {
func (ar artifactRoutes) mergeArtifactChunks(ctx *ArtifactContext, runID int64) error {
storageDir := fmt.Sprintf("tmp%d", runID)
var chunks []*chunkItem
if err := ar.fs.IterateObjects(storageDir, func(path string, obj storage.Object) error {
Expand Down Expand Up @@ -415,14 +423,20 @@ func (ar artifactRoutes) mergeArtifactChunks(ctx *context.Context, runID int64)

// use multiReader
readers := make([]io.Reader, 0, len(allChunks))
readerClosers := make([]io.Closer, 0, len(allChunks))
closeReaders := func() {
for _, r := range readers {
_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
}
readers = nil
}
defer closeReaders()

for _, c := range allChunks {
reader, err := ar.fs.Open(c.Path)
if err != nil {
var readCloser io.ReadCloser
if readCloser, err = ar.fs.Open(c.Path); err != nil {
return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
}
readers = append(readers, reader)
readerClosers = append(readerClosers, reader)
readers = append(readers, readCloser)
}
mergedReader := io.MultiReader(readers...)

Expand All @@ -445,11 +459,6 @@ func (ar artifactRoutes) mergeArtifactChunks(ctx *context.Context, runID int64)
return fmt.Errorf("merged file size is not equal to chunk length")
}

// close readers
for _, r := range readerClosers {
r.Close()
}

// save storage path to artifact
log.Debug("[artifact] merge chunks to artifact: %d, %s", artifact.ID, storagePath)
artifact.StoragePath = storagePath
Expand All @@ -458,6 +467,8 @@ func (ar artifactRoutes) mergeArtifactChunks(ctx *context.Context, runID int64)
return fmt.Errorf("update artifact error: %v", err)
}

closeReaders() // close before delete

// drop chunks
for _, c := range cs {
if err := ar.fs.Delete(c.Path); err != nil {
Expand All @@ -479,21 +490,21 @@ type (
}
)

func (ar artifactRoutes) listArtifacts(ctx *context.Context) {
func (ar artifactRoutes) listArtifacts(ctx *ArtifactContext) {
_, runID, ok := ar.validateRunID(ctx)
if !ok {
return
}

artficats, err := actions.ListArtifactsByRunID(ctx, runID)
artifacts, err := actions.ListArtifactsByRunID(ctx, runID)
if err != nil {
log.Error("Error getting artifacts: %v", err)
ctx.Error(http.StatusInternalServerError, err.Error())
return
}

artficatsData := make([]listArtifactsResponseItem, 0, len(artficats))
for _, a := range artficats {
artficatsData := make([]listArtifactsResponseItem, 0, len(artifacts))
for _, a := range artifacts {
artficatsData = append(artficatsData, listArtifactsResponseItem{
Name: a.ArtifactName,
FileContainerResourceURL: ar.buildArtifactURL(runID, a.ID, "path"),
Expand All @@ -517,7 +528,7 @@ type (
}
)

func (ar artifactRoutes) getDownloadArtifactURL(ctx *context.Context) {
func (ar artifactRoutes) getDownloadArtifactURL(ctx *ArtifactContext) {
_, runID, ok := ar.validateRunID(ctx)
if !ok {
return
Expand Down Expand Up @@ -546,7 +557,7 @@ func (ar artifactRoutes) getDownloadArtifactURL(ctx *context.Context) {
ctx.JSON(http.StatusOK, respData)
}

func (ar artifactRoutes) downloadArtifact(ctx *context.Context) {
func (ar artifactRoutes) downloadArtifact(ctx *ArtifactContext) {
_, runID, ok := ar.validateRunID(ctx)
if !ok {
return
Expand Down
2 changes: 1 addition & 1 deletion routers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func NormalRoutes(ctx context.Context) *web.Route {
// In Github, it uses ACTIONS_RUNTIME_URL=https://pipelines.actions.githubusercontent.com/fLgcSHkPGySXeIFrg8W8OBSfeg3b5Fls1A1CwX566g8PayEGlg/
// TODO: this prefix should be generated with a token string with runner ?
prefix = "/api/actions_pipeline"
r.Mount(prefix, actions_router.ArtifactsRoutes(ctx, prefix))
r.Mount(prefix, actions_router.ArtifactsRoutes(prefix))
}

return r
Expand Down

0 comments on commit 3cc1f10

Please sign in to comment.