Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/engine: Teeing engine fixes #45472

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,15 @@ func TestDecommission(t *testing.T) {
t.Skip("skipping under testrace: #39807 and #37811")
}

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1694,6 +1695,15 @@ func TestSystemZoneConfigs(t *testing.T) {
t.Skip()
}

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 7, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Expand Down Expand Up @@ -2159,6 +2169,15 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
func TestReplicaTombstone(t *testing.T) {
defer leaktest.AfterTest(t)()

// This test relies on concurrently waiting for a value to change in the
// underlying engine(s). Since the teeing engine does not respond well to
// value mismatches, whether transient or permanent, skip this test if the
// teeing engine is being used. See
// https://github.com/cockroachdb/cockroach/issues/42656 for more context.
if engine.DefaultStorageEngine == enginepb.EngineTypeTeePebbleRocksDB {
t.Skip("disabled on teeing engine")
}

t.Run("(1) ChangeReplicasTrigger", func(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ func newTeeInMem(ctx context.Context, attrs roachpb.Attributes, cacheSize int64)
pebbleInMem := newPebbleInMem(ctx, attrs, cacheSize)
rocksDBInMem := newRocksDBInMem(attrs, cacheSize)
tee := NewTee(ctx, rocksDBInMem, pebbleInMem)
tee.inMem = true
return tee
}

Expand Down
159 changes: 105 additions & 54 deletions pkg/storage/engine/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"context"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand All @@ -32,10 +34,9 @@ import (
// This engine is only meant to be used in testing. No performance or
// stability guarantees are made about this engine in production.
type TeeEngine struct {
ctx context.Context
eng1 Engine
eng2 Engine
inMem bool
ctx context.Context
eng1 Engine
eng2 Engine
}

var _ Engine = &TeeEngine{}
Expand Down Expand Up @@ -136,7 +137,9 @@ func (t *TeeEngine) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -256,9 +259,7 @@ func (t *TeeEngine) GetCompactionStats() string {

// GetStats implements the Engine interface.
func (t *TeeEngine) GetStats() (*Stats, error) {
// TODO(itsbilal): Test why getting stats from eng1 segfaults when
// eng1 = RocksDB.
return t.eng2.GetStats()
return t.eng1.GetStats()
}

// GetEncryptionRegistries implements the Engine interface.
Expand Down Expand Up @@ -330,26 +331,54 @@ func (t *TeeEngine) Type() enginepb.EngineType {
return enginepb.EngineTypeTeePebbleRocksDB
}

// Helper to remap a path in the first engine's aux dir, into the same path in
// the second engine's aux dir. Returns ok = true only if the provided path
// is in the first engine's aux dir.
func (t *TeeEngine) remapPath(path1 string) (path2 string, ok bool) {
auxDir1 := t.eng1.GetAuxiliaryDir()
if !strings.HasPrefix(path1, auxDir1) {
// This dir isn't in the first engine's aux dir.
return path1, false
}
ok = true
path2 = filepath.Join(t.eng2.GetAuxiliaryDir(), strings.TrimPrefix(path1, auxDir1))
return
}

// IngestExternalFiles implements the Engine interface.
func (t *TeeEngine) IngestExternalFiles(ctx context.Context, paths []string) error {
var err, err2 error
// Special case: If either engine is RocksDB, run that last, since RocksDB
// IngestExternalFiles deletes the specified files.
if rocksDBEng, ok := t.eng1.(*RocksDB); ok {
err2 = t.eng2.IngestExternalFiles(ctx, paths)
err = rocksDBEng.IngestExternalFiles(ctx, paths)
} else {
err = t.eng1.IngestExternalFiles(ctx, paths)
err2 = t.eng2.IngestExternalFiles(ctx, paths)

// The paths should be in eng1's aux directory. Map them to eng2's aux
// directory.
paths2 := make([]string, len(paths))
for i, path := range paths {
var ok bool
paths2[i], ok = t.remapPath(path)
if !ok {
paths2[i] = filepath.Join(t.eng2.GetAuxiliaryDir(), "temp-ingest", filepath.Base(path))
data, err := t.eng1.ReadFile(path)
if err != nil {
return err
}
f, err := t.eng2.CreateFile(paths2[i])
if err != nil {
return err
}
_, _ = f.Write(data)
_ = f.Sync()
_ = f.Close()
}
}

err = t.eng1.IngestExternalFiles(ctx, paths)
err2 = t.eng2.IngestExternalFiles(ctx, paths2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// PreIngestDelay implements the Engine interface.
func (t *TeeEngine) PreIngestDelay(ctx context.Context) {
// TODO(itsbilal): Test why PreIngestDelay on eng1 segfaults when
// eng1 = RocksDB in tests like TestDBAddSSTable.
t.eng2.PreIngestDelay(ctx)
t.eng1.PreIngestDelay(ctx)
}

// ApproximateDiskBytes implements the Engine interface.
Expand All @@ -372,17 +401,19 @@ func (t *TeeEngine) CompactRange(start, end roachpb.Key, forceBottommost bool) e

// InMem implements the Engine interface.
func (t *TeeEngine) InMem() bool {
return t.inMem
return t.eng1.InMem()
}

// CreateFile implements the FS interface.
func (t *TeeEngine) CreateFile(filename string) (fs.File, error) {
_ = os.MkdirAll(filepath.Dir(filename), 0755)
file1, err := t.eng1.CreateFile(filename)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.CreateFile(filename)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
file2, err2 := t.eng2.CreateFile(filename2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -395,12 +426,14 @@ func (t *TeeEngine) CreateFile(filename string) (fs.File, error) {

// CreateFileWithSync implements the FS interface.
func (t *TeeEngine) CreateFileWithSync(filename string, bytesPerSync int) (fs.File, error) {
_ = os.MkdirAll(filepath.Dir(filename), 0755)
file1, err := t.eng1.CreateFileWithSync(filename, bytesPerSync)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.CreateFileWithSync(filename, bytesPerSync)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
file2, err2 := t.eng2.CreateFileWithSync(filename2, bytesPerSync)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -414,7 +447,11 @@ func (t *TeeEngine) CreateFileWithSync(filename string, bytesPerSync int) (fs.Fi
// OpenFile implements the FS interface.
func (t *TeeEngine) OpenFile(filename string) (fs.File, error) {
file1, err := t.eng1.OpenFile(filename)
file2, err2 := t.eng2.OpenFile(filename)
filename2, ok := t.remapPath(filename)
if !ok {
return file1, err
}
file2, err2 := t.eng2.OpenFile(filename2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -428,7 +465,11 @@ func (t *TeeEngine) OpenFile(filename string) (fs.File, error) {
// OpenDir implements the FS interface.
func (t *TeeEngine) OpenDir(name string) (fs.File, error) {
file1, err := t.eng1.OpenDir(name)
file2, err2 := t.eng2.OpenDir(name)
name2, ok := t.remapPath(name)
if !ok {
return file1, err
}
file2, err2 := t.eng2.OpenDir(name2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand All @@ -447,85 +488,91 @@ func (t *TeeEngine) ReadFile(filename string) ([]byte, error) {
// WriteFile implements the Engine interface.
func (t *TeeEngine) WriteFile(filename string, data []byte) error {
err := t.eng1.WriteFile(filename, data)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return err
}
err2 := t.eng2.WriteFile(filename, data)
_ = os.MkdirAll(filepath.Dir(filename2), 0755)
err2 := t.eng2.WriteFile(filename2, data)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteFile implements the FS interface.
func (t *TeeEngine) DeleteFile(filename string) error {
err := t.eng1.DeleteFile(filename)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
filename2, ok := t.remapPath(filename)
if !ok {
return err
}
err2 := t.eng2.DeleteFile(filename)
err2 := t.eng2.DeleteFile(filename2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteDirAndFiles implements the Engine interface.
func (t *TeeEngine) DeleteDirAndFiles(dir string) error {
err := t.eng1.DeleteDirAndFiles(dir)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
dir2, ok := t.remapPath(dir)
if !ok {
return err
}
err2 := t.eng2.DeleteDirAndFiles(dir)
err2 := t.eng2.DeleteDirAndFiles(dir2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// LinkFile implements the FS interface.
func (t *TeeEngine) LinkFile(oldname, newname string) error {
err := t.eng1.LinkFile(oldname, newname)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
oldname2, ok := t.remapPath(oldname)
if !ok {
return err
}
err2 := t.eng2.LinkFile(oldname, newname)
newname2, _ := t.remapPath(newname)
err2 := t.eng2.LinkFile(oldname2, newname2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// RenameFile implements the FS interface.
func (t *TeeEngine) RenameFile(oldname, newname string) error {
err := t.eng1.RenameFile(oldname, newname)
if !t.inMem {
// No need to write twice if the two engines share the same file system.
oldname2, ok := t.remapPath(oldname)
if !ok {
return err
}
err2 := t.eng2.RenameFile(oldname, newname)
newname2, _ := t.remapPath(newname)
err2 := t.eng2.RenameFile(oldname2, newname2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// CreateDir implements the FS interface.
func (t *TeeEngine) CreateDir(name string) error {
err := t.eng1.CreateDir(name)
if !t.inMem {
// No need to create twice if the two engines share the same file system.
name2, ok := t.remapPath(name)
if !ok {
return err
}
err2 := t.eng2.CreateDir(name)
err2 := t.eng2.CreateDir(name2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// DeleteDir implements the FS interface.
func (t *TeeEngine) DeleteDir(name string) error {
err := t.eng1.DeleteDir(name)
if !t.inMem {
// No need to delete twice if the two engines share the same file system.
name2, ok := t.remapPath(name)
if !ok {
return err
}
err2 := t.eng2.DeleteDir(name)
err2 := t.eng2.DeleteDir(name2)
return fatalOnErrorMismatch(t.ctx, err, err2)
}

// ListDir implements the FS interface.
func (t *TeeEngine) ListDir(name string) ([]string, error) {
list1, err := t.eng1.ListDir(name)
_, err2 := t.eng2.ListDir(name)

name2, ok := t.remapPath(name)
if !ok {
return list1, err
}
_, err2 := t.eng2.ListDir(name2)
if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
return nil, err
}
Expand Down Expand Up @@ -769,7 +816,9 @@ func (t *TeeEngineReader) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -869,7 +918,9 @@ func (t *TeeEngineBatch) GetProto(
return false, 0, 0, err
}

err = protoutil.Unmarshal(val, msg)
if msg != nil {
err = protoutil.Unmarshal(val, msg)
}
keyBytes = int64(key.Len())
valBytes = int64(len(val))
return true, keyBytes, valBytes, err
Expand Down Expand Up @@ -1127,7 +1178,7 @@ func (t *TeeEngineIter) ComputeStats(
if !stats1.Equal(stats2) {
log.Fatalf(t.ctx, "mismatching stats between engines: %v != %v", stats1, stats2)
}
return stats1, nil
return stats1, err
}

// FindSplitKey implements the Iterator interface.
Expand Down