Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
that were involved. This change introduces some timeouts of the length of the
heartbeat during the create and refresh operations.

Resolves cockroachdb#85541

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian authored and aadityasondhi committed Sep 16, 2022
1 parent 9a05046 commit b78b6c4
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) {
// Make the tenant heartbeat like crazy.
ctx := context.Background()
//slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

_, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: tenantID,
Expand Down Expand Up @@ -960,7 +960,7 @@ func TestSQLLivenessExemption(t *testing.T) {

// Verify that heartbeats can go through and update the expiration time.
val := livenessValue()
time.Sleep(2 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
testutils.SucceedsSoon(
t,
func() error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -18,6 +19,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
23 changes: 19 additions & 4 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -253,8 +255,13 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
if err != nil {
var newSession *session
if err := contextutil.RunWithTimeout(ctx, "sqlliveness create session", l.hb(), func(ctx context.Context) error {
var err error
newSession, err = l.createSession(ctx)
return err
}); err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -270,12 +277,20 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
var found bool
if err := contextutil.RunWithTimeout(ctx, "sqlliveness extend session", l.hb(), func(ctx context.Context) error {
var err error
found, err = l.extendSession(ctx, s)
return err
}); err != nil && !errors.HasType(err, (*contextutil.TimeoutError)(nil)) {
// Unable to extend session due to unknown error.
// Clear and stop heartbeat loop.
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
l.clearSession(ctx)
return
}
if !found {
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
Expand Down
106 changes: 104 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -40,8 +41,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -91,3 +92,104 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines tests that we have proper deadlines set on the
// create and extend session operations. This is done by blocking the fake
// storage layer and ensuring that no sessions get created because the
// timeouts are constantly triggered.
func TestSQLInstanceDeadlines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
// block the fake storage
fakeStorage.SetBlockCh()
cleanUpFunc := func() {
fakeStorage.CloseBlockCh()
}
defer cleanUpFunc()

sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that we do not create a session
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
return err == nil
},
100*time.Millisecond, 10*time.Millisecond,
)
}

// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the
// create and extend session operations. This tests the case where the session is
// successfully created first and then blocks indefinitely.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 20*time.Millisecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 10*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that eventually session is created successfully
testutils.SucceedsSoon(
t,
func() error {
_, err := sqlInstance.Session(ctx)
return err
},
)

// verify that session is also extended successfully a few times
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
return err != nil
},
100*time.Millisecond, 10*time.Millisecond,
)

// block the fake storage
fakeStorage.SetBlockCh()
cleanUpFunc := func() {
fakeStorage.CloseBlockCh()
}
defer cleanUpFunc()

// expect subsequent create/extend calls to fail
require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
return err != nil
},
testutils.DefaultSucceedsSoonDuration, 10*time.Millisecond,
)
}
35 changes: 33 additions & 2 deletions pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type FakeStorage struct {
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
blockCh chan struct{}
}
}

Expand All @@ -46,10 +47,18 @@ func (s *FakeStorage) IsAlive(

// Insert implements the sqlliveness.Storage interface.
func (s *FakeStorage) Insert(
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.blockCh != nil {
select {
case <-s.mu.blockCh:
break
case <-ctx.Done():
return ctx.Err()
}
}
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand All @@ -59,10 +68,18 @@ func (s *FakeStorage) Insert(

// Update implements the sqlliveness.Storage interface.
func (s *FakeStorage) Update(
_ context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
ctx context.Context, sid sqlliveness.SessionID, expiration hlc.Timestamp,
) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.blockCh != nil {
select {
case <-s.mu.blockCh:
break
case <-ctx.Done():
return false, ctx.Err()
}
}
if _, ok := s.mu.sessions[sid]; !ok {
return false, nil
}
Expand All @@ -77,3 +94,17 @@ func (s *FakeStorage) Delete(_ context.Context, sid sqlliveness.SessionID) error
delete(s.mu.sessions, sid)
return nil
}

// SetBlockCh is used to block the storage for testing purposes
func (s *FakeStorage) SetBlockCh() {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.blockCh = make(chan struct{})
}

// CloseBlockCh is used to unblock the storage for testing purposes
func (s *FakeStorage) CloseBlockCh() {
s.mu.Lock()
defer s.mu.Unlock()
close(s.mu.blockCh)
}

0 comments on commit b78b6c4

Please sign in to comment.