From 34bf5ef791718bab339514cdacfc8ff54345f975 Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Wed, 3 Aug 2022 13:44:49 -0400 Subject: [PATCH] sqlliveness: add timeouts to heartbeats Previously, sqlliveness heartbeat operations could block on the transactions that were involved. This change adds a timeout, equal to the heartbeat interval, to the session create and extend operations. Resolves #85541 Release note: None Release justification: low-risk bugfix to existing functionality --- .../tenantcostclient/tenant_side_test.go | 2 +- pkg/sql/sqlliveness/slinstance/BUILD.bazel | 1 + pkg/sql/sqlliveness/slinstance/slinstance.go | 55 +++++++- .../sqlliveness/slinstance/slinstance_test.go | 124 +++++++++++++++++- pkg/sql/sqlliveness/slstorage/test_helpers.go | 13 +- 5 files changed, 188 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 94b5bf1f3f78..fbe8610c18c3 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) { // Make the tenant heartbeat like crazy. 
ctx := context.Background() //slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) _, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ TenantID: tenantID, diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 522c3e91a3ab..e73b0091c27b 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index d76e8ca5a1c1..67a51512851b 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) var ( @@ -237,6 +238,48 @@ func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error) return true, nil } +func (l *Instance) createSessionWithTimeout( + ctx context.Context, timeout time.Duration, +) (*session, error) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context + defer cancel() + var s *session + var err error + createChan := make(chan struct{}) + go func() { + s, err = l.createSession(ctx) + close(createChan) + }() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-createChan: + return s, err + } +} + +func (l *Instance) extendSessionWithTimeout( + ctx context.Context, s *session, timeout time.Duration, +) (bool, error) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) // 
nolint:context + defer cancel() + var found bool + var err error + extendChan := make(chan struct{}) + go func() { + found, err = l.extendSession(ctx, s) + close(extendChan) + }() + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-extendChan: + return found, err + } +} + func (l *Instance) heartbeatLoop(ctx context.Context) { defer func() { log.Warning(ctx, "exiting heartbeat loop") @@ -253,8 +296,10 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Read = true s, _ := l.getSessionOrBlockCh() if s == nil { - newSession, err := l.createSession(ctx) + var newSession *session + newSession, err := l.createSessionWithTimeout(ctx, l.hb()) if err != nil { + log.Errorf(ctx, "sqlliveness failed to create new session: %v", err) func() { l.mu.Lock() defer l.mu.Unlock() @@ -270,12 +315,16 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Reset(l.hb()) continue } - found, err := l.extendSession(ctx, s) - if err != nil { + found, err := l.extendSessionWithTimeout(ctx, s, l.hb()) + if err != nil && errors.Is(err, context.DeadlineExceeded) { + // Unable to extend session before the deadline expired (other errors + // fall through to the found check below). Clear and stop heartbeat loop. + log.Errorf(ctx, "sqlliveness failed to extend session: %v", err) l.clearSession(ctx) return } if !found { + // No existing session found, immediately create one. l.clearSession(ctx) // Start next loop iteration immediately to insert a new session. 
t.Reset(0) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 75b069bab914..6f56c473aff7 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -40,8 +40,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) @@ -91,3 +91,123 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } + +// TestSQLInstanceDeadlines tests that we have proper deadlines set on the +// create and extend session operations. This is done by blocking the fake +// storage layer and ensuring that no sessions get created because the +// timeouts are constantly triggered. 
+func TestSQLInstanceDeadlines(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + // block the fake storage + fakeStorage.BlockCh = make(chan chan struct{}) + cleanUpFunc := func() { + ch := <-fakeStorage.BlockCh + close(ch) + close(fakeStorage.BlockCh) + fakeStorage.BlockCh = nil + } + defer cleanUpFunc() + + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that we do not create a session + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + if err != nil { + return false + } + return true + }, + 10*time.Millisecond, 1*time.Millisecond, + ) +} + +// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the +// create and extend session operations. This tests the case where the session is +// successfully created first and then blocks indefinitely. 
+func TestSQLInstanceDeadlinesExtend(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond) + // Keep the heartbeat short; the fake storage is blocked (via BlockCh) below + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that a session is eventually created successfully + require.Eventually( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + if err != nil { + return false + } + return true + }, + 10*time.Millisecond, 1*time.Millisecond, + ) + + // verify that the session is also extended successfully a few times + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + if err != nil { + return true + } + return false + }, + 10*time.Millisecond, 1*time.Millisecond, + ) + + // block the fake storage + fakeStorage.BlockCh = make(chan chan struct{}) + cleanUpFunc := func() { + ch := <-fakeStorage.BlockCh + close(ch) + close(fakeStorage.BlockCh) + fakeStorage.BlockCh = nil + } + defer cleanUpFunc() + + // expect subsequent create/extend calls to fail + require.Eventually( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + if err != nil { + return true + } + return false + }, + 10*time.Millisecond, 1*time.Millisecond, + ) +} diff --git a/pkg/sql/sqlliveness/slstorage/test_helpers.go b/pkg/sql/sqlliveness/slstorage/test_helpers.go index 23eb1115f217..2b5accc47f06 100644 --- 
a/pkg/sql/sqlliveness/slstorage/test_helpers.go +++ b/pkg/sql/sqlliveness/slstorage/test_helpers.go @@ -21,7 +21,8 @@ import ( // FakeStorage implements the sqlliveness.Storage interface. type FakeStorage struct { - mu struct { + BlockCh chan chan struct{} + mu struct { syncutil.Mutex sessions map[sqlliveness.SessionID]hlc.Timestamp } @@ -50,6 +51,11 @@ func (s *FakeStorage) Insert( ) error { s.mu.Lock() defer s.mu.Unlock() + if s.BlockCh != nil { + ch := make(chan struct{}) + s.BlockCh <- ch + <-ch + } if _, ok := s.mu.sessions[sid]; ok { return errors.Errorf("session %s already exists", sid) } @@ -63,6 +69,11 @@ func (s *FakeStorage) Update( ) (bool, error) { s.mu.Lock() defer s.mu.Unlock() + if s.BlockCh != nil { + ch := make(chan struct{}) + s.BlockCh <- ch + <-ch + } if _, ok := s.mu.sessions[sid]; !ok { return false, nil }