Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
they issued. This change adds a timeout, equal to the heartbeat interval, to
the session create and refresh (extend) operations.

Resolves cockroachdb#85541

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian committed Sep 6, 2022
1 parent 0197ada commit ad0fe90
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -35,6 +36,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/util/contextutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
67 changes: 64 additions & 3 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package slinstance

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -237,6 +240,56 @@ func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error)
return true, nil
}

// TimeoutError is the error value returned by createSessionWithTimeout and
// extendSessionWithTimeout when their timer fires before the underlying
// session operation completes. It carries no fields; its message is fixed.
type TimeoutError struct{}

// Compile-time check that *TimeoutError satisfies the error interface.
var _ error = (*TimeoutError)(nil)

// Error implements the error interface.
func (TimeoutError) Error() string {
	const msg = "session timeout"
	return msg
}

// createSessionWithTimeout runs createSession and waits at most timeout for
// it to finish.
//
// If createSession completes first, its session and error are returned
// unchanged. If the timer fires first, (nil, *TimeoutError) is returned.
//
// createSession runs on its own goroutine under a context derived from ctx
// that is canceled when this function returns. This lets an abandoned call
// abort instead of lingering after a timeout, provided createSession honors
// context cancellation.
func (l *Instance) createSessionWithTimeout(
	ctx context.Context, timeout time.Duration,
) (*session, error) {
	type result struct {
		s   *session
		err error
	}

	opCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is receiving anymore because the timeout already fired.
	resCh := make(chan result, 1)
	go func() {
		s, err := l.createSession(opCtx)
		resCh <- result{s: s, err: err}
	}()

	t := timeutil.NewTimer()
	// Release the timer on every return path.
	defer t.Stop()
	t.Reset(timeout)

	select {
	case <-t.C:
		// Record that the timer's channel has been drained, as heartbeatLoop
		// does for its own timer.
		t.Read = true
		return nil, &TimeoutError{}
	case res := <-resCh:
		return res.s, res.err
	}
}

// extendSessionWithTimeout runs extendSession for s and waits at most timeout
// for it to finish.
//
// If extendSession completes first, its (found, error) pair is returned
// unchanged. If the timer fires first, (false, *TimeoutError) is returned.
//
// extendSession runs on its own goroutine under a context derived from ctx
// that is canceled when this function returns. This lets an abandoned call
// abort instead of lingering after a timeout, provided extendSession honors
// context cancellation.
//
// NOTE(review): heartbeatLoop tests the returned error against
// *contextutil.TimeoutError, which is a different type from the *TimeoutError
// returned here. Confirm which type the caller is meant to see.
func (l *Instance) extendSessionWithTimeout(
	ctx context.Context, s *session, timeout time.Duration,
) (bool, error) {
	type result struct {
		found bool
		err   error
	}

	opCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is receiving anymore because the timeout already fired.
	resCh := make(chan result, 1)
	go func() {
		found, err := l.extendSession(opCtx, s)
		resCh <- result{found: found, err: err}
	}()

	t := timeutil.NewTimer()
	// Release the timer on every return path.
	defer t.Stop()
	t.Reset(timeout)

	select {
	case <-t.C:
		// Record that the timer's channel has been drained, as heartbeatLoop
		// does for its own timer.
		t.Read = true
		return false, &TimeoutError{}
	case res := <-resCh:
		return res.found, res.err
	}
}

func (l *Instance) heartbeatLoop(ctx context.Context) {
defer func() {
log.Warning(ctx, "exiting heartbeat loop")
Expand All @@ -253,8 +306,10 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
var newSession *session
newSession, err := l.createSessionWithTimeout(ctx, l.hb())
if err != nil {
fmt.Printf("ERORR ON NEW SESSION: %v\n", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -266,21 +321,27 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
}()
return
}
fmt.Printf("SUCCESS??? ON NEW SESSION: %v\n", newSession)
l.setSession(newSession)
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
found, err := l.extendSessionWithTimeout(ctx, s, l.hb())
if err != nil && !errors.HasType(err, (*contextutil.TimeoutError)(nil)) {
// Unable to extend session due to unknown error.
// Clear and stop heartbeat loop.
fmt.Printf("ERORR ON EXTEND SESSION: %v\n", err)
l.clearSession(ctx)
return
}
if !found {
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
continue
}
fmt.Printf("SUCCESS??? ON EXTEND SESSION: %v\n", found)
if log.V(2) {
log.Infof(ctx, "extended SQL liveness session %s", s.ID())
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,88 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines checks that session creation is bounded by a
// deadline. The fake storage is told to sleep for longer than the heartbeat
// interval inside Insert and Update, and the test asserts that no session
// becomes available during the observation window.
func TestSQLInstanceDeadlines(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	manualTime := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(manualTime, time.Nanosecond /* maxOffset */)
	settings := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &settings.SV, 1*time.Second)
	// The heartbeat interval must stay below the storage sleep configured
	// below.
	slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Second)

	fakeStorage := slstorage.NewFakeStorage()
	fakeStorage.InsertSleep = 2 * time.Second
	sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
	sqlInstance.Start(ctx)

	// sessionAvailable reports whether the instance currently hands out a
	// session without error.
	sessionAvailable := func() bool {
		_, err := sqlInstance.Session(ctx)
		return err == nil
	}
	require.Never(t, sessionAvailable, 50*time.Millisecond, 1*time.Millisecond)
}

// TestSQLInstanceDeadlinesExtend runs in two phases. With a storage layer
// that does not sleep, it first waits for a session to become available. It
// then makes the fake storage sleep for longer than the heartbeat interval
// and waits for Session to start returning an error.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	manualTime := timeutil.NewManualTime(timeutil.Unix(0, 42))
	clock := hlc.NewClock(manualTime, time.Nanosecond /* maxOffset */)
	settings := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		true /* initializeVersion */)
	slinstance.DefaultTTL.Override(ctx, &settings.SV, 1*time.Second)
	// The heartbeat interval must stay below the storage sleep introduced
	// later in this test.
	slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 250*time.Millisecond)

	fakeStorage := slstorage.NewFakeStorage()
	sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
	sqlInstance.Start(ctx)

	// sessionErr returns the error, if any, from asking for the session.
	sessionErr := func() error {
		_, err := sqlInstance.Session(ctx)
		return err
	}

	// Phase one: a session is created while storage responds immediately.
	require.Eventually(
		t,
		func() bool { return sessionErr() == nil },
		50*time.Millisecond, 1*time.Millisecond,
	)

	// Phase two: slow the storage down past the heartbeat interval.
	// NOTE(review): InsertSleep is assigned here without synchronization,
	// while FakeStorage.Insert/Update read it and the instance has already
	// been started; confirm this is acceptable under the race detector.
	fakeStorage.InsertSleep = 2 * time.Second

	require.Eventually(
		t,
		func() bool { return sessionErr() != nil },
		1*time.Second, 1*time.Millisecond,
	)
}
6 changes: 5 additions & 1 deletion pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package slstorage

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -21,7 +22,8 @@ import (

// FakeStorage implements the sqlliveness.Storage interface.
type FakeStorage struct {
mu struct {
InsertSleep time.Duration
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
}
Expand Down Expand Up @@ -50,6 +52,7 @@ func (s *FakeStorage) Insert(
) error {
s.mu.Lock()
defer s.mu.Unlock()
time.Sleep(s.InsertSleep)
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand All @@ -63,6 +66,7 @@ func (s *FakeStorage) Update(
) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
time.Sleep(s.InsertSleep)
if _, ok := s.mu.sessions[sid]; !ok {
return false, nil
}
Expand Down

0 comments on commit ad0fe90

Please sign in to comment.