From a8ae211908891ed01049f2c9f80ff8666f9efa3b Mon Sep 17 00:00:00 2001 From: Nathan Hammond Date: Fri, 12 May 2023 23:33:20 +0800 Subject: [PATCH 1/3] Move http cache writing to cacheitem. --- cli/internal/cache/cache_http.go | 90 ++++++----------------------- cli/internal/cacheitem/cacheitem.go | 10 +++- cli/internal/cacheitem/create.go | 11 ++++ cli/internal/cacheitem/restore.go | 9 ++- 4 files changed, 45 insertions(+), 75 deletions(-) diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go index f335a954949bc..e32c6b5782f47 100644 --- a/cli/internal/cache/cache_http.go +++ b/cli/internal/cache/cache_http.go @@ -4,24 +4,16 @@ package cache import ( - "archive/tar" "bytes" "errors" "fmt" "io" "io/ioutil" - log "log" "net/http" - "os" - "path/filepath" "strconv" - "time" - - "github.com/DataDog/zstd" "github.com/vercel/turbo/cli/internal/analytics" "github.com/vercel/turbo/cli/internal/cacheitem" - "github.com/vercel/turbo/cli/internal/tarpatch" "github.com/vercel/turbo/cli/internal/turbopath" ) @@ -51,19 +43,15 @@ func (l limiter) release() { <-l } -// mtime is the time we attach for the modification time of all files. -var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - -// nobody is the usual uid / gid of the 'nobody' user. -const nobody = 65534 - -func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { +func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { // if cache.writable { cache.requestLimiter.acquire() defer cache.requestLimiter.release() r, w := io.Pipe() - go cache.write(w, hash, files) + + cacheErrorChan := make(chan error, 1) + go cache.write(w, anchor, files, cacheErrorChan) // Read the entire artifact tar into memory so we can easily compute the signature. // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no @@ -79,69 +67,29 @@ func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duratio return fmt.Errorf("failed to store files in HTTP cache: %w", err) } } + + cacheCreateError := <-cacheErrorChan + if cacheCreateError != nil { + return cacheCreateError + } + return cache.client.PutArtifact(hash, artifactBody, duration, tag) } // write writes a series of files into the given Writer. -func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) { - defer w.Close() - defer func() { _ = w.Close() }() - zw := zstd.NewWriter(w) - defer func() { _ = zw.Close() }() - tw := tar.NewWriter(zw) - defer func() { _ = tw.Close() }() - for _, file := range files { - // log.Printf("caching file %v", file) - if err := cache.storeFile(tw, file); err != nil { - log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err) - // TODO(jaredpalmer): How can we cancel the request at this point? - } - } -} +func (cache *httpCache) write(w io.WriteCloser, anchor turbopath.AbsoluteSystemPath, files []turbopath.AnchoredSystemPath, cacheErrorChan chan error) { + cacheItem := cacheitem.CreateWriter(w) -func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error { - absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot) - info, err := absoluteFilePath.Lstat() - if err != nil { - return err - } - target := "" - if info.Mode()&os.ModeSymlink != 0 { - target, err = absoluteFilePath.Readlink() + for _, file := range files { + err := cacheItem.AddFile(anchor, file) if err != nil { - return err + _ = cacheItem.Close() + cacheErrorChan <- err + return } } - hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target)) - if err != nil { - return err - } - // Ensure posix path for filename written in header. - hdr.Name = repoRelativePath.ToUnixPath().ToString() - // Zero out all timestamps. - hdr.ModTime = mtime - hdr.AccessTime = mtime - hdr.ChangeTime = mtime - // Strip user/group ids. - hdr.Uid = nobody - hdr.Gid = nobody - hdr.Uname = "nobody" - hdr.Gname = "nobody" - if err := tw.WriteHeader(hdr); err != nil { - return err - } else if info.IsDir() || target != "" { - return nil // nothing to write - } - f, err := absoluteFilePath.Open() - if err != nil { - return err - } - defer func() { _ = f.Close() }() - _, err = io.Copy(tw, f) - if errors.Is(err, tar.ErrWriteTooLong) { - log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular()) - } - return err + + cacheErrorChan <- cacheItem.Close() } func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { diff --git a/cli/internal/cacheitem/cacheitem.go b/cli/internal/cacheitem/cacheitem.go index 62d8964208456..839171ee904a7 100644 --- a/cli/internal/cacheitem/cacheitem.go +++ b/cli/internal/cacheitem/cacheitem.go @@ -31,7 +31,7 @@ type CacheItem struct { tw *tar.Writer zw io.WriteCloser fileBuffer *bufio.Writer - handle io.Reader + handle interface{} compressed bool } @@ -72,7 +72,13 @@ func (ci *CacheItem) Close() error { // GetSha returns the SHA-512 hash for the CacheItem. func (ci *CacheItem) GetSha() ([]byte, error) { sha := sha512.New() - if _, err := io.Copy(sha, ci.handle); err != nil { + + reader, isReader := ci.handle.(io.Reader) + if !isReader { + panic("can't read from this cache item") + } + + if _, err := io.Copy(sha, reader); err != nil { return nil, err } diff --git a/cli/internal/cacheitem/create.go b/cli/internal/cacheitem/create.go index 452f63ae6be60..a4ba158e05a1c 100644 --- a/cli/internal/cacheitem/create.go +++ b/cli/internal/cacheitem/create.go @@ -32,6 +32,17 @@ func Create(path turbopath.AbsoluteSystemPath) (*CacheItem, error) { return cacheItem, nil } +// CreateWriter makes a new CacheItem using the specified writer. +func CreateWriter(writer io.WriteCloser) *CacheItem { + cacheItem := &CacheItem{ + handle: writer, + compressed: true, + } + + cacheItem.init() + return cacheItem +} + // init prepares the CacheItem for writing. // Wires all the writers end-to-end: // tar.Writer -> zstd.Writer -> fileBuffer -> file diff --git a/cli/internal/cacheitem/restore.go b/cli/internal/cacheitem/restore.go index a2d8c31c2b640..be60000f778af 100644 --- a/cli/internal/cacheitem/restore.go +++ b/cli/internal/cacheitem/restore.go @@ -41,9 +41,14 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A var tr *tar.Reader var closeError error + reader, isReader := ci.handle.(io.Reader) + if !isReader { + panic("can't read from this cache item") + } + // We're reading a tar, possibly wrapped in zstd. if ci.compressed { - zr := zstd.NewReader(ci.handle) + zr := zstd.NewReader(reader) // The `Close` function for compression effectively just returns the singular // error field on the decompressor instance. This is extremely unlikely to be @@ -52,7 +57,7 @@ func (ci *CacheItem) Restore(anchor turbopath.AbsoluteSystemPath) ([]turbopath.A defer func() { closeError = zr.Close() }() tr = tar.NewReader(zr) } else { - tr = tar.NewReader(ci.handle) + tr = tar.NewReader(reader) } // On first attempt to restore it's possible that a link target doesn't exist. From 97a2965455a3573679b4ea4868a9f125f10175f5 Mon Sep 17 00:00:00 2001 From: Nathan Hammond Date: Mon, 15 May 2023 14:03:11 +0800 Subject: [PATCH 2/3] CI cacheitem consistency check. --- .../turborepo-compare-cache-item.yml | 79 +++++++++++++++++++ scripts/server.js | 42 ++++++++++ 2 files changed, 121 insertions(+) create mode 100644 .github/workflows/turborepo-compare-cache-item.yml create mode 100755 scripts/server.js diff --git a/.github/workflows/turborepo-compare-cache-item.yml b/.github/workflows/turborepo-compare-cache-item.yml new file mode 100644 index 0000000000000..becc939b2cea8 --- /dev/null +++ b/.github/workflows/turborepo-compare-cache-item.yml @@ -0,0 +1,79 @@ +name: Turborepo Compare Cache Item + +on: + workflow_dispatch: + inputs: + version: + description: Turborepo release to test. + type: string + default: "canary" + +jobs: + generate_cache_artifact: + strategy: + matrix: + os: [macos-latest, ubuntu-latest, windows-latest] + runs-on: ${{ matrix.os }} + + steps: + - name: Setup Node.js + uses: actions/setup-node@v3 + with: + node-version: 18 + + - name: create-turbo + run: | + npm install -g pnpm turbo@${{ inputs.version }} + pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm + + - name: Run build + run: | + cd my-turborepo + turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv + + - name: Grab Turborepo artifacts + uses: actions/upload-artifact@v3 + with: + name: cache-item-${{ matrix.os }}-${{ inputs.version }} + path: | + my-turborepo/node_modules/.cache/turbo + my-turborepo/.turbo/runs + retention-days: 1 + + use_cache_artifact: + needs: generate_cache_artifact + strategy: + fail-fast: false + matrix: + os: [macos-latest, ubuntu-latest, windows-latest] + cache_os: [macos-latest, ubuntu-latest, windows-latest] + runs-on: ${{ matrix.os }} + + steps: + - name: Setup Node.js + uses: actions/setup-node@v3 + with: + node-version: 18 + + - name: create-turbo + run: | + npm install -g pnpm turbo@${{ inputs.version }} + pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm + + - name: Download cache artifacts + uses: actions/download-artifact@v3 + with: + name: cache-item-${{ matrix.cache_os }}-${{ inputs.version }} + path: my-turborepo + + - name: Check for cache hit + run: | + cd my-turborepo + rm .turbo/runs/*.json + turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv + cat .turbo/runs/*.json | jq -e '.execution.cached == 2' + + - name: Check for functional server + run: | + curl https://raw.githubusercontent.com/vercel/turbo/main/scripts/server.js -O + node server.js my-turborepo/apps/docs diff --git a/scripts/server.js b/scripts/server.js new file mode 100755 index 0000000000000..71ccbf644f245 --- /dev/null +++ b/scripts/server.js @@ -0,0 +1,42 @@ +#!/usr/bin/env node + +const { spawn } = require("child_process"); +const { platform } = require("process"); + +const path = process.argv[2]; + +async function main() { + let errored = false; + + await new Promise((resolve) => { + const command = platform === "win32" ? "pnpm.cmd" : "pnpm"; + const server = spawn(command, ["run", "start"], { cwd: path }); + + server.stdout.on("data", (data) => { + console.log("stdout:"); + console.log(`${data}`); + + // Stable for 5s. + setTimeout(() => { + server.kill(); + }, 5000); + }); + + server.stderr.on("data", (data) => { + console.log("stderr:"); + console.log(`${data}`); + + errored = true; + server.kill(); + }); + + server.on("exit", () => { + console.log(`exit: ${+errored}`); + resolve(); + }); + }); + + process.exit(errored); +} + +main(); From d90427e241c927ad96f081fc3616ed7ff4af2900 Mon Sep 17 00:00:00 2001 From: Nathan Hammond Date: Wed, 17 May 2023 20:27:56 +0800 Subject: [PATCH 3/3] Add unit test --- cli/internal/cache/cache_http_test.go | 49 ++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/cli/internal/cache/cache_http_test.go b/cli/internal/cache/cache_http_test.go index 7883e358b7ad3..f3a3f23faa00e 100644 --- a/cli/internal/cache/cache_http_test.go +++ b/cli/internal/cache/cache_http_test.go @@ -5,10 +5,11 @@ import ( "bytes" "errors" "net/http" + "os" "testing" "github.com/DataDog/zstd" - + "github.com/vercel/turbo/cli/internal/cacheitem" "github.com/vercel/turbo/cli/internal/fs" "github.com/vercel/turbo/cli/internal/turbopath" "github.com/vercel/turbo/cli/internal/util" @@ -17,9 +18,21 @@ import ( type errorResp struct { err error + t *testing.T } func (sr *errorResp) PutArtifact(hash string, body []byte, duration int, tag string) error { + sr.t.Helper() + outdir := turbopath.AbsoluteSystemPathFromUpstream(sr.t.TempDir()) + cache := cacheitem.FromReader(bytes.NewReader(body), true) + restored, err := cache.Restore(outdir) + + sr.t.Log(restored) + assert.Equal(sr.t, restored[0].ToString(), "one") + assert.Equal(sr.t, restored[1].ToString(), "two") + assert.Equal(sr.t, len(restored), 2) + assert.NilError(sr.t, err, "Restoration was successful.") + return sr.err } @@ -240,6 +253,34 @@ func TestRestoreInvalidTar(t *testing.T) { assert.Equal(t, string(contents), string(expectedContents), "expected to not overwrite file") } -// Note that testing Put will require mocking the filesystem and is not currently the most -// interesting test. The current implementation directly returns the error from PutArtifact. -// We should still add the test once feasible to avoid future breakage. +func Test_httpCache_Put(t *testing.T) { + root := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + _ = root.Join("one").WriteFile(nil, 0644) + _ = root.Join("two").WriteFile(nil, 0644) + + clientErr := errors.New("PutArtifact") + client := &errorResp{err: clientErr, t: t} + + cache := newHTTPCache(Opts{}, client, nil, root) + + assert.ErrorIs( + t, + cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"one", "two"}), + clientErr, + "Succeeds at writing, cache item is successfully passed through.", + ) + + assert.ErrorIs( + t, + cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"one", "two", "missing"}), + os.ErrNotExist, + "Errors with missing file.", + ) + + assert.ErrorIs( + t, + cache.Put(root, "000", 10, []turbopath.AnchoredSystemPath{"missing", "one", "two"}), + os.ErrNotExist, + "Errors with missing file at first load.", + ) +}