From b78b6c45d0dcae34230c1cc2dc9ea2a8c3f0c636 Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Wed, 3 Aug 2022 13:44:49 -0400 Subject: [PATCH] sqlliveness: add timeouts to heartbeats Previously, sqlliveness heartbeat operations could block on the transactions they issue. This change wraps the create-session and extend-session operations in a timeout equal to the heartbeat interval. Resolves #85541 Release note: None Release justification: low-risk bugfix to existing functionality --- .../tenantcostclient/tenant_side_test.go | 4 +- pkg/sql/sqlliveness/slinstance/BUILD.bazel | 2 + pkg/sql/sqlliveness/slinstance/slinstance.go | 23 +++- .../sqlliveness/slinstance/slinstance_test.go | 106 +++++++++++++++++- pkg/sql/sqlliveness/slstorage/test_helpers.go | 35 +++++- 5 files changed, 160 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index 94b5bf1f3f78..2999417f8939 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) { // Make the tenant heartbeat like crazy. ctx := context.Background() //slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond) - slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond) _, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{ TenantID: tenantID, @@ -960,7 +960,7 @@ func TestSQLLivenessExemption(t *testing.T) { // Verify that heartbeats can go through and update the expiration time. 
val := livenessValue() - time.Sleep(2 * time.Millisecond) + time.Sleep(20 * time.Millisecond) testutils.SucceedsSoon( t, func() error { diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 522c3e91a3ab..f62fa2ad2a68 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlliveness", + "//pkg/util/contextutil", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", @@ -18,6 +19,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index d76e8ca5a1c1..8e768dc30294 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) var ( @@ -253,8 +255,13 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Read = true s, _ := l.getSessionOrBlockCh() if s == nil { - newSession, err := l.createSession(ctx) - if err != nil { + var newSession *session + if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", l.hb(), func(ctx context.Context) error { + var err error + newSession, err = l.createSession(ctx) + return err + }); err != nil { + 
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err) func() { l.mu.Lock() defer l.mu.Unlock() @@ -270,12 +277,20 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { t.Reset(l.hb()) continue } - found, err := l.extendSession(ctx, s) - if err != nil { + var found bool + if err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", l.hb(), func(ctx context.Context) error { + var err error + found, err = l.extendSession(ctx, s) + return err + }); err != nil && !errors.HasType(err, (*contextutil.TimeoutError)(nil)) { + // Unable to extend session due to unknown error. + // Clear and stop heartbeat loop. + log.Errorf(ctx, "sqlliveness failed to extend session: %v", err) l.clearSession(ctx) return } if !found { + // No existing session found, immediately create one. l.clearSession(ctx) // Start next loop iteration immediately to insert a new session. t.Reset(0) diff --git a/pkg/sql/sqlliveness/slinstance/slinstance_test.go b/pkg/sql/sqlliveness/slinstance/slinstance_test.go index 75b069bab914..63b43303e04a 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance_test.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -40,8 +41,8 @@ func TestSQLInstance(t *testing.T) { clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryMinSupportedVersion, true /* initializeVersion */) - slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond) - slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + 
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) fakeStorage := slstorage.NewFakeStorage() sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) @@ -91,3 +92,104 @@ func TestSQLInstance(t *testing.T) { _, err = sqlInstance.Session(ctx) require.Error(t, err) } + +// TestSQLInstanceDeadlines tests that we have proper deadlines set on the +// create and extend session operations. This is done by blocking the fake +// storage layer and ensuring that no sessions get created because the +// timeouts are constantly triggered. +func TestSQLInstanceDeadlines(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + // block the fake storage + fakeStorage.SetBlockCh() + cleanUpFunc := func() { + fakeStorage.CloseBlockCh() + } + defer cleanUpFunc() + + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that we do not create a session + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + return err == nil + }, + 100*time.Millisecond, 10*time.Millisecond, + ) +} + +// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the +// create and extend session operations. This tests the case where the session is +// successfully created first and then blocks indefinitely. 
+func TestSQLInstanceDeadlinesExtend(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx, stopper := context.Background(), stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */) + settings := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true /* initializeVersion */) + slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond) + // Must be shorter than the storage sleep amount below + slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond) + + fakeStorage := slstorage.NewFakeStorage() + sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil) + sqlInstance.Start(ctx) + + // verify that eventually session is created successfully + testutils.SucceedsSoon( + t, + func() error { + _, err := sqlInstance.Session(ctx) + return err + }, + ) + + // verify that session is also extended successfully a few times + require.Never( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + return err != nil + }, + 100*time.Millisecond, 10*time.Millisecond, + ) + + // block the fake storage + fakeStorage.SetBlockCh() + cleanUpFunc := func() { + fakeStorage.CloseBlockCh() + } + defer cleanUpFunc() + + // expect subsequent create/extend calls to fail + require.Eventually( + t, + func() bool { + _, err := sqlInstance.Session(ctx) + return err != nil + }, + testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond, + ) +} diff --git a/pkg/sql/sqlliveness/slstorage/test_helpers.go b/pkg/sql/sqlliveness/slstorage/test_helpers.go index 23eb1115f217..07df9a7935a5 100644 --- a/pkg/sql/sqlliveness/slstorage/test_helpers.go +++ b/pkg/sql/sqlliveness/slstorage/test_helpers.go @@ -24,6 +24,7 @@ type FakeStorage struct { mu struct { syncutil.Mutex sessions map[sqlliveness.SessionID]hlc.Timestamp + blockCh chan 
struct{} } } @@ -46,10 +47,18 @@ func (s *FakeStorage) IsAlive( // Insert implements the sqlliveness.Storage interface. func (s *FakeStorage) Insert( - _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) error { s.mu.Lock() defer s.mu.Unlock() + if s.mu.blockCh != nil { + select { + case <-s.mu.blockCh: + break + case <-ctx.Done(): + return ctx.Err() + } + } if _, ok := s.mu.sessions[sid]; ok { return errors.Errorf("session %s already exists", sid) } @@ -59,10 +68,18 @@ func (s *FakeStorage) Insert( // Update implements the sqlliveness.Storage interface. func (s *FakeStorage) Update( - _ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, + ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp, ) (bool, error) { s.mu.Lock() defer s.mu.Unlock() + if s.mu.blockCh != nil { + select { + case <-s.mu.blockCh: + break + case <-ctx.Done(): + return false, ctx.Err() + } + } if _, ok := s.mu.sessions[sid]; !ok { return false, nil } @@ -77,3 +94,17 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error delete(s.mu.sessions, sid) return nil } + +// SetBlockCh is used to block the storage for testing purposes +func (s *FakeStorage) SetBlockCh() { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.blockCh = make(chan struct{}) +} + +// CloseBlockCh is used to unblock the storage for testing purposes +func (s *FakeStorage) CloseBlockCh() { + s.mu.Lock() + defer s.mu.Unlock() + close(s.mu.blockCh) +}