Skip to content

Commit

Permalink
Merge pull request #392 from berty/dev/moul/advanced-caching
Browse files Browse the repository at this point in the history
feat: advanced artifact caching
  • Loading branch information
moul authored Oct 2, 2020
2 parents 587f43f + 77c3539 commit da457cd
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 136 deletions.
3 changes: 2 additions & 1 deletion go/pkg/yolosvc/api_artifactgetfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-chi/chi"
"google.golang.org/grpc/codes"
"moul.io/pkgman/pkg/ipa"
"moul.io/u"
)

func (svc *service) ArtifactGetFile(w http.ResponseWriter, r *http.Request) {
Expand All @@ -29,7 +30,7 @@ func (svc *service) ArtifactGetFile(w http.ResponseWriter, r *http.Request) {
}

artifactPath := filepath.Join(svc.artifactsCachePath, artifact.ID)
if !fileExists(artifactPath) {
if !u.FileExists(artifactPath) {
httpError(w, err, codes.NotFound)
return
}
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/yolosvc/api_artifacticon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (

"github.com/go-chi/chi"
"google.golang.org/grpc/codes"
"moul.io/u"
)

func (svc *service) ArtifactIcon(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
name = strings.ReplaceAll(name, "/", string(os.PathSeparator))

p := filepath.Join(svc.artifactsCachePath, "icons", name)
if !fileExists(p) {
if !u.FileExists(p) {
httpError(w, fmt.Errorf("no such icon"), codes.InvalidArgument)
return
}
Expand Down
227 changes: 129 additions & 98 deletions go/pkg/yolosvc/api_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"berty.tech/yolo/v2/go/pkg/bintray"
Expand All @@ -26,15 +27,28 @@ import (
)

func (svc *service) ArtifactDownloader(w http.ResponseWriter, r *http.Request) {
// FIXME: if caching enabled, lock by artifact ID

id := chi.URLParam(r, "artifactID")
var artifact yolopb.Artifact
err := svc.db.First(&artifact, "ID = ?", id).Error
if err != nil {
if err := svc.db.First(&artifact, "ID = ?", id).Error; err != nil {
httpError(w, err, codes.InvalidArgument)
return
}
svc.logger.Debug("artifact downloader", zap.Any("artifact", artifact))

// save download
{
now := time.Now()
download := yolopb.Download{
HasArtifactID: artifact.ID,
CreatedAt: &now,
// FIXME: user agent for analytics?
}
err := svc.db.Create(&download).Error
if err != nil {
svc.logger.Warn("failed to add download log entry", zap.Error(err))
}
}

switch ext := filepath.Ext(artifact.LocalPath); ext {
case ".unsigned-ipa", ".dummy-signed-ipa":
Expand All @@ -51,39 +65,125 @@ func (svc *service) ArtifactDownloader(w http.ResponseWriter, r *http.Request) {
return
}

// send some headers early, to make loading icon appearing soon on the iOS device
{
filename := strings.TrimSuffix(path.Base(artifact.LocalPath), ext) + ".ipa"
w.Header().Add("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
if artifact.MimeType != "" {
w.Header().Add("Content-Type", artifact.MimeType)
}
}

// FIXME: cache file + send content-length

err := svc.signAndStreamIPA(artifact, w)
var (
cacheKey = artifact.ID + ".signed"
filename = strings.TrimSuffix(path.Base(artifact.LocalPath), ext) + ".ipa"
mimetype = artifact.MimeType
filesize = int64(0) // will be automatically computed if using cache
)
err := svc.sendFileMayCache(filename, cacheKey, mimetype, filesize, w, func(w io.Writer) error {
return svc.signAndStreamIPA(artifact, w)
})
if err != nil {
httpError(w, err, codes.Internal)
}
default:
base := path.Base(artifact.LocalPath)
w.Header().Add("Content-Disposition", fmt.Sprintf("attachment; filename=%s", base))
if artifact.FileSize > 0 {
w.Header().Add("Content-Length", fmt.Sprintf("%d", artifact.FileSize))
var (
cacheKey = artifact.ID
filename = path.Base(artifact.LocalPath)
mimetype = artifact.MimeType
filesize = artifact.FileSize
)
err := svc.sendFileMayCache(filename, cacheKey, mimetype, filesize, w, func(w io.Writer) error {
return svc.artifactDownloadFromProvider(&artifact, w)
})
if err != nil {
httpError(w, err, codes.Internal)
}
}
}

func (svc *service) streamMayCache(cacheKey string, w io.Writer, fn func(io.Writer) error) error {
svc.logger.Debug("stream may cache", zap.String("cachekey", cacheKey))
// if cache is disabled, just stream the file
if svc.artifactsCachePath == "" {
return fn(w)
}

// check if cache already exists or create it
cache := filepath.Join(svc.artifactsCachePath, cacheKey)
{
svc.artifactsCacheMutex.Lock()
if _, found := svc.artifactsCacheMapMutex[cacheKey]; !found {
svc.artifactsCacheMapMutex[cacheKey] = &sync.Mutex{}
}
cacheKeyMutex := svc.artifactsCacheMapMutex[cacheKey]
cacheKeyMutex.Lock()
svc.artifactsCacheMutex.Unlock()

if !u.FileExists(cache) {
out, err := os.Create(cache + ".tmp")
if err != nil {
cacheKeyMutex.Unlock()
return err
}

// FIXME: tee to send directly to the writer

if err := fn(out); err != nil {
out.Close()
os.Remove(cache + ".tmp")
cacheKeyMutex.Unlock()
return err
}
out.Close()

if err := os.Rename(cache+".tmp", cache); err != nil {
os.Remove(cache + ".tmp")
cacheKeyMutex.Unlock()
return err
}
}
if artifact.MimeType != "" {
w.Header().Add("Content-Type", artifact.MimeType)
cacheKeyMutex.Unlock()
}

// send cache + more metadata
{
f, err := os.Open(cache)
if err != nil {
return err
}
defer f.Close()

err := svc.artifactToStream(artifact, w)
_, err = io.Copy(w, f)
if err != nil {
httpError(w, err, codes.Internal)
return err
}
}

return nil
}

func (svc *service) sendFileMayCache(filename, cacheKey, mimetype string, filesize int64, w http.ResponseWriter, fn func(io.Writer) error) error {
svc.logger.Debug("send file may cache", zap.String("cachekey", cacheKey), zap.String("filename", filename), zap.String("mimetype", mimetype), zap.Int64("filesize", filesize))
w.Header().Add("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filename))
if filesize > 0 {
w.Header().Add("Content-Length", fmt.Sprintf("%d", filesize))
}
if mimetype != "" {
w.Header().Add("Content-Type", mimetype)
}
// FIXME: cache-control and expires

// if cache is disabled, just stream fn to the writer
if svc.artifactsCachePath == "" {
return fn(w)
}

// if filesize wasn't set, we compute the size based on cache size
if filesize == 0 {
// check if cache already exists to send filesize
cache := filepath.Join(svc.artifactsCachePath, cacheKey)
if stat, err := os.Stat(cache); err == nil {
w.Header().Add("Content-Length", fmt.Sprintf("%d", stat.Size()))
}
}

return svc.streamMayCache(cacheKey, w, fn)
}

func (svc *service) signAndStreamIPA(artifact yolopb.Artifact, w io.Writer) error {
svc.logger.Debug("sign and stream IPA", zap.Any("artifact", artifact))
// sign ipa
var signed string
{
Expand All @@ -100,7 +200,10 @@ func (svc *service) signAndStreamIPA(artifact yolopb.Artifact, w io.Writer) erro
if err != nil {
return err
}
err = svc.artifactToStream(artifact, f)

err = svc.streamMayCache(artifact.ID, f, func(w io.Writer) error {
return svc.artifactDownloadFromProvider(&artifact, w)
})
if err != nil {
return err
}
Expand Down Expand Up @@ -148,81 +251,9 @@ func (svc *service) signAndStreamIPA(artifact yolopb.Artifact, w io.Writer) erro
return nil
}

func (svc *service) artifactToStream(artifact yolopb.Artifact, w io.Writer) error {
cache := filepath.Join(svc.artifactsCachePath, artifact.ID)
// download missing cache
if svc.artifactsCachePath != "" {
svc.artifactsCacheMutex.Lock()
if !fileExists(cache) {
err := svc.artifactDownloadToFile(&artifact, cache)
if err != nil {
svc.artifactsCacheMutex.Unlock()
return err
}
}
svc.artifactsCacheMutex.Unlock()
}

// save download
now := time.Now()
download := yolopb.Download{
HasArtifactID: artifact.ID,
CreatedAt: &now,
// FIXME: user agent for analytics?
}
err := svc.db.Create(&download).Error
if err != nil {
svc.logger.Warn("failed to add download log entry", zap.Error(err))
}

if svc.artifactsCachePath != "" {
// send cache
f, err := os.Open(cache)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(w, f)
if err != nil {
return err
}
} else {
// proxy
ctx := context.Background()
err = svc.artifactDownloadFromProvider(ctx, &artifact, w)
if err != nil {
return err
}
}
return nil
}

func (svc *service) artifactDownloadToFile(artifact *yolopb.Artifact, dest string) error {
out, err := os.Create(dest + ".tmp")
if err != nil {
return err
}

func (svc *service) artifactDownloadFromProvider(artifact *yolopb.Artifact, w io.Writer) error {
svc.logger.Debug("download from provider", zap.Any("artifact", artifact))
ctx := context.Background()
err = svc.artifactDownloadFromProvider(ctx, artifact, out)
if err != nil {
out.Close()
return err
}

out.Close()

err = os.Rename(dest+".tmp", dest)
if err != nil {
return err
}

// FIXME: parse file and update the db with new metadata

return nil
}

func (svc *service) artifactDownloadFromProvider(ctx context.Context, artifact *yolopb.Artifact, w io.Writer) error {
switch artifact.Driver {
case yolopb.Driver_Buildkite:
if svc.bkc == nil {
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/yolosvc/driver_pkgman.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/zap"
"moul.io/pkgman/pkg/apk"
"moul.io/pkgman/pkg/ipa"
"moul.io/u"
)

type PkgmanWorkerOpts struct {
Expand All @@ -36,7 +37,7 @@ func (svc *service) PkgmanWorker(ctx context.Context, opts PkgmanWorkerOpts) err

for _, artifact := range artifacts {
cache := filepath.Join(svc.artifactsCachePath, artifact.ID)
if !fileExists(cache) {
if !u.FileExists(cache) {
continue
}
err = svc.pkgmanParseArtifactFile(artifact, cache)
Expand Down
Loading

0 comments on commit da457cd

Please sign in to comment.