From 061cfa2237654614be304584b50171cf61a6daff Mon Sep 17 00:00:00 2001
From: Mostyn Bramley-Moore
Date: Fri, 14 Feb 2020 18:52:45 +0100
Subject: [PATCH] disk cache: store a data integrity header for non-CAS blobs

The header is made up of three fields:
1) Little-endian int32 (4 bytes) representing the REAPIv2 DigestFunction.
2) Little-endian int64 (8 bytes) representing the number of bytes in the blob.
3) The hash bytes from the digest, length determined by the particular
   DigestFunction (32 for SHA256, 20 for SHA1, 16 for MD5).
Note that we currently only support SHA256.

This header is simple to parse, and does not require buffering the entire
blob in memory if you just want the data (a short standalone parsing sketch
is appended after the diff).

To distinguish blobs with and without this header, we use new directories
for the affected blobs: ac.v2/ instead of ac/, and similarly for raw/.

We do not use this header to actually verify data yet, and we still call
os.File.Sync() after file writes (#67).

This also includes a slightly refactored version of PR #123 (load the
items from disk concurrently) by @bdittmer.
---
 cache/disk/BUILD.bazel  |   2 +
 cache/disk/disk.go      | 365 ++++++++++++++++++++++++++++------------
 cache/disk/disk_test.go | 246 ++++++++++++++++++++++-----
 cache/disk/load.go      | 364 +++++++++++++++++++++++++++++++++++++++
 server/http_test.go     |  18 +-
 5 files changed, 845 insertions(+), 150 deletions(-)
 create mode 100644 cache/disk/load.go

diff --git a/cache/disk/BUILD.bazel b/cache/disk/BUILD.bazel
index 36a81ffde..8e49da4e9 100644
--- a/cache/disk/BUILD.bazel
+++ b/cache/disk/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
     name = "go_default_library",
     srcs = [
         "disk.go",
+        "load.go",
        "lru.go",
     ],
     importpath = "github.com/buchgr/bazel-remote/cache/disk",
@@ -29,5 +30,6 @@ go_test(
         "//cache:go_default_library",
         "//cache/http:go_default_library",
         "//utils:go_default_library",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
     ],
 )
diff --git a/cache/disk/disk.go b/cache/disk/disk.go
index 40b49d380..0973d8d06 100644
--- a/cache/disk/disk.go
+++ b/cache/disk/disk.go
@@ -1,20 +1,23 @@
 package disk
 
 import (
+	"crypto/md5"
+	"crypto/sha1"
 	"crypto/sha256"
+	"encoding/binary"
 	"encoding/hex"
+	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
 	"path/filepath"
-	"sort"
 	"sync"
 
 	"github.com/buchgr/bazel-remote/cache"
-	"github.com/djherbis/atime"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -36,7 +39,7 @@ var (
 // lruItem is the type of the values stored in SizedLRU to keep track of items.
 // It implements the SizedItem interface.
 type lruItem struct {
-	size      int64
+	size      int64 // Blob + header size.
 	committed bool
 }
 
@@ -53,9 +56,16 @@ type DiskCache struct {
 	lru   SizedLRU
 }
 
-type nameAndInfo struct {
-	name string // relative path
+type importItem struct {
+	name string // Absolute path where this item should end up.
 	info os.FileInfo
+
+	// If non-empty, the absolute path where this item currently is.
+	oldName string
+
+	// True if the file needs to have the checksum header
+	// added while migrating.
+	addChecksum bool
 }
 
 const sha256HashStrSize = sha256.Size * 2 // Two hex characters per byte.
@@ -69,17 +79,17 @@ func New(dir string, maxSizeBytes int64, proxy cache.CacheProxy) *DiskCache { for _, c1 := range hexLetters { for _, c2 := range hexLetters { subDir := string(c1) + string(c2) - err := os.MkdirAll(filepath.Join(dir, cache.CAS.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) - } - err = os.MkdirAll(filepath.Join(dir, cache.AC.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) - } - err = os.MkdirAll(filepath.Join(dir, cache.RAW.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) + + casDir := filepath.Join(dir, cache.CAS.String(), subDir) + + acDir := filepath.Join(dir, cache.AC.String()+".v2", subDir) + rawDir := filepath.Join(dir, cache.RAW.String()+".v2", subDir) + + for _, dir := range []string{casDir, acDir, rawDir} { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + log.Fatal(err) + } } } } @@ -151,80 +161,14 @@ func New(dir string, maxSizeBytes int64, proxy cache.CacheProxy) *DiskCache { lru: NewSizedLRU(maxSizeBytes, onEvict), } - err := c.migrateDirectories() - if err != nil { - log.Fatalf("Attempting to migrate the old directory structure to the new structure failed "+ - "with error: %v", err) - } - err = c.loadExistingFiles() + files, migrate, err := c.findCacheItems() if err != nil { - log.Fatalf("Loading of existing cache entries failed due to error: %v", err) + log.Fatalf("Error finding existing cache items: %v", err) } - return c -} - -func (c *DiskCache) migrateDirectories() error { - err := migrateDirectory(filepath.Join(c.dir, cache.AC.String())) - if err != nil { - return err + if migrate { + err = c.migrateFiles(files) } - err = migrateDirectory(filepath.Join(c.dir, cache.CAS.String())) - if err != nil { - return err - } - // Note: there are no old "RAW" directories (yet). - return nil -} - -func migrateDirectory(dir string) error { - log.Printf("Migrating files (if any) to new directory structure: %s\n", dir) - return filepath.Walk(dir, func(name string, info os.FileInfo, err error) error { - if err != nil { - log.Println("Error while walking directory:", err) - return err - } - - if info.IsDir() { - if name == dir { - return nil - } - return filepath.SkipDir - } - hash := filepath.Base(name) - newName := filepath.Join(filepath.Dir(name), hash[:2], hash) - return os.Rename(name, newName) - }) -} - -// loadExistingFiles lists all files in the cache directory, and adds them to the -// LRU index so that they can be served. Files are sorted by access time first, -// so that the eviction behavior is preserved across server restarts. 
-func (c *DiskCache) loadExistingFiles() error { - log.Printf("Loading existing files in %s.\n", c.dir) - - // Walk the directory tree - var files []nameAndInfo - err := filepath.Walk(c.dir, func(name string, info os.FileInfo, err error) error { - if err != nil { - log.Println("Error while walking directory:", err) - return err - } - - if !info.IsDir() { - files = append(files, nameAndInfo{name: name, info: info}) - } - return nil - }) - if err != nil { - return err - } - - log.Println("Sorting cache files by atime.") - // Sort in increasing order of atime - sort.Slice(files, func(i int, j int) bool { - return atime.Get(files[i].info).Before(atime.Get(files[j].info)) - }) log.Println("Building LRU index.") for _, f := range files { @@ -236,13 +180,14 @@ func (c *DiskCache) loadExistingFiles() error { if !ok { err = os.Remove(filepath.Join(c.dir, relPath)) if err != nil { - return err + log.Fatal(err) } } } log.Println("Finished loading disk cache files.") - return nil + + return c } // Put stores a stream of `expectedSize` bytes from `r` into the cache. @@ -275,7 +220,7 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r // Try to add the item to the LRU. newItem := &lruItem{ - size: expectedSize, + size: expectedSize + headerSize[pb.DigestFunction_SHA256], committed: false, } ok := c.lru.Add(key, newItem) @@ -304,9 +249,18 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r if shouldCommit && c.proxy != nil { // TODO: buffer in memory, avoid a filesystem round-trip? fr, err := os.Open(filePath) - if err == nil { - c.proxy.Put(kind, hash, expectedSize, fr) + if err != nil { + return + } + + if kind != cache.CAS { + _, err = skipHeader(fr) + if err != nil { + return + } } + + c.proxy.Put(kind, hash, expectedSize, fr) } }() @@ -336,7 +290,12 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r return fmt.Errorf( "hashsums don't match. Expected %s, found %s", key, actualHash) } - } else { + } else { // kind != cache.CAS + err = writeHeader(f, hash, expectedSize) + if err != nil { + return err + } + if bytesCopied, err = io.Copy(f, r); err != nil { return err } @@ -366,6 +325,194 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r return err } +func digestType(hash string) pb.DigestFunction_Value { + switch len(hash) { + case sha256.Size * 2: + return pb.DigestFunction_SHA256 + case sha1.Size * 2: + return pb.DigestFunction_SHA1 + case md5.Size * 2: + return pb.DigestFunction_MD5 + default: + return pb.DigestFunction_UNKNOWN + } +} + +func getHasher(dt pb.DigestFunction_Value) hash.Hash { + switch dt { + case pb.DigestFunction_SHA256: + return sha256.New() + case pb.DigestFunction_SHA1: + return sha1.New() + case pb.DigestFunction_MD5: + return md5.New() + default: + return nil + } +} + +// Write the checksum header to `w`, in a well-defined way. +// +// * Little-endian int32 (4 bytes), which represents the DigestFunction enum value. +// * Little-endian int64 (8 bytes), which represents the data size of the blob. +// * The hash bytes from the digest (length determined by the digest). 
+func writeHeader(w io.Writer, hash string, expectedSize int64) error {
+	if expectedSize < 0 {
+		return errors.New("Size must be non-negative")
+	}
+
+	dt := digestType(hash)
+	if dt == pb.DigestFunction_UNKNOWN {
+		return fmt.Errorf("unsupported hash format: %s", hash)
+	}
+
+	hashBytes, err := hex.DecodeString(hash)
+	if err != nil {
+		return err
+	}
+
+	err = binary.Write(w, binary.LittleEndian, dt)
+	if err != nil {
+		return err
+	}
+
+	err = binary.Write(w, binary.LittleEndian, expectedSize)
+	if err != nil {
+		return err
+	}
+
+	n, err := w.Write(hashBytes)
+	if err != nil {
+		return err
+	}
+	if n != len(hashBytes) {
+		return fmt.Errorf("expected to write %d bytes for hash, wrote: %d",
+			len(hashBytes), n)
+	}
+
+	return nil
+}
+
+// The size of the smallest possible header: hash type (int32) + blob
+// size (int64) + the shortest supported hash (MD5).
+const minHashSize = md5.Size + commonHeaderSize
+
+// Hash type (int32) + blob size (int64).
+const commonHeaderSize = 4 + 8
+
+var headerSize = map[pb.DigestFunction_Value]int64{
+	pb.DigestFunction_MD5:    md5.Size + commonHeaderSize,
+	pb.DigestFunction_SHA1:   sha1.Size + commonHeaderSize,
+	pb.DigestFunction_SHA256: sha256.Size + commonHeaderSize,
+}
+
+var hashSize = map[pb.DigestFunction_Value]int{
+	pb.DigestFunction_MD5:    md5.Size,
+	pb.DigestFunction_SHA1:   sha1.Size,
+	pb.DigestFunction_SHA256: sha256.Size,
+}
+
+// Skip over the checksum header in the file, and return an error
+// if an error occurred or if the file was too small.
+func skipHeader(f *os.File) (int64, error) {
+
+	var dt pb.DigestFunction_Value
+	err := binary.Read(f, binary.LittleEndian, &dt)
+	if err != nil {
+		return -1, err
+	}
+
+	pos, ok := headerSize[dt]
+	if !ok {
+		return -1, fmt.Errorf("Unhandled digest function: %d", dt)
+	}
+
+	fi, err := f.Stat()
+	if err != nil {
+		return -1, err
+	}
+	if fi.Size() < pos {
+		return -1, fmt.Errorf("file too short to contain a valid header: %d",
+			fi.Size())
+	}
+
+	_, err = f.Seek(pos, 0)
+
+	return pos, err
+}
+
+// Return the header hash type, hash value, blob size and an optional error.
+// If the error is nil, then the data blob should be readable from r, and the
+// read data should be compared with the hash and size from the header.
+func readHeader(r io.Reader) (pb.DigestFunction_Value, string, int64, error) { + + var hashType pb.DigestFunction_Value + err := binary.Read(r, binary.LittleEndian, &hashType) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + var expectedSize int64 + err = binary.Read(r, binary.LittleEndian, &expectedSize) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + hSize, ok := hashSize[hashType] + if !ok { + return pb.DigestFunction_UNKNOWN, "", -1, + fmt.Errorf("Unsupported hash type: %d", hashType) + } + + hashBytes := make([]byte, hSize) + n, err := r.Read(hashBytes) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + if n != hSize { + return pb.DigestFunction_UNKNOWN, "", -1, + fmt.Errorf("Failed to read all %d hash bytes", hSize) + } + + hashStr := hex.EncodeToString(hashBytes) + + return hashType, hashStr, expectedSize, nil +} + +func verifyBlob(r io.Reader) error { + dt, expectedHash, size, err := readHeader(r) + if err != nil { + return err + } + + if dt == pb.DigestFunction_UNKNOWN { + return errors.New("unrecognized hash") + } + + hasher := getHasher(dt) + if hasher == nil { + return fmt.Errorf("unhandled hash type: %s", dt.String()) + } + + n, err := io.Copy(hasher, r) + if err != nil { + return err + } + + if n != size { + return fmt.Errorf("expected size %d, found %d", size, n) + } + + hashBytes := hasher.Sum(nil) + foundHash := hex.EncodeToString(hashBytes) + + if foundHash != expectedHash { + return fmt.Errorf("expected hash %s, found %s", + expectedHash, foundHash) + } + + return nil // Success. +} + // Return two bools, `available` is true if the item is in the local // cache and ready to use. // @@ -422,10 +569,20 @@ func (c *DiskCache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64 blobPath := cacheFilePath(kind, c.dir, hash) fileInfo, err := os.Stat(blobPath) if err == nil { - r, err := os.Open(blobPath) + f, err := os.Open(blobPath) if err == nil { + + sizeToReturn := fileInfo.Size() + if kind != cache.CAS { + dataHeaderSize, err := skipHeader(f) + if err != nil { + return nil, -1, err + } + sizeToReturn -= dataHeaderSize + } + cacheHits.Inc() - return r, fileInfo.Size(), nil + return f, sizeToReturn, nil } } @@ -488,6 +645,13 @@ func (c *DiskCache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64 } tmpFileCreated = true + if kind != cache.CAS { + err = writeHeader(f, hash, foundSize) + if err != nil { + return nil, -1, err + } + } + written, err := io.Copy(f, r) if err != nil { return nil, -1, err @@ -573,17 +737,12 @@ func (c *DiskCache) Stats() (currentSize int64, numItems int) { return c.lru.CurrentSize(), c.lru.Len() } -func ensureDirExists(path string) { - if _, err := os.Stat(path); os.IsNotExist(err) { - err = os.MkdirAll(path, os.ModePerm) - if err != nil { - log.Fatal(err) - } +func cacheKey(kind cache.EntryKind, hash string) string { + if kind == cache.CAS { + return filepath.Join(kind.String(), hash[:2], hash) } -} -func cacheKey(kind cache.EntryKind, hash string) string { - return filepath.Join(kind.String(), hash[:2], hash) + return filepath.Join(kind.String()+".v2", hash[:2], hash) } func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string { diff --git a/cache/disk/disk_test.go b/cache/disk/disk_test.go index 1cb14ab43..2eab5bb00 100644 --- a/cache/disk/disk_test.go +++ b/cache/disk/disk_test.go @@ -20,6 +20,8 @@ import ( "github.com/buchgr/bazel-remote/cache" cachehttp "github.com/buchgr/bazel-remote/cache/http" 
"github.com/buchgr/bazel-remote/utils" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" ) func tempDir(t *testing.T) string { @@ -97,7 +99,7 @@ func TestCacheBasics(t *testing.T) { // Dig into the internals to make sure that the cache state has been // updated correctly - err = checkItems(testCache, int64(len(CONTENTS)), 1) + err = checkItems(testCache, int64(len(CONTENTS))+headerSize[pb.DigestFunction_SHA256], 1) if err != nil { t.Fatal(err) } @@ -117,24 +119,26 @@ func TestCacheBasics(t *testing.T) { func TestCacheEviction(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - testCache := New(cacheDir, 10, nil) + + testCache := New(cacheDir, 450, nil) expectedSizesNumItems := []struct { - expSize int64 - expNum int + blobSize int + totalSize int64 + expNum int }{ - {0, 1}, // 0 - {1, 2}, // 0, 1 - {3, 3}, // 0, 1, 2 - {6, 4}, // 0, 1, 2, 3 - {10, 5}, // 0, 1, 2, 3, 4 - {9, 2}, // 4, 5 - {6, 1}, // 6 - {7, 1}, // 7 + {0, 44, 1}, // 0 + {10, 98, 2}, // 1, 0 + {30, 172, 3}, // 2, 1, 0 + {60, 276, 4}, // 3, 2, 1, 0 + {120, 440, 5}, // 4, 3, 2, 1, 0 + {90, 402, 3}, // 5, 4, 3 ; 574 evict 0 => 530, evict 1 => 476, evict 2 => 402 + {60, 402, 3}, // 6, 5, 4 ; 506 evict 3 => 402 + {70, 352, 3}, // 7, 6, 5 ; 516 evict 4 => 238 } for i, thisExp := range expectedSizesNumItems { - strReader := strings.NewReader(strings.Repeat("a", i)) + strReader := strings.NewReader(strings.Repeat("a", thisExp.blobSize)) // Suitably-sized, unique key for these testcases: key := fmt.Sprintf("%0*d", sha256HashStrSize, i) @@ -143,12 +147,12 @@ func TestCacheEviction(t *testing.T) { sha256.Size*2, len(key), key) } - err := testCache.Put(cache.AC, key, int64(i), strReader) + err := testCache.Put(cache.AC, key, int64(thisExp.blobSize), strReader) if err != nil { t.Fatal(err) } - err = checkItems(testCache, thisExp.expSize, thisExp.expNum) + err = checkItems(testCache, thisExp.totalSize, thisExp.expNum) if err != nil { t.Fatal(err) } @@ -216,7 +220,7 @@ func hashStr(content string) string { func TestOverwrite(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - testCache := New(cacheDir, 10, nil) + testCache := New(cacheDir, 10+headerSize[pb.DigestFunction_SHA256], nil) var err error err = putGetCompare(cache.CAS, hashStr("hello"), "hello", testCache) @@ -247,32 +251,73 @@ func TestOverwrite(t *testing.T) { } } +func ensureDirExists(t *testing.T, path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + t.Fatal(err) + } + } +} + func TestCacheExistingFiles(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - ensureDirExists(filepath.Join(cacheDir, "cas", "f5")) - ensureDirExists(filepath.Join(cacheDir, "cas", "fd")) - ensureDirExists(filepath.Join(cacheDir, "ac", "73")) - ensureDirExists(filepath.Join(cacheDir, "raw", "73")) + blobs := make([]struct { + data []byte + sha256hash string + file string + }, 4, 4) + blobs[0].data, blobs[0].sha256hash = testutils.RandomDataAndHash(64) + blobs[0].file = filepath.Join("cas", blobs[0].sha256hash[:2], blobs[0].sha256hash) + + blobs[1].data = make([]byte, len(blobs[0].data)) + copy(blobs[1].data, blobs[0].data) + blobs[1].data[0]++ + hb := sha256.Sum256(blobs[1].data) + blobs[1].sha256hash = hex.EncodeToString(hb[:]) + blobs[1].file = filepath.Join("cas", blobs[1].sha256hash[:2], blobs[1].sha256hash) + + blobs[2].data = make([]byte, len(blobs[0].data)) + copy(blobs[2].data, blobs[0].data) + blobs[2].data[0]++ + hb = 
sha256.Sum256(blobs[2].data) + blobs[2].sha256hash = hex.EncodeToString(hb[:]) + blobs[2].file = filepath.Join("ac.v2", blobs[2].sha256hash[:2], blobs[2].sha256hash) + + blobs[3].data = make([]byte, len(blobs[0].data)) + copy(blobs[3].data, blobs[2].data) + blobs[3].sha256hash = blobs[2].sha256hash + blobs[3].file = filepath.Join("raw.v2", blobs[3].sha256hash[:2], blobs[3].sha256hash) + + for _, it := range blobs { + dn := filepath.Join(cacheDir, filepath.Dir(it.file)) + ensureDirExists(t, dn) + fn := filepath.Join(cacheDir, it.file) + f, err := os.Create(fn) + if err != nil { + t.Fatal(err) + } - items := []string{ - "cas/f5/f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", - "cas/fd/fdce205a735f407ae2910426611893d99ed985e3d9a341820283ea0b7d046ee3", - "ac/73/733e21b37cef883579a88183eed0d00cdeea0b59e1bcd77db6957f881c3a6b54", - "raw/73/733e21b37cef883579a88183eed0d00cdeea0b59e1bcd77db6957f881c3a6b54", - } + err = writeHeader(f, it.sha256hash, int64(len(it.data))) + if err != nil { + t.Fatal(err) + } - for _, it := range items { - err := ioutil.WriteFile(filepath.Join(cacheDir, it), []byte(CONTENTS), os.ModePerm) + n, err := f.Write(it.data) if err != nil { t.Fatal(err) } + if n != len(it.data) { + t.Fatalf("short write: %d, expected: %d", n, len(it.data)) + } + // Wait a bit to account for atime granularity time.Sleep(10 * time.Millisecond) } - const expectedSize = 4 * int64(len(CONTENTS)) + expectedSize := 4 * (int64(len(blobs[0].data)) + headerSize[pb.DigestFunction_SHA256]) testCache := New(cacheDir, expectedSize, nil) err := checkItems(testCache, expectedSize, 4) @@ -281,18 +326,21 @@ func TestCacheExistingFiles(t *testing.T) { } // Adding a new file should evict items[0] (the oldest) - err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS)) + err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), + strings.NewReader(CONTENTS)) if err != nil { t.Fatal(err) } + expectedSize = expectedSize - int64(len(blobs[0].data)) + int64(len(CONTENTS)) + err = checkItems(testCache, expectedSize, 4) if err != nil { t.Fatal(err) } found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd") if found { - t.Fatalf("%s should have been evicted", items[0]) + t.Fatalf("%s should have been evicted", blobs[0].sha256hash) } } @@ -344,23 +392,26 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { cacheDir := testutils.TempDir(t) defer os.RemoveAll(cacheDir) - acHash, err := testutils.CreateRandomFile(cacheDir+"/ac/", 512) + acSize := int64(512) + acHash, err := testutils.CreateRandomFile(cacheDir+"/ac/", acSize) if err != nil { t.Fatal(err) } - casHash1, err := testutils.CreateRandomFile(cacheDir+"/cas/", 1024) + + casSize := int64(1024) + casHash1, err := testutils.CreateRandomFile(cacheDir+"/cas/", casSize) if err != nil { t.Fatal(err) } - casHash2, err := testutils.CreateRandomFile(cacheDir+"/cas/", 1024) + casHash2, err := testutils.CreateRandomFile(cacheDir+"/cas/", casSize) if err != nil { t.Fatal(err) } - testCache := New(cacheDir, 2560, nil) - _, numItems := testCache.Stats() - if numItems != 3 { - t.Fatalf("Expected test cache size 3 but was %d", numItems) - } + + sha256HeaderSize := headerSize[pb.DigestFunction_SHA256] + + cacheSize := acSize + (casSize+sha256HeaderSize)*2 + testCache := New(cacheDir, cacheSize, nil) var found bool found, _ = testCache.Contains(cache.AC, acHash) @@ -377,6 +428,11 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { if !found { 
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2) } + + _, numItems := testCache.Stats() + if numItems != 3 { + t.Fatalf("Expected test cache size 3 but was %d", numItems) + } } func TestLoadExistingEntries(t *testing.T) { @@ -387,7 +443,7 @@ func TestLoadExistingEntries(t *testing.T) { numBlobs := int64(3) blobSize := int64(1024) - acHash, err := testutils.CreateCacheFile(cacheDir+"/ac/", blobSize) + acHash, err := testutils.CreateCacheFile(cacheDir+"/ac.v2/", blobSize) if err != nil { t.Fatal(err) } @@ -395,7 +451,7 @@ func TestLoadExistingEntries(t *testing.T) { if err != nil { t.Fatal(err) } - rawHash, err := testutils.CreateCacheFile(cacheDir+"/raw/", blobSize) + rawHash, err := testutils.CreateCacheFile(cacheDir+"/raw.v2/", blobSize) if err != nil { t.Fatal(err) } @@ -429,12 +485,12 @@ func TestDistinctKeyspaces(t *testing.T) { cacheDir := testutils.TempDir(t) defer os.RemoveAll(cacheDir) - blobSize := 1024 - cacheSize := int64(blobSize * 3) + blobSize := int64(1024) + cacheSize := (blobSize + headerSize[pb.DigestFunction_SHA256]) * 3 testCache := New(cacheDir, cacheSize, nil) - blob, casHash := testutils.RandomDataAndHash(1024) + blob, casHash := testutils.RandomDataAndHash(blobSize) // Add the same blob with the same key, to each of the three // keyspaces, and verify that we have exactly three items in @@ -634,3 +690,105 @@ func TestHttpProxyBackend(t *testing.T) { t.Fatalf("Expected '%v' but received '%v", retrievedData, blob) } } + +func TestChecksumHeader(t *testing.T) { + + blob := []byte{0, 1, 2, 3, 4, 5, 6, 7} + + testCases := []struct { + kind pb.DigestFunction_Value + hash string + size int64 + success bool // True if the {hash,size} are valid. + }{ + {pb.DigestFunction_SHA256, + "0000000011111111222222223333333344444444555555556666666677777777", + 42, true}, + {pb.DigestFunction_SHA256, + "0000000011111111222222223333333344444444555555556666666677777777", + 0, true}, + + {pb.DigestFunction_UNKNOWN, + "00000000111111112222222233333333444444445555555566666666777777778", + 42, false}, // invalid hex string (odd length) + {pb.DigestFunction_UNKNOWN, + "000000001111111122222222333333334444444455555555666666667777777788", + 42, false}, // hash too long + {pb.DigestFunction_UNKNOWN, + "000000001111111122222222333333334444444455555555666666667777777", + 42, false}, // invalid hex string (odd length) + {pb.DigestFunction_UNKNOWN, + "00000000111111112222222233333333444444445555555566666666777777", + 42, false}, // hash too short + {pb.DigestFunction_UNKNOWN, + "", + 42, false}, + {pb.DigestFunction_UNKNOWN, + "0000000011111111222222223333333344444444555555556666666677777777", + -1, false}, // invalid (negative) size + } + + // Note that these tests just confirm that we can read/write a valid + // header and a blob. They dot not confirm that the header describes + // the blob. + + for _, tc := range testCases { + var buf bytes.Buffer + + err := writeHeader(&buf, tc.hash, tc.size) + if !tc.success { + if err == nil { + t.Error("Expected testcase to fail", tc.hash, tc.size) + } + + continue + } + if err != nil { + t.Fatal("Expected testscase to succeed, got:", err) + } + + // Check the header size manually, since it's not exposed by + // the readHeader function. + if int64(buf.Len()) != headerSize[tc.kind] { + t.Fatalf("Expected data header of size %d bytes, got %d. %s %d %v %s", + headerSize[tc.kind], buf.Len(), tc.hash, tc.size, tc.success, err) + } + + // Write the blob. 
+ n, err := buf.Write(blob) + if err != nil { + t.Fatal(err) + } + if n != len(blob) { + t.Fatalf("expected to write %d bytes, instead wrote %d bytes", + len(blob), n) + } + + dt, readHash, readSize, err := readHeader(&buf) + if err != nil { + t.Fatal(err) + } + if dt == pb.DigestFunction_UNKNOWN { + t.Fatal("Unknown digest type") + } + + if readHash != tc.hash { + t.Fatalf("Read a different hash '%s' than was written '%s'", + readHash, tc.hash) + } + + if readSize != tc.size { + t.Fatalf("Read a different size %d than was written %d", + readSize, tc.size) + } + + readBlob, err := ioutil.ReadAll(&buf) + if err != nil { + t.Fatal(err) + } + + if bytes.Compare(blob, readBlob) != 0 { + t.Fatal("Read a different blob than was written") + } + } +} diff --git a/cache/disk/load.go b/cache/disk/load.go new file mode 100644 index 000000000..d4a893899 --- /dev/null +++ b/cache/disk/load.go @@ -0,0 +1,364 @@ +package disk + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "regexp" + "runtime" + "sort" + "sync" + + "github.com/djherbis/atime" + + "github.com/buchgr/bazel-remote/cache" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" +) + +type scanDir struct { + name string + dest string + version int + kind cache.EntryKind +} + +// Return a list of importItems sorted by atime, and a boolean that is +// true if the caller should migrate items, in reverse LRU order. +func (c *DiskCache) findCacheItems() ([]importItem, bool, error) { + files := []importItem{} + + var mu sync.Mutex // Protects the migrate variable below: + migrate := false + + // Workers submit discovered files here. + filesChan := make(chan []importItem) + + // Workers receive a dir to scan here. + workChan := make(chan scanDir) + + // Workers can report errors here: + errChan := make(chan error) + + numWorkers := runtime.NumCPU() // TODO: consider tweaking this. + + hashKeyRegex := regexp.MustCompile("^[a-f0-9]{64}$") + + // Spawn some worker goroutines to read the cache concurrently. + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func(i int) { + needMigration := false + defer func() { + if needMigration { + mu.Lock() + migrate = true + mu.Unlock() + } + wg.Done() + }() + + for scanDir := range workChan { + _, err := os.Stat(scanDir.name) + if os.IsNotExist(err) { + continue + } else if err != nil { + errChan <- err + return + } + + listing, err := ioutil.ReadDir(scanDir.name) + if err != nil { + errChan <- err + return + } + + addChecksum := scanDir.version < 2 && (scanDir.kind != cache.CAS) + + toSend := make([]importItem, 0, len(listing)) + for e := range listing { + if listing[e].IsDir() { + continue + } + + if !hashKeyRegex.MatchString(listing[e].Name()) { + log.Println("Unexpected file in cache:", + filepath.Join(scanDir.name, listing[e].Name())) + continue + } + + basename := listing[e].Name() + entry := importItem{ + name: filepath.Join(scanDir.name, basename), + info: listing[e], + addChecksum: addChecksum, + } + + if scanDir.version < 2 { + entry.oldName = entry.name + if scanDir.kind == cache.CAS { + entry.name = filepath.Join(c.dir, + scanDir.kind.String(), + basename[:2], + basename) + } else { + entry.name = filepath.Join(c.dir, + scanDir.kind.String()+".v2", + basename[:2], + basename) + } + + needMigration = true + } + + toSend = append(toSend, entry) + } + + if len(toSend) > 0 { + filesChan <- toSend + } + } + }(i) + } + + go func() { + wg.Wait() + // All workers have now finished. 
+		close(filesChan)
+	}()
+
+	// Provide the workers with directories to scan. Do this from a
+	// separate goroutine, so that this function can start draining
+	// filesChan/errChan below right away; otherwise the workers could
+	// all block on the unbuffered filesChan before every directory has
+	// been handed out, and we would deadlock.
+	go func() {
+		workChan <- scanDir{
+			name:    filepath.Join(c.dir, "ac"),
+			version: 0,
+			kind:    cache.AC,
+		}
+		workChan <- scanDir{
+			name:    filepath.Join(c.dir, "cas"),
+			version: 0,
+			kind:    cache.CAS,
+		}
+
+		hexLetters := []byte("0123456789abcdef")
+		for _, c1 := range hexLetters {
+			for _, c2 := range hexLetters {
+				subDir := string(c1) + string(c2)
+
+				workChan <- scanDir{
+					name:    filepath.Join(c.dir, "cas", subDir),
+					version: 2, // v1 and v2 cas dirs are the same.
+					kind:    cache.CAS,
+				}
+
+				workChan <- scanDir{
+					name:    filepath.Join(c.dir, "ac", subDir),
+					version: 1,
+					kind:    cache.AC,
+				}
+				workChan <- scanDir{
+					name:    filepath.Join(c.dir, "ac.v2", subDir),
+					version: 2,
+					kind:    cache.AC,
+				}
+
+				workChan <- scanDir{
+					name:    filepath.Join(c.dir, "raw", subDir),
+					version: 1,
+					kind:    cache.RAW,
+				}
+				workChan <- scanDir{
+					name:    filepath.Join(c.dir, "raw.v2", subDir),
+					version: 2,
+					kind:    cache.RAW,
+				}
+			}
+		}
+
+		// No more dirs for the workers to process.
+		close(workChan)
+	}()
+
+OuterLoop:
+	for {
+		select {
+		case err := <-errChan:
+			return nil, false, err
+		case f, found := <-filesChan:
+			if !found {
+				break OuterLoop
+			}
+			files = append(files, f...)
+		}
+	}
+
+	log.Println("Sorting cache files by atime.")
+	sort.Slice(files, func(i int, j int) bool {
+		return atime.Get(files[i].info).Before(atime.Get(files[j].info))
+	})
+
+	return files, migrate, nil
+}
+
+func updateAccesstime(file string) {
+	f, err := os.Open(file)
+	if err != nil {
+		return
+	}
+	var buf [1]byte
+	f.Read(buf[:])
+	f.Close()
+}
+
+func (c *DiskCache) migrateFiles(files []importItem) error {
+	log.Println("Migrating old cache items to new directory structure.")
+
+	var err error
+	for _, i := range files {
+		if i.oldName == "" {
+			// i.name is already an absolute path, touch it to
+			// refresh its atime in LRU order.
+			updateAccesstime(i.name)
+			continue
+		}
+
+		if !i.addChecksum {
+			err = os.Rename(i.oldName, i.name)
+			if err != nil {
+				return err
+			}
+
+			continue
+		}
+
+		err = moveAndChecksum(i.oldName, i.name)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Try to remove old (hopefully) empty dirs.
+
+	hexLetters := []byte("0123456789abcdef")
+	for _, c1 := range hexLetters {
+		for _, c2 := range hexLetters {
+			subDir := string(c1) + string(c2)
+
+			acV1subDir := filepath.Join(c.dir, "ac", subDir)
+			err := os.Remove(acV1subDir)
+			if err != nil && !os.IsNotExist(err) {
+				log.Printf("Warning: failed to remove old format directory \"%s\": %v",
+					acV1subDir, err)
+			}
+
+			rawV1subDir := filepath.Join(c.dir, "raw", subDir)
+			err = os.Remove(rawV1subDir)
+			if err != nil && !os.IsNotExist(err) {
+				log.Printf("Warning: failed to remove old format directory \"%s\": %v",
+					rawV1subDir, err)
+			}
+		}
+	}
+
+	acV1dir := filepath.Join(c.dir, "ac")
+	err = os.Remove(acV1dir)
+	if err != nil && !os.IsNotExist(err) {
+		log.Printf("Warning: failed to remove old format directory \"%s\": %v",
+			acV1dir, err)
+	}
+
+	rawV1dir := filepath.Join(c.dir, "raw")
+	err = os.Remove(rawV1dir)
+	if err != nil && !os.IsNotExist(err) {
+		log.Printf("Warning: failed to remove old format directory \"%s\": %v",
+			rawV1dir, err)
+	}
+
+	return nil
+}
+
+// Replace a raw file with a "v2" style file with the data integrity
+// header. `old` and `new` must be different files (OK since we store
+// v2 style files in different directories).
+func moveAndChecksum(old string, new string) error { + + key := filepath.Base(old) + dt := digestType(key) + if dt == pb.DigestFunction_UNKNOWN { + return fmt.Errorf("Unsupported digest: %s", old) + } + + headerSize, ok := headerSize[dt] + if !ok { + return fmt.Errorf("Unknown header size for digest: %d", dt) + } + + success := false + openOld := false + openNew := false + var in *os.File + var out *os.File + + defer func() { + if openOld { + in.Close() + } + + if openNew { + out.Close() + + if !success { + os.Remove(new) + } + } + + if success { + os.Remove(old) + } + }() + + in, err := os.Open(old) + if err != nil { + return err + } + openOld = true + + out, err = os.Create(new) + if err != nil { + return err + } + openNew = true + + // Make space for the header. + _, err = out.Seek(headerSize, 0) + if err != nil { + return err + } + + hasher := sha256.New() + mw := io.MultiWriter(out, hasher) + + sizeBytes, err := io.Copy(mw, in) + if err != nil { + return err + } + + // Go back and fill in the header. + _, err = out.Seek(0, 0) + if err != nil { + return err + } + + hashBytes := hasher.Sum(nil) + hashStr := hex.EncodeToString(hashBytes) + + err = writeHeader(out, hashStr, sizeBytes) + if err != nil { + return err + } + + success = true + + return nil +} diff --git a/server/http_test.go b/server/http_test.go index 67772c6d6..932213057 100644 --- a/server/http_test.go +++ b/server/http_test.go @@ -84,7 +84,7 @@ func TestUploadFilesConcurrently(t *testing.T) { requests[i] = r } - c := disk.New(cacheDir, 1000*1024, nil) + c := disk.New(cacheDir, 1000*(1024+32+4+8), nil) h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, "") handler := http.HandlerFunc(h.CacheHandler) @@ -140,7 +140,9 @@ func TestUploadSameFileConcurrently(t *testing.T) { data, hash := testutils.RandomDataAndHash(1024) - c := disk.New(cacheDir, 1024, nil) + maxHeaderSize := int64(8 + 4 + sha256.Size) + + c := disk.New(cacheDir, 1024+maxHeaderSize, nil) h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, "") handler := http.HandlerFunc(h.CacheHandler) @@ -239,11 +241,21 @@ func TestUploadEmptyActionResult(t *testing.T) { "got ", status) } - cacheFile := filepath.Join(cacheDir, "ac", hash[:2], hash) + cacheFile := filepath.Join(cacheDir, "ac.v2", hash[:2], hash) cachedData, err := ioutil.ReadFile(cacheFile) if err != nil { t.Fatal(err) } + + // FIXME: test sha1, md5 too + headerSize := sha256.Size + 8 + 4 + + if len(cachedData) < headerSize { + t.Fatalf("data too short to contain header: %d, must be at least: %d", + len(cachedData), headerSize) + } + cachedData = cachedData[headerSize:] + if len(cachedData) == 0 { t.Fatal("expected non-zero length ActionResult to be cached") }
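
Appendix (not part of the diff above): a minimal, standalone sketch of how an
external tool might read the checksum header described in the commit message.
It assumes the SHA256-only layout that the cache currently writes, and the
input file path argument is hypothetical:

	// readheader.go: print the data integrity header of an ac.v2/ or
	// raw.v2/ blob file, then stream the blob itself to stdout.
	package main

	import (
		"crypto/sha256"
		"encoding/binary"
		"encoding/hex"
		"fmt"
		"io"
		"os"
	)

	func main() {
		// Hypothetical usage: readheader <cache_dir>/ac.v2/xx/<hash>
		f, err := os.Open(os.Args[1])
		if err != nil {
			panic(err)
		}
		defer f.Close()

		var digestFunction int32 // REAPIv2 DigestFunction enum value.
		if err := binary.Read(f, binary.LittleEndian, &digestFunction); err != nil {
			panic(err)
		}

		var blobSize int64 // Number of bytes in the blob that follows.
		if err := binary.Read(f, binary.LittleEndian, &blobSize); err != nil {
			panic(err)
		}

		// Only SHA256 (32 hash bytes) is written by the cache for now.
		hashBytes := make([]byte, sha256.Size)
		if _, err := io.ReadFull(f, hashBytes); err != nil {
			panic(err)
		}

		fmt.Fprintf(os.Stderr, "digest function: %d, blob size: %d, hash: %s\n",
			digestFunction, blobSize, hex.EncodeToString(hashBytes))

		// The blob follows immediately after the header and can be
		// streamed without buffering it all in memory.
		if _, err := io.CopyN(os.Stdout, f, blobSize); err != nil {
			panic(err)
		}
	}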