diff --git a/cache/disk/BUILD.bazel b/cache/disk/BUILD.bazel index 36a81ffde..8e49da4e9 100644 --- a/cache/disk/BUILD.bazel +++ b/cache/disk/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "disk.go", + "load.go", "lru.go", ], importpath = "github.com/buchgr/bazel-remote/cache/disk", @@ -29,5 +30,6 @@ go_test( "//cache:go_default_library", "//cache/http:go_default_library", "//utils:go_default_library", + "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library", ], ) diff --git a/cache/disk/disk.go b/cache/disk/disk.go index 40b49d380..0973d8d06 100644 --- a/cache/disk/disk.go +++ b/cache/disk/disk.go @@ -1,20 +1,23 @@ package disk import ( + "crypto/md5" + "crypto/sha1" "crypto/sha256" + "encoding/binary" "encoding/hex" + "errors" "fmt" + "hash" "io" "io/ioutil" "log" "net/http" "os" "path/filepath" - "sort" "sync" "github.com/buchgr/bazel-remote/cache" - "github.com/djherbis/atime" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -36,7 +39,7 @@ var ( // lruItem is the type of the values stored in SizedLRU to keep track of items. // It implements the SizedItem interface. type lruItem struct { - size int64 + size int64 // Blob + header size. committed bool } @@ -53,9 +56,16 @@ type DiskCache struct { lru SizedLRU } -type nameAndInfo struct { - name string // relative path +type importItem struct { + name string // Absolute path where this item should end up. info os.FileInfo + + // If non-empty, the absolute path there this item is. + oldName string + + // True if the file needs to have the checksum header + // added while migrating. + addChecksum bool } const sha256HashStrSize = sha256.Size * 2 // Two hex characters per byte. @@ -69,17 +79,17 @@ func New(dir string, maxSizeBytes int64, proxy cache.CacheProxy) *DiskCache { for _, c1 := range hexLetters { for _, c2 := range hexLetters { subDir := string(c1) + string(c2) - err := os.MkdirAll(filepath.Join(dir, cache.CAS.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) - } - err = os.MkdirAll(filepath.Join(dir, cache.AC.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) - } - err = os.MkdirAll(filepath.Join(dir, cache.RAW.String(), subDir), os.ModePerm) - if err != nil { - log.Fatal(err) + + casDir := filepath.Join(dir, cache.CAS.String(), subDir) + + acDir := filepath.Join(dir, cache.AC.String()+".v2", subDir) + rawDir := filepath.Join(dir, cache.RAW.String()+".v2", subDir) + + for _, dir := range []string{casDir, acDir, rawDir} { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + log.Fatal(err) + } } } } @@ -151,80 +161,14 @@ func New(dir string, maxSizeBytes int64, proxy cache.CacheProxy) *DiskCache { lru: NewSizedLRU(maxSizeBytes, onEvict), } - err := c.migrateDirectories() - if err != nil { - log.Fatalf("Attempting to migrate the old directory structure to the new structure failed "+ - "with error: %v", err) - } - err = c.loadExistingFiles() + files, migrate, err := c.findCacheItems() if err != nil { - log.Fatalf("Loading of existing cache entries failed due to error: %v", err) + log.Fatalf("Error finding existing cache items: %v", err) } - return c -} - -func (c *DiskCache) migrateDirectories() error { - err := migrateDirectory(filepath.Join(c.dir, cache.AC.String())) - if err != nil { - return err + if migrate { + err = c.migrateFiles(files) } - err = migrateDirectory(filepath.Join(c.dir, cache.CAS.String())) - if err != nil { - return err - } - // Note: there are no old 
"RAW" directories (yet). - return nil -} - -func migrateDirectory(dir string) error { - log.Printf("Migrating files (if any) to new directory structure: %s\n", dir) - return filepath.Walk(dir, func(name string, info os.FileInfo, err error) error { - if err != nil { - log.Println("Error while walking directory:", err) - return err - } - - if info.IsDir() { - if name == dir { - return nil - } - return filepath.SkipDir - } - hash := filepath.Base(name) - newName := filepath.Join(filepath.Dir(name), hash[:2], hash) - return os.Rename(name, newName) - }) -} - -// loadExistingFiles lists all files in the cache directory, and adds them to the -// LRU index so that they can be served. Files are sorted by access time first, -// so that the eviction behavior is preserved across server restarts. -func (c *DiskCache) loadExistingFiles() error { - log.Printf("Loading existing files in %s.\n", c.dir) - - // Walk the directory tree - var files []nameAndInfo - err := filepath.Walk(c.dir, func(name string, info os.FileInfo, err error) error { - if err != nil { - log.Println("Error while walking directory:", err) - return err - } - - if !info.IsDir() { - files = append(files, nameAndInfo{name: name, info: info}) - } - return nil - }) - if err != nil { - return err - } - - log.Println("Sorting cache files by atime.") - // Sort in increasing order of atime - sort.Slice(files, func(i int, j int) bool { - return atime.Get(files[i].info).Before(atime.Get(files[j].info)) - }) log.Println("Building LRU index.") for _, f := range files { @@ -236,13 +180,14 @@ func (c *DiskCache) loadExistingFiles() error { if !ok { err = os.Remove(filepath.Join(c.dir, relPath)) if err != nil { - return err + log.Fatal(err) } } } log.Println("Finished loading disk cache files.") - return nil + + return c } // Put stores a stream of `expectedSize` bytes from `r` into the cache. @@ -275,7 +220,7 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r // Try to add the item to the LRU. newItem := &lruItem{ - size: expectedSize, + size: expectedSize + headerSize[pb.DigestFunction_SHA256], committed: false, } ok := c.lru.Add(key, newItem) @@ -304,9 +249,18 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r if shouldCommit && c.proxy != nil { // TODO: buffer in memory, avoid a filesystem round-trip? fr, err := os.Open(filePath) - if err == nil { - c.proxy.Put(kind, hash, expectedSize, fr) + if err != nil { + return + } + + if kind != cache.CAS { + _, err = skipHeader(fr) + if err != nil { + return + } } + + c.proxy.Put(kind, hash, expectedSize, fr) } }() @@ -336,7 +290,12 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r return fmt.Errorf( "hashsums don't match. 
Expected %s, found %s", key, actualHash) } - } else { + } else { // kind != cache.CAS + err = writeHeader(f, hash, expectedSize) + if err != nil { + return err + } + if bytesCopied, err = io.Copy(f, r); err != nil { return err } @@ -366,6 +325,194 @@ func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r return err } +func digestType(hash string) pb.DigestFunction_Value { + switch len(hash) { + case sha256.Size * 2: + return pb.DigestFunction_SHA256 + case sha1.Size * 2: + return pb.DigestFunction_SHA1 + case md5.Size * 2: + return pb.DigestFunction_MD5 + default: + return pb.DigestFunction_UNKNOWN + } +} + +func getHasher(dt pb.DigestFunction_Value) hash.Hash { + switch dt { + case pb.DigestFunction_SHA256: + return sha256.New() + case pb.DigestFunction_SHA1: + return sha1.New() + case pb.DigestFunction_MD5: + return md5.New() + default: + return nil + } +} + +// Write the checksum header to `w`, in a well-defined way. +// +// * Little-endian int32 (4 bytes), which represents the DigestFunction enum value. +// * Little-endian int64 (8 bytes), which represents the data size of the blob. +// * The hash bytes from the digest (length determined by the digest). +func writeHeader(w io.Writer, hash string, expectedSize int64) error { + if expectedSize < 0 { + return errors.New("Size must be non-negative") + } + + dt := digestType(hash) + if dt == pb.DigestFunction_UNKNOWN { + return fmt.Errorf("unsupported hash format: %s", hash) + } + + hashBytes, err := hex.DecodeString(hash) + if err != nil { + return err + } + + err = binary.Write(w, binary.LittleEndian, dt) + if err != nil { + return err + } + + err = binary.Write(w, binary.LittleEndian, expectedSize) + if err != nil { + return err + } + + n, err := w.Write(hashBytes) + if err != nil { + return err + } + if n != len(hashBytes) { + return fmt.Errorf("expected to write %d bytes for hash, wrote: %d", + len(hashBytes), n) + } + + return nil +} + +// Hash type (int32) + blob size ( +const minHashSize = md5.Size + commonHeaderSize + +const commonHeaderSize = 4 + 8 + +var headerSize = map[pb.DigestFunction_Value]int64{ + pb.DigestFunction_MD5: md5.Size + commonHeaderSize, + pb.DigestFunction_SHA1: sha1.Size + commonHeaderSize, + pb.DigestFunction_SHA256: sha256.Size + commonHeaderSize, +} + +var hashSize = map[pb.DigestFunction_Value]int{ + pb.DigestFunction_MD5: md5.Size, + pb.DigestFunction_SHA1: sha1.Size, + pb.DigestFunction_SHA256: sha256.Size, +} + +// Skip over the checksum header in the file, and return an error +// if an error occurred or if the file was too small. +func skipHeader(f *os.File) (int64, error) { + + var dt pb.DigestFunction_Value + err := binary.Read(f, binary.LittleEndian, &dt) + if err != nil { + return -1, err + } + + pos, ok := headerSize[dt] + if !ok { + return -1, fmt.Errorf("Unhandled digest function: %d", dt) + } + + fi, err := f.Stat() + if err != nil { + return -1, err + } + if fi.Size() < pos { + return -1, fmt.Errorf("file too short to contain a valid header: %d", + fi.Size()) + } + + _, err = f.Seek(pos, 0) + + return pos, err +} + +// Return the header hash type, hash value, blob size and an optional error. +// If the error is nil, then the data blob should be readable from r, and the +// read data should be compared with the hash and size from the header. 
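+//
+// For a SHA256 entry, for example, the header occupies commonHeaderSize
+// (4 + 8) plus sha256.Size (32) = 44 bytes:
+//
+//   bytes [0, 4)   little-endian int32: the DigestFunction enum value
+//   bytes [4, 12)  little-endian int64: the size of the data blob
+//   bytes [12, 44) the raw digest bytes
+//
+// The blob itself follows immediately after the header.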
+func readHeader(r io.Reader) (pb.DigestFunction_Value, string, int64, error) { + + var hashType pb.DigestFunction_Value + err := binary.Read(r, binary.LittleEndian, &hashType) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + var expectedSize int64 + err = binary.Read(r, binary.LittleEndian, &expectedSize) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + hSize, ok := hashSize[hashType] + if !ok { + return pb.DigestFunction_UNKNOWN, "", -1, + fmt.Errorf("Unsupported hash type: %d", hashType) + } + + hashBytes := make([]byte, hSize) + n, err := r.Read(hashBytes) + if err != nil { + return pb.DigestFunction_UNKNOWN, "", -1, err + } + + if n != hSize { + return pb.DigestFunction_UNKNOWN, "", -1, + fmt.Errorf("Failed to read all %d hash bytes", hSize) + } + + hashStr := hex.EncodeToString(hashBytes) + + return hashType, hashStr, expectedSize, nil +} + +func verifyBlob(r io.Reader) error { + dt, expectedHash, size, err := readHeader(r) + if err != nil { + return err + } + + if dt == pb.DigestFunction_UNKNOWN { + return errors.New("unrecognized hash") + } + + hasher := getHasher(dt) + if hasher == nil { + return fmt.Errorf("unhandled hash type: %s", dt.String()) + } + + n, err := io.Copy(hasher, r) + if err != nil { + return err + } + + if n != size { + return fmt.Errorf("expected size %d, found %d", size, n) + } + + hashBytes := hasher.Sum(nil) + foundHash := hex.EncodeToString(hashBytes) + + if foundHash != expectedHash { + return fmt.Errorf("expected hash %s, found %s", + expectedHash, foundHash) + } + + return nil // Success. +} + // Return two bools, `available` is true if the item is in the local // cache and ready to use. // @@ -422,10 +569,20 @@ func (c *DiskCache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64 blobPath := cacheFilePath(kind, c.dir, hash) fileInfo, err := os.Stat(blobPath) if err == nil { - r, err := os.Open(blobPath) + f, err := os.Open(blobPath) if err == nil { + + sizeToReturn := fileInfo.Size() + if kind != cache.CAS { + dataHeaderSize, err := skipHeader(f) + if err != nil { + return nil, -1, err + } + sizeToReturn -= dataHeaderSize + } + cacheHits.Inc() - return r, fileInfo.Size(), nil + return f, sizeToReturn, nil } } @@ -488,6 +645,13 @@ func (c *DiskCache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64 } tmpFileCreated = true + if kind != cache.CAS { + err = writeHeader(f, hash, foundSize) + if err != nil { + return nil, -1, err + } + } + written, err := io.Copy(f, r) if err != nil { return nil, -1, err @@ -573,17 +737,12 @@ func (c *DiskCache) Stats() (currentSize int64, numItems int) { return c.lru.CurrentSize(), c.lru.Len() } -func ensureDirExists(path string) { - if _, err := os.Stat(path); os.IsNotExist(err) { - err = os.MkdirAll(path, os.ModePerm) - if err != nil { - log.Fatal(err) - } +func cacheKey(kind cache.EntryKind, hash string) string { + if kind == cache.CAS { + return filepath.Join(kind.String(), hash[:2], hash) } -} -func cacheKey(kind cache.EntryKind, hash string) string { - return filepath.Join(kind.String(), hash[:2], hash) + return filepath.Join(kind.String()+".v2", hash[:2], hash) } func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string { diff --git a/cache/disk/disk_test.go b/cache/disk/disk_test.go index 1cb14ab43..2eab5bb00 100644 --- a/cache/disk/disk_test.go +++ b/cache/disk/disk_test.go @@ -20,6 +20,8 @@ import ( "github.com/buchgr/bazel-remote/cache" cachehttp "github.com/buchgr/bazel-remote/cache/http" 
"github.com/buchgr/bazel-remote/utils" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" ) func tempDir(t *testing.T) string { @@ -97,7 +99,7 @@ func TestCacheBasics(t *testing.T) { // Dig into the internals to make sure that the cache state has been // updated correctly - err = checkItems(testCache, int64(len(CONTENTS)), 1) + err = checkItems(testCache, int64(len(CONTENTS))+headerSize[pb.DigestFunction_SHA256], 1) if err != nil { t.Fatal(err) } @@ -117,24 +119,26 @@ func TestCacheBasics(t *testing.T) { func TestCacheEviction(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - testCache := New(cacheDir, 10, nil) + + testCache := New(cacheDir, 450, nil) expectedSizesNumItems := []struct { - expSize int64 - expNum int + blobSize int + totalSize int64 + expNum int }{ - {0, 1}, // 0 - {1, 2}, // 0, 1 - {3, 3}, // 0, 1, 2 - {6, 4}, // 0, 1, 2, 3 - {10, 5}, // 0, 1, 2, 3, 4 - {9, 2}, // 4, 5 - {6, 1}, // 6 - {7, 1}, // 7 + {0, 44, 1}, // 0 + {10, 98, 2}, // 1, 0 + {30, 172, 3}, // 2, 1, 0 + {60, 276, 4}, // 3, 2, 1, 0 + {120, 440, 5}, // 4, 3, 2, 1, 0 + {90, 402, 3}, // 5, 4, 3 ; 574 evict 0 => 530, evict 1 => 476, evict 2 => 402 + {60, 402, 3}, // 6, 5, 4 ; 506 evict 3 => 402 + {70, 352, 3}, // 7, 6, 5 ; 516 evict 4 => 238 } for i, thisExp := range expectedSizesNumItems { - strReader := strings.NewReader(strings.Repeat("a", i)) + strReader := strings.NewReader(strings.Repeat("a", thisExp.blobSize)) // Suitably-sized, unique key for these testcases: key := fmt.Sprintf("%0*d", sha256HashStrSize, i) @@ -143,12 +147,12 @@ func TestCacheEviction(t *testing.T) { sha256.Size*2, len(key), key) } - err := testCache.Put(cache.AC, key, int64(i), strReader) + err := testCache.Put(cache.AC, key, int64(thisExp.blobSize), strReader) if err != nil { t.Fatal(err) } - err = checkItems(testCache, thisExp.expSize, thisExp.expNum) + err = checkItems(testCache, thisExp.totalSize, thisExp.expNum) if err != nil { t.Fatal(err) } @@ -216,7 +220,7 @@ func hashStr(content string) string { func TestOverwrite(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - testCache := New(cacheDir, 10, nil) + testCache := New(cacheDir, 10+headerSize[pb.DigestFunction_SHA256], nil) var err error err = putGetCompare(cache.CAS, hashStr("hello"), "hello", testCache) @@ -247,32 +251,73 @@ func TestOverwrite(t *testing.T) { } } +func ensureDirExists(t *testing.T, path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + t.Fatal(err) + } + } +} + func TestCacheExistingFiles(t *testing.T) { cacheDir := tempDir(t) defer os.RemoveAll(cacheDir) - ensureDirExists(filepath.Join(cacheDir, "cas", "f5")) - ensureDirExists(filepath.Join(cacheDir, "cas", "fd")) - ensureDirExists(filepath.Join(cacheDir, "ac", "73")) - ensureDirExists(filepath.Join(cacheDir, "raw", "73")) + blobs := make([]struct { + data []byte + sha256hash string + file string + }, 4, 4) + blobs[0].data, blobs[0].sha256hash = testutils.RandomDataAndHash(64) + blobs[0].file = filepath.Join("cas", blobs[0].sha256hash[:2], blobs[0].sha256hash) + + blobs[1].data = make([]byte, len(blobs[0].data)) + copy(blobs[1].data, blobs[0].data) + blobs[1].data[0]++ + hb := sha256.Sum256(blobs[1].data) + blobs[1].sha256hash = hex.EncodeToString(hb[:]) + blobs[1].file = filepath.Join("cas", blobs[1].sha256hash[:2], blobs[1].sha256hash) + + blobs[2].data = make([]byte, len(blobs[0].data)) + copy(blobs[2].data, blobs[0].data) + blobs[2].data[0]++ + hb = 
sha256.Sum256(blobs[2].data) + blobs[2].sha256hash = hex.EncodeToString(hb[:]) + blobs[2].file = filepath.Join("ac.v2", blobs[2].sha256hash[:2], blobs[2].sha256hash) + + blobs[3].data = make([]byte, len(blobs[0].data)) + copy(blobs[3].data, blobs[2].data) + blobs[3].sha256hash = blobs[2].sha256hash + blobs[3].file = filepath.Join("raw.v2", blobs[3].sha256hash[:2], blobs[3].sha256hash) + + for _, it := range blobs { + dn := filepath.Join(cacheDir, filepath.Dir(it.file)) + ensureDirExists(t, dn) + fn := filepath.Join(cacheDir, it.file) + f, err := os.Create(fn) + if err != nil { + t.Fatal(err) + } - items := []string{ - "cas/f5/f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", - "cas/fd/fdce205a735f407ae2910426611893d99ed985e3d9a341820283ea0b7d046ee3", - "ac/73/733e21b37cef883579a88183eed0d00cdeea0b59e1bcd77db6957f881c3a6b54", - "raw/73/733e21b37cef883579a88183eed0d00cdeea0b59e1bcd77db6957f881c3a6b54", - } + err = writeHeader(f, it.sha256hash, int64(len(it.data))) + if err != nil { + t.Fatal(err) + } - for _, it := range items { - err := ioutil.WriteFile(filepath.Join(cacheDir, it), []byte(CONTENTS), os.ModePerm) + n, err := f.Write(it.data) if err != nil { t.Fatal(err) } + if n != len(it.data) { + t.Fatalf("short write: %d, expected: %d", n, len(it.data)) + } + // Wait a bit to account for atime granularity time.Sleep(10 * time.Millisecond) } - const expectedSize = 4 * int64(len(CONTENTS)) + expectedSize := 4 * (int64(len(blobs[0].data)) + headerSize[pb.DigestFunction_SHA256]) testCache := New(cacheDir, expectedSize, nil) err := checkItems(testCache, expectedSize, 4) @@ -281,18 +326,21 @@ func TestCacheExistingFiles(t *testing.T) { } // Adding a new file should evict items[0] (the oldest) - err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS)) + err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), + strings.NewReader(CONTENTS)) if err != nil { t.Fatal(err) } + expectedSize = expectedSize - int64(len(blobs[0].data)) + int64(len(CONTENTS)) + err = checkItems(testCache, expectedSize, 4) if err != nil { t.Fatal(err) } found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd") if found { - t.Fatalf("%s should have been evicted", items[0]) + t.Fatalf("%s should have been evicted", blobs[0].sha256hash) } } @@ -344,23 +392,26 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { cacheDir := testutils.TempDir(t) defer os.RemoveAll(cacheDir) - acHash, err := testutils.CreateRandomFile(cacheDir+"/ac/", 512) + acSize := int64(512) + acHash, err := testutils.CreateRandomFile(cacheDir+"/ac/", acSize) if err != nil { t.Fatal(err) } - casHash1, err := testutils.CreateRandomFile(cacheDir+"/cas/", 1024) + + casSize := int64(1024) + casHash1, err := testutils.CreateRandomFile(cacheDir+"/cas/", casSize) if err != nil { t.Fatal(err) } - casHash2, err := testutils.CreateRandomFile(cacheDir+"/cas/", 1024) + casHash2, err := testutils.CreateRandomFile(cacheDir+"/cas/", casSize) if err != nil { t.Fatal(err) } - testCache := New(cacheDir, 2560, nil) - _, numItems := testCache.Stats() - if numItems != 3 { - t.Fatalf("Expected test cache size 3 but was %d", numItems) - } + + sha256HeaderSize := headerSize[pb.DigestFunction_SHA256] + + cacheSize := acSize + (casSize+sha256HeaderSize)*2 + testCache := New(cacheDir, cacheSize, nil) var found bool found, _ = testCache.Contains(cache.AC, acHash) @@ -377,6 +428,11 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { if !found { 
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2) } + + _, numItems := testCache.Stats() + if numItems != 3 { + t.Fatalf("Expected test cache size 3 but was %d", numItems) + } } func TestLoadExistingEntries(t *testing.T) { @@ -387,7 +443,7 @@ func TestLoadExistingEntries(t *testing.T) { numBlobs := int64(3) blobSize := int64(1024) - acHash, err := testutils.CreateCacheFile(cacheDir+"/ac/", blobSize) + acHash, err := testutils.CreateCacheFile(cacheDir+"/ac.v2/", blobSize) if err != nil { t.Fatal(err) } @@ -395,7 +451,7 @@ func TestLoadExistingEntries(t *testing.T) { if err != nil { t.Fatal(err) } - rawHash, err := testutils.CreateCacheFile(cacheDir+"/raw/", blobSize) + rawHash, err := testutils.CreateCacheFile(cacheDir+"/raw.v2/", blobSize) if err != nil { t.Fatal(err) } @@ -429,12 +485,12 @@ func TestDistinctKeyspaces(t *testing.T) { cacheDir := testutils.TempDir(t) defer os.RemoveAll(cacheDir) - blobSize := 1024 - cacheSize := int64(blobSize * 3) + blobSize := int64(1024) + cacheSize := (blobSize + headerSize[pb.DigestFunction_SHA256]) * 3 testCache := New(cacheDir, cacheSize, nil) - blob, casHash := testutils.RandomDataAndHash(1024) + blob, casHash := testutils.RandomDataAndHash(blobSize) // Add the same blob with the same key, to each of the three // keyspaces, and verify that we have exactly three items in @@ -634,3 +690,105 @@ func TestHttpProxyBackend(t *testing.T) { t.Fatalf("Expected '%v' but received '%v", retrievedData, blob) } } + +func TestChecksumHeader(t *testing.T) { + + blob := []byte{0, 1, 2, 3, 4, 5, 6, 7} + + testCases := []struct { + kind pb.DigestFunction_Value + hash string + size int64 + success bool // True if the {hash,size} are valid. + }{ + {pb.DigestFunction_SHA256, + "0000000011111111222222223333333344444444555555556666666677777777", + 42, true}, + {pb.DigestFunction_SHA256, + "0000000011111111222222223333333344444444555555556666666677777777", + 0, true}, + + {pb.DigestFunction_UNKNOWN, + "00000000111111112222222233333333444444445555555566666666777777778", + 42, false}, // invalid hex string (odd length) + {pb.DigestFunction_UNKNOWN, + "000000001111111122222222333333334444444455555555666666667777777788", + 42, false}, // hash too long + {pb.DigestFunction_UNKNOWN, + "000000001111111122222222333333334444444455555555666666667777777", + 42, false}, // invalid hex string (odd length) + {pb.DigestFunction_UNKNOWN, + "00000000111111112222222233333333444444445555555566666666777777", + 42, false}, // hash too short + {pb.DigestFunction_UNKNOWN, + "", + 42, false}, + {pb.DigestFunction_UNKNOWN, + "0000000011111111222222223333333344444444555555556666666677777777", + -1, false}, // invalid (negative) size + } + + // Note that these tests just confirm that we can read/write a valid + // header and a blob. They dot not confirm that the header describes + // the blob. + + for _, tc := range testCases { + var buf bytes.Buffer + + err := writeHeader(&buf, tc.hash, tc.size) + if !tc.success { + if err == nil { + t.Error("Expected testcase to fail", tc.hash, tc.size) + } + + continue + } + if err != nil { + t.Fatal("Expected testscase to succeed, got:", err) + } + + // Check the header size manually, since it's not exposed by + // the readHeader function. + if int64(buf.Len()) != headerSize[tc.kind] { + t.Fatalf("Expected data header of size %d bytes, got %d. %s %d %v %s", + headerSize[tc.kind], buf.Len(), tc.hash, tc.size, tc.success, err) + } + + // Write the blob. 
+ n, err := buf.Write(blob) + if err != nil { + t.Fatal(err) + } + if n != len(blob) { + t.Fatalf("expected to write %d bytes, instead wrote %d bytes", + len(blob), n) + } + + dt, readHash, readSize, err := readHeader(&buf) + if err != nil { + t.Fatal(err) + } + if dt == pb.DigestFunction_UNKNOWN { + t.Fatal("Unknown digest type") + } + + if readHash != tc.hash { + t.Fatalf("Read a different hash '%s' than was written '%s'", + readHash, tc.hash) + } + + if readSize != tc.size { + t.Fatalf("Read a different size %d than was written %d", + readSize, tc.size) + } + + readBlob, err := ioutil.ReadAll(&buf) + if err != nil { + t.Fatal(err) + } + + if bytes.Compare(blob, readBlob) != 0 { + t.Fatal("Read a different blob than was written") + } + } +} diff --git a/cache/disk/load.go b/cache/disk/load.go new file mode 100644 index 000000000..d4a893899 --- /dev/null +++ b/cache/disk/load.go @@ -0,0 +1,364 @@ +package disk + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "regexp" + "runtime" + "sort" + "sync" + + "github.com/djherbis/atime" + + "github.com/buchgr/bazel-remote/cache" + + pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" +) + +type scanDir struct { + name string + dest string + version int + kind cache.EntryKind +} + +// Return a list of importItems sorted by atime, and a boolean that is +// true if the caller should migrate items, in reverse LRU order. +func (c *DiskCache) findCacheItems() ([]importItem, bool, error) { + files := []importItem{} + + var mu sync.Mutex // Protects the migrate variable below: + migrate := false + + // Workers submit discovered files here. + filesChan := make(chan []importItem) + + // Workers receive a dir to scan here. + workChan := make(chan scanDir) + + // Workers can report errors here: + errChan := make(chan error) + + numWorkers := runtime.NumCPU() // TODO: consider tweaking this. + + hashKeyRegex := regexp.MustCompile("^[a-f0-9]{64}$") + + // Spawn some worker goroutines to read the cache concurrently. + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func(i int) { + needMigration := false + defer func() { + if needMigration { + mu.Lock() + migrate = true + mu.Unlock() + } + wg.Done() + }() + + for scanDir := range workChan { + _, err := os.Stat(scanDir.name) + if os.IsNotExist(err) { + continue + } else if err != nil { + errChan <- err + return + } + + listing, err := ioutil.ReadDir(scanDir.name) + if err != nil { + errChan <- err + return + } + + addChecksum := scanDir.version < 2 && (scanDir.kind != cache.CAS) + + toSend := make([]importItem, 0, len(listing)) + for e := range listing { + if listing[e].IsDir() { + continue + } + + if !hashKeyRegex.MatchString(listing[e].Name()) { + log.Println("Unexpected file in cache:", + filepath.Join(scanDir.name, listing[e].Name())) + continue + } + + basename := listing[e].Name() + entry := importItem{ + name: filepath.Join(scanDir.name, basename), + info: listing[e], + addChecksum: addChecksum, + } + + if scanDir.version < 2 { + entry.oldName = entry.name + if scanDir.kind == cache.CAS { + entry.name = filepath.Join(c.dir, + scanDir.kind.String(), + basename[:2], + basename) + } else { + entry.name = filepath.Join(c.dir, + scanDir.kind.String()+".v2", + basename[:2], + basename) + } + + needMigration = true + } + + toSend = append(toSend, entry) + } + + if len(toSend) > 0 { + filesChan <- toSend + } + } + }(i) + } + + go func() { + wg.Wait() + // All workers have now finished. 
+ close(filesChan) + }() + + // Provide the workers with directories to scan. + + workChan <- scanDir{ + name: filepath.Join(c.dir, "ac"), + version: 0, + kind: cache.AC, + } + workChan <- scanDir{ + name: filepath.Join(c.dir, "cas"), + version: 0, + kind: cache.CAS, + } + + hexLetters := []byte("0123456789abcdef") + for _, c1 := range hexLetters { + for _, c2 := range hexLetters { + subDir := string(c1) + string(c2) + + workChan <- scanDir{ + name: filepath.Join(c.dir, "cas", subDir), + version: 2, // v1 and v2 cas dirs are the same. + kind: cache.CAS, + } + + workChan <- scanDir{ + name: filepath.Join(c.dir, "ac", subDir), + version: 1, + kind: cache.AC, + } + workChan <- scanDir{ + name: filepath.Join(c.dir, "ac.v2", subDir), + version: 2, + kind: cache.AC, + } + + workChan <- scanDir{ + name: filepath.Join(c.dir, "raw", subDir), + version: 1, + kind: cache.RAW, + } + workChan <- scanDir{ + name: filepath.Join(c.dir, "raw.v2", subDir), + version: 2, + kind: cache.RAW, + } + } + } + + // No more dirs for the workers to process. + close(workChan) + +OuterLoop: + for { + select { + case err := <-errChan: + return nil, false, err + case f, found := <-filesChan: + if !found { + break OuterLoop + } + files = append(files, f...) + } + } + + log.Println("Sorting cache files by atime.") + sort.Slice(files, func(i int, j int) bool { + return atime.Get(files[i].info).Before(atime.Get(files[j].info)) + }) + + return files, migrate, nil +} + +func updateAccesstime(file string) { + f, err := os.Open(file) + if err != nil { + return + } + var buf [1]byte + f.Read(buf[:]) + f.Close() +} + +func (c *DiskCache) migrateFiles(files []importItem) error { + log.Println("Migrating old cache items to new directory structure.") + + var err error + for _, i := range files { + if i.oldName == "" { + updateAccesstime(filepath.Join(c.dir, i.name)) + continue + } + + if !i.addChecksum { + err = os.Rename(i.oldName, i.name) + if err != nil { + return err + } + + continue + } + + err = moveAndChecksum(i.oldName, i.name) + if err != nil { + return err + } + } + + // Try to remove old (hopefully) empty dirs. + + hexLetters := []byte("0123456789abcdef") + for _, c1 := range hexLetters { + for _, c2 := range hexLetters { + subDir := string(c1) + string(c2) + + acV1subDir := filepath.Join(c.dir, "ac", subDir) + err := os.Remove(acV1subDir) + if err != nil && !os.IsNotExist(err) { + log.Printf("Warning: failed to remove old format directory \"%s\": %v", + acV1subDir, err) + } + rawV1subDir := filepath.Join(c.dir, "raw", subDir) + err = os.Remove(rawV1subDir) + if err != nil && !os.IsNotExist(err) { + log.Printf("Warning: failed to remove old format directory \"%s\": %v", + acV1subDir, err) + } + } + } + + acV1dir := filepath.Join(c.dir, "ac") + err = os.Remove(acV1dir) + if err != nil && !os.IsNotExist(err) { + log.Printf("Warning: failed to remove old format directory \"%s\": %v", + acV1dir, err) + } + rawV1dir := filepath.Join(c.dir, "raw") + err = os.Remove(rawV1dir) + if err != nil && !os.IsNotExist(err) { + log.Printf("Warning: failed to remove old format directory \"%s\": %v", + rawV1dir, err) + } + + return nil +} + +// Replace a raw file with a "v2" style file with the data integrity +// header. `old` and `new` must be different files (OK since we store +// v2 style files in different directories. 
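+//
+// The blob is streamed from `old` into `new` starting just past the space
+// reserved for the header, and hashed as it is copied. The header is then
+// filled in at the start of `new`, and `old` is removed on success. If any
+// step fails, the partially written `new` file is removed instead.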
+func moveAndChecksum(old string, new string) error { + + key := filepath.Base(old) + dt := digestType(key) + if dt == pb.DigestFunction_UNKNOWN { + return fmt.Errorf("Unsupported digest: %s", old) + } + + headerSize, ok := headerSize[dt] + if !ok { + return fmt.Errorf("Unknown header size for digest: %d", dt) + } + + success := false + openOld := false + openNew := false + var in *os.File + var out *os.File + + defer func() { + if openOld { + in.Close() + } + + if openNew { + out.Close() + + if !success { + os.Remove(new) + } + } + + if success { + os.Remove(old) + } + }() + + in, err := os.Open(old) + if err != nil { + return err + } + openOld = true + + out, err = os.Create(new) + if err != nil { + return err + } + openNew = true + + // Make space for the header. + _, err = out.Seek(headerSize, 0) + if err != nil { + return err + } + + hasher := sha256.New() + mw := io.MultiWriter(out, hasher) + + sizeBytes, err := io.Copy(mw, in) + if err != nil { + return err + } + + // Go back and fill in the header. + _, err = out.Seek(0, 0) + if err != nil { + return err + } + + hashBytes := hasher.Sum(nil) + hashStr := hex.EncodeToString(hashBytes) + + err = writeHeader(out, hashStr, sizeBytes) + if err != nil { + return err + } + + success = true + + return nil +} diff --git a/server/http_test.go b/server/http_test.go index 67772c6d6..932213057 100644 --- a/server/http_test.go +++ b/server/http_test.go @@ -84,7 +84,7 @@ func TestUploadFilesConcurrently(t *testing.T) { requests[i] = r } - c := disk.New(cacheDir, 1000*1024, nil) + c := disk.New(cacheDir, 1000*(1024+32+4+8), nil) h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, "") handler := http.HandlerFunc(h.CacheHandler) @@ -140,7 +140,9 @@ func TestUploadSameFileConcurrently(t *testing.T) { data, hash := testutils.RandomDataAndHash(1024) - c := disk.New(cacheDir, 1024, nil) + maxHeaderSize := int64(8 + 4 + sha256.Size) + + c := disk.New(cacheDir, 1024+maxHeaderSize, nil) h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, "") handler := http.HandlerFunc(h.CacheHandler) @@ -239,11 +241,21 @@ func TestUploadEmptyActionResult(t *testing.T) { "got ", status) } - cacheFile := filepath.Join(cacheDir, "ac", hash[:2], hash) + cacheFile := filepath.Join(cacheDir, "ac.v2", hash[:2], hash) cachedData, err := ioutil.ReadFile(cacheFile) if err != nil { t.Fatal(err) } + + // FIXME: test sha1, md5 too + headerSize := sha256.Size + 8 + 4 + + if len(cachedData) < headerSize { + t.Fatalf("data too short to contain header: %d, must be at least: %d", + len(cachedData), headerSize) + } + cachedData = cachedData[headerSize:] + if len(cachedData) == 0 { t.Fatal("expected non-zero length ActionResult to be cached") }
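
The header format introduced above can also be checked outside the cache package. The following is a minimal, illustrative sketch (not part of this change; the file name and command-line usage are hypothetical) that re-implements the read side for SHA256 entries and verifies a single ac.v2 or raw.v2 file, mirroring what readHeader and verifyBlob do in disk.go:

// verify_v2_blob.go: standalone sketch that checks one "v2" cache file
// against its checksum header. It assumes the layout from disk.go: a
// little-endian int32 DigestFunction value, a little-endian int64 blob
// size, then the raw digest bytes. Only SHA256 (value 1 in the remote-apis
// DigestFunction enum) is handled here.
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	// Hypothetical usage: go run verify_v2_blob.go <cache-dir>/ac.v2/ab/abcd...
	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Read the digest function (4 bytes) and the blob size (8 bytes).
	var digestFunc int32
	if err := binary.Read(f, binary.LittleEndian, &digestFunc); err != nil {
		log.Fatal(err)
	}
	var size int64
	if err := binary.Read(f, binary.LittleEndian, &size); err != nil {
		log.Fatal(err)
	}
	if digestFunc != 1 { // 1 == SHA256; other digests have other hash lengths.
		log.Fatalf("unhandled digest function: %d", digestFunc)
	}

	// Read the stored digest, then hash the remainder of the file.
	expected := make([]byte, sha256.Size)
	if _, err := io.ReadFull(f, expected); err != nil {
		log.Fatal(err)
	}
	hasher := sha256.New()
	n, err := io.Copy(hasher, f)
	if err != nil {
		log.Fatal(err)
	}

	if n != size {
		log.Fatalf("header claims %d blob bytes, found %d", size, n)
	}
	if got := hasher.Sum(nil); !bytes.Equal(got, expected) {
		log.Fatalf("hash mismatch: header has %s, data hashes to %s",
			hex.EncodeToString(expected), hex.EncodeToString(got))
	}
	fmt.Println("blob matches its checksum header")
}

Note that CAS files keep the headerless v1 layout, so a check like this only applies to files in the new ac.v2 and raw.v2 trees.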