From 71e1af40a2b32ab040de0f83dfab036fd74928c1 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 24 Jun 2022 14:45:23 -0700 Subject: [PATCH] sql: initialize sql instance during instance provider start Before this change, there was a race condition where the instance provider and the instance reader would start before the instance provider created a SQL instance, potentially causing the reader to not cache the instance before initialization was complete. This is a problem in multi-tenant environments, where we may not be able to plan queries if the reader does not know of any SQL instances. This change moves sql instance initialization into the instance provider's `Start()` function before starting the reader, so that the instance is guaranteed to be visible to the reader on its first rangefeed scan of the `system.sql_instances` table. Release note: None --- ...tream_ingestion_frontier_processor_test.go | 4 ++ .../streamingest/stream_ingestion_job_test.go | 2 +- .../stream_ingestion_processor_test.go | 5 +++ .../stream_replication_e2e_test.go | 1 + pkg/server/server_sql.go | 22 ++++++---- .../instanceprovider/instanceprovider.go | 25 +++++++++-- .../instanceprovider/instanceprovider_test.go | 2 + .../instanceprovider/test_helpers.go | 8 +++- pkg/sql/sqlinstance/sqlinstance.go | 41 ++++++++++++++++++- pkg/sql/sqlliveness/slinstance/BUILD.bazel | 1 + pkg/sql/sqlliveness/slinstance/slinstance.go | 25 +++++++++++ 11 files changed, 121 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index bd12e04a42d6..425bbea0b555 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -40,6 +41,9 @@ type partitionToEvent map[string][]streamingccl.Event func TestStreamIngestionFrontierProcessor(t *testing.T) { defer leaktest.AfterTest(t)() + + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1c55b5f67b65..259fab7f24e8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -79,7 +79,7 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 82706) + skip.WithIssue(t, 83697) skip.UnderRace(t, "slow under race") diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index cd23cdff0c14..72ffb8d46d9f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -157,6 +158,8 @@ func TestStreamIngestionProcessor(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) @@ -409,6 +412,8 @@ func TestRandomClientGeneration(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index e2a9e8ad11ae..fc9d900cb036 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -210,6 +210,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) skip.UnderStressRace(t, "slow under stressrace") dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6d13540e8a99..8ec65555b678 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -404,9 +404,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) - cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, - ) + // If the node id is already populated, we only need to create a placeholder + // instance provider without initializing the instance, since this is not a + // SQL pod server. + _, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID() + if isNotSQLPod { + cfg.sqlInstanceProvider = sqlinstance.NewFakeSQLProvider() + } else { + cfg.sqlInstanceProvider = instanceprovider.New( + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, + ) + } if !codec.ForSystemTenant() { // In a multi-tenant environment, use the sqlInstanceProvider to resolve @@ -1150,7 +1158,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return nil } -func (s *SQLServer) initInstanceID(ctx context.Context) error { +func (s *SQLServer) setInstanceID(ctx context.Context) error { if _, ok := s.sqlIDContainer.OptionalNodeID(); ok { // sqlIDContainer has already been initialized with a node ID, // we don't need to initialize a SQL instance ID in this case @@ -1179,8 +1187,8 @@ func (s *SQLServer) preStart( socketFile string, orphanedLeasesTimeThresholdNanos int64, ) error { - // The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is - // initialized prior to any other systems that need it. + // The sqlliveness and sqlinstance subsystem should be started first to ensure + // the instance ID is initialized prior to any other systems that need it. if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil { return err } @@ -1191,7 +1199,7 @@ func (s *SQLServer) preStart( if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { return err } - if err := s.initInstanceID(ctx); err != nil { + if err := s.setInstanceID(ctx); err != nil { return err } s.connManager = connManager diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index 363573bf6583..5cae12392b36 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -36,7 +36,8 @@ type writer interface { ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error } -// provider implements the sqlinstance.Provider interface for access to the sqlinstance subsystem. +// provider implements the sqlinstance.Provider interface for access to the +// sqlinstance subsystem. type provider struct { *instancestorage.Reader storage writer @@ -85,6 +86,12 @@ func (p *provider) Start(ctx context.Context) error { if p.started() { return p.initError } + // Initialize the instance. We need to do this before starting the reader, so + // that the reader sees the instance. + if err := p.initAndWait(ctx); err != nil { + return err + } + if err := p.Reader.Start(ctx); err != nil { p.initOnce.Do(func() { p.initError = err @@ -110,21 +117,31 @@ func (p *provider) Instance( if !p.started() { return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - - p.maybeInitialize() select { case <-ctx.Done(): return base.SQLInstanceID(0), "", ctx.Err() case <-p.stopper.ShouldQuiesce(): return base.SQLInstanceID(0), "", stop.ErrUnavailable + case <-p.initialized: + return p.instanceID, p.sessionID, p.initError + } +} + +func (p *provider) initAndWait(ctx context.Context) error { + p.maybeInitialize() + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopper.ShouldQuiesce(): + return stop.ErrUnavailable case <-p.initialized: if p.initError == nil { log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) } else { log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) } - return p.instanceID, p.sessionID, p.initError } + return p.initError } func (p *provider) maybeInitialize() { diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 1ccbcee894b0..15949afe64f7 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) + instanceProvider.InitAndWaitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) { instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") slInstance.Start(ctx) instanceProvider.ShutdownSQLInstanceForTest(ctx) + instanceProvider.InitAndWaitForTest(ctx) _, _, err := instanceProvider.Instance(ctx) require.Error(t, err) require.Equal(t, "instance never initialized", err.Error()) diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index dd83dd00b727..a1eb1c6ea42b 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -20,9 +20,10 @@ import ( ) // TestInstanceProvider exposes ShutdownSQLInstanceForTest -// method for testing purposes. +// and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider + InitAndWaitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -43,6 +44,11 @@ func NewTestInstanceProvider( return p } +// InitAndWaitForTest explicitly calls initAndWait for testing purposes. +func (p *provider) InitAndWaitForTest(ctx context.Context) { + _ = p.initAndWait(ctx) +} + // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) { p.shutdownSQLInstance(ctx) diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 62d726e414b3..c7412f7c9a17 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -49,13 +49,50 @@ type Provider interface { // Instance returns the instance ID and sqlliveness.SessionID for the // current SQL instance. Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error) - // Start starts the instanceprovider. This will block until - // the underlying instance data reader has been started. + // Start starts the instanceprovider and initializes the current SQL instance. + // This will block until the underlying instance data reader has been started. Start(context.Context) error } +// fakeSQLProvider implements the sqlinstance.Provider interface as a +// placeholder for an instance provider, when an instance provider must be +// instantiated for a non-SQL instance. It starts a Reader to provide the +// AddressResolver interface, but otherwise throws unsupported errors. +type fakeSQLProvider struct{} + +// NewFakeSQLProvider returns a new placeholder instance Provider. +func NewFakeSQLProvider() Provider { + return &fakeSQLProvider{} +} + +// Start implements the sqlinstance.Provider interface. +func (p *fakeSQLProvider) Start(ctx context.Context) error { + return nil +} + +// Instance implements the sqlinstance.Provider interface. +func (p *fakeSQLProvider) Instance( + ctx context.Context, +) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) { + return base.SQLInstanceID(0), "", NotASQLInstanceError +} + +// GetInstance implements the AddressResolver interface. +func (p *fakeSQLProvider) GetInstance(context.Context, base.SQLInstanceID) (InstanceInfo, error) { + return InstanceInfo{}, NotASQLInstanceError +} + +// GetAllInstances implements the AddressResolver interface. +func (p *fakeSQLProvider) GetAllInstances(context.Context) ([]InstanceInfo, error) { + return nil, NotASQLInstanceError +} + // NonExistentInstanceError can be returned if a SQL instance does not exist. var NonExistentInstanceError = errors.Errorf("non existent SQL instance") // NotStartedError can be returned if the sqlinstance subsystem has not been started yet. var NotStartedError = errors.Errorf("sqlinstance subsystem not started") + +// NotASQLInstanceError can be returned if a function is is not supported for +// non-SQL instances. +var NotASQLInstanceError = errors.Errorf("not supported for non-SQL instance") diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 97359e7bf94e..f067148fa918 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlliveness", + "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/retry", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 4c04835cd155..d76e8ca5a1c1 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -123,6 +124,7 @@ type Instance struct { ttl func() time.Duration hb func() time.Duration testKnobs sqlliveness.TestingKnobs + startErr error mu struct { started bool syncutil.Mutex @@ -188,6 +190,11 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) { if everySecond.ShouldLog() { log.Errorf(ctx, "failed to create a session at %d-th attempt: %v", i, err) } + // Unauthenticated errors are unrecoverable, we should break instead + // of retrying. + if grpcutil.IsAuthError(err) { + break + } continue } break @@ -248,6 +255,15 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { if s == nil { newSession, err := l.createSession(ctx) if err != nil { + func() { + l.mu.Lock() + defer l.mu.Unlock() + l.startErr = err + // There was an unrecoverable error when trying to + // create the session. Notify all calls to Session that + // the session failed. + close(l.mu.blockCh) + }() return } l.setSession(newSession) @@ -341,6 +357,15 @@ func (l *Instance) Session(ctx context.Context) (sqlliveness.Session, error) { case <-ctx.Done(): return nil, ctx.Err() case <-ch: + var err error + func() { + l.mu.Lock() + defer l.mu.Unlock() + err = l.startErr + }() + if err != nil { + return nil, err + } } } }