-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move http cache writing to cacheitem. #4919
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
name: Turborepo Compare Cache Item | ||
|
||
on: | ||
workflow_dispatch: | ||
inputs: | ||
version: | ||
description: Turborepo release to test. | ||
type: string | ||
default: "canary" | ||
|
||
jobs: | ||
generate_cache_artifact: | ||
strategy: | ||
matrix: | ||
os: [macos-latest, ubuntu-latest, windows-latest] | ||
runs-on: ${{ matrix.os }} | ||
|
||
steps: | ||
- name: Setup Node.js | ||
uses: actions/setup-node@v3 | ||
with: | ||
node-version: 18 | ||
|
||
- name: create-turbo | ||
run: | | ||
npm install -g pnpm turbo@${{ inputs.version }} | ||
pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm | ||
|
||
- name: Run build | ||
run: | | ||
cd my-turborepo | ||
turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv | ||
|
||
- name: Grab Turborepo artifacts | ||
uses: actions/upload-artifact@v3 | ||
with: | ||
name: cache-item-${{ matrix.os }}-${{ inputs.version }} | ||
path: | | ||
my-turborepo/node_modules/.cache/turbo | ||
my-turborepo/.turbo/runs | ||
retention-days: 1 | ||
|
||
use_cache_artifact: | ||
needs: generate_cache_artifact | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
os: [macos-latest, ubuntu-latest, windows-latest] | ||
cache_os: [macos-latest, ubuntu-latest, windows-latest] | ||
runs-on: ${{ matrix.os }} | ||
|
||
steps: | ||
- name: Setup Node.js | ||
uses: actions/setup-node@v3 | ||
with: | ||
node-version: 18 | ||
|
||
- name: create-turbo | ||
run: | | ||
npm install -g pnpm turbo@${{ inputs.version }} | ||
pnpm dlx create-turbo@${{ inputs.version }} my-turborepo pnpm | ||
|
||
- name: Download cache artifacts | ||
uses: actions/download-artifact@v3 | ||
with: | ||
name: cache-item-${{ matrix.cache_os }}-${{ inputs.version }} | ||
path: my-turborepo | ||
|
||
- name: Check for cache hit | ||
run: | | ||
cd my-turborepo | ||
rm .turbo/runs/*.json | ||
turbo run build --filter=docs --filter=web --summarize --skip-infer -vvv | ||
cat .turbo/runs/*.json | jq -e '.execution.cached == 2' | ||
|
||
- name: Check for functional server | ||
run: | | ||
curl https://raw.githubusercontent.com/vercel/turbo/main/scripts/server.js -O | ||
node server.js my-turborepo/apps/docs |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,24 +4,16 @@ | |
package cache | ||
|
||
import ( | ||
"archive/tar" | ||
"bytes" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
log "log" | ||
"net/http" | ||
"os" | ||
"path/filepath" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/DataDog/zstd" | ||
|
||
"github.com/vercel/turbo/cli/internal/analytics" | ||
"github.com/vercel/turbo/cli/internal/cacheitem" | ||
"github.com/vercel/turbo/cli/internal/tarpatch" | ||
"github.com/vercel/turbo/cli/internal/turbopath" | ||
) | ||
|
||
|
@@ -51,19 +43,15 @@ func (l limiter) release() { | |
<-l | ||
} | ||
|
||
// mtime is the time we attach for the modification time of all files. | ||
var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) | ||
|
||
// nobody is the usual uid / gid of the 'nobody' user. | ||
const nobody = 65534 | ||
|
||
func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { | ||
func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { | ||
// if cache.writable { | ||
cache.requestLimiter.acquire() | ||
defer cache.requestLimiter.release() | ||
|
||
r, w := io.Pipe() | ||
go cache.write(w, hash, files) | ||
|
||
cacheErrorChan := make(chan error, 1) | ||
go cache.write(w, anchor, files, cacheErrorChan) | ||
|
||
// Read the entire artifact tar into memory so we can easily compute the signature. | ||
// Note: retryablehttp.NewRequest reads the files into memory anyways so there's no | ||
|
@@ -79,69 +67,29 @@ func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duratio | |
return fmt.Errorf("failed to store files in HTTP cache: %w", err) | ||
} | ||
} | ||
|
||
cacheCreateError := <-cacheErrorChan | ||
if cacheCreateError != nil { | ||
return cacheCreateError | ||
} | ||
|
||
return cache.client.PutArtifact(hash, artifactBody, duration, tag) | ||
} | ||
|
||
// write writes a series of files into the given Writer. | ||
func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) { | ||
defer w.Close() | ||
defer func() { _ = w.Close() }() | ||
zw := zstd.NewWriter(w) | ||
defer func() { _ = zw.Close() }() | ||
tw := tar.NewWriter(zw) | ||
defer func() { _ = tw.Close() }() | ||
for _, file := range files { | ||
// log.Printf("caching file %v", file) | ||
if err := cache.storeFile(tw, file); err != nil { | ||
log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err) | ||
// TODO(jaredpalmer): How can we cancel the request at this point? | ||
} | ||
} | ||
} | ||
func (cache *httpCache) write(w io.WriteCloser, anchor turbopath.AbsoluteSystemPath, files []turbopath.AnchoredSystemPath, cacheErrorChan chan error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is now 1:1 with |
||
cacheItem := cacheitem.CreateWriter(w) | ||
|
||
func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error { | ||
absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot) | ||
info, err := absoluteFilePath.Lstat() | ||
if err != nil { | ||
return err | ||
} | ||
target := "" | ||
if info.Mode()&os.ModeSymlink != 0 { | ||
target, err = absoluteFilePath.Readlink() | ||
for _, file := range files { | ||
err := cacheItem.AddFile(anchor, file) | ||
if err != nil { | ||
return err | ||
_ = cacheItem.Close() | ||
cacheErrorChan <- err | ||
return | ||
} | ||
} | ||
hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target)) | ||
if err != nil { | ||
return err | ||
} | ||
// Ensure posix path for filename written in header. | ||
hdr.Name = repoRelativePath.ToUnixPath().ToString() | ||
// Zero out all timestamps. | ||
hdr.ModTime = mtime | ||
hdr.AccessTime = mtime | ||
hdr.ChangeTime = mtime | ||
// Strip user/group ids. | ||
hdr.Uid = nobody | ||
hdr.Gid = nobody | ||
hdr.Uname = "nobody" | ||
hdr.Gname = "nobody" | ||
if err := tw.WriteHeader(hdr); err != nil { | ||
return err | ||
} else if info.IsDir() || target != "" { | ||
return nil // nothing to write | ||
} | ||
f, err := absoluteFilePath.Open() | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { _ = f.Close() }() | ||
_, err = io.Copy(tw, f) | ||
if errors.Is(err, tar.ErrWriteTooLong) { | ||
log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular()) | ||
} | ||
return err | ||
|
||
cacheErrorChan <- cacheItem.Close() | ||
} | ||
|
||
func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,7 @@ type CacheItem struct { | |
tw *tar.Writer | ||
zw io.WriteCloser | ||
fileBuffer *bufio.Writer | ||
handle io.Reader | ||
handle interface{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Caches are #4634 made this partially generic, this finishes the work. |
||
compressed bool | ||
} | ||
|
||
|
@@ -72,7 +72,13 @@ func (ci *CacheItem) Close() error { | |
// GetSha returns the SHA-512 hash for the CacheItem. | ||
func (ci *CacheItem) GetSha() ([]byte, error) { | ||
sha := sha512.New() | ||
if _, err := io.Copy(sha, ci.handle); err != nil { | ||
|
||
reader, isReader := ci.handle.(io.Reader) | ||
if !isReader { | ||
panic("can't read from this cache item") | ||
} | ||
|
||
if _, err := io.Copy(sha, reader); err != nil { | ||
return nil, err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
#!/usr/bin/env node | ||
|
||
const { spawn } = require("child_process"); | ||
const { platform } = require("process"); | ||
|
||
const path = process.argv[2]; | ||
|
||
async function main() { | ||
let errored = false; | ||
|
||
await new Promise((resolve) => { | ||
const command = platform === "win32" ? "pnpm.cmd" : "pnpm"; | ||
const server = spawn(command, ["run", "start"], { cwd: path }); | ||
|
||
server.stdout.on("data", (data) => { | ||
console.log("stdout:"); | ||
console.log(`${data}`); | ||
|
||
// Stable for 5s. | ||
setTimeout(() => { | ||
server.kill(); | ||
}, 5000); | ||
}); | ||
|
||
server.stderr.on("data", (data) => { | ||
console.log("stderr:"); | ||
console.log(`${data}`); | ||
|
||
errored = true; | ||
server.kill(); | ||
}); | ||
|
||
server.on("exit", () => { | ||
console.log(`exit: ${+errored}`); | ||
resolve(); | ||
}); | ||
}); | ||
|
||
process.exit(errored); | ||
} | ||
|
||
main(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fixed by using the channel. Previously failing here didn't actually blow up the artifact.