diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index bd12e04a42d6..425bbea0b555 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -40,6 +41,9 @@ type partitionToEvent map[string][]streamingccl.Event func TestStreamIngestionFrontierProcessor(t *testing.T) { defer leaktest.AfterTest(t)() + + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1c55b5f67b65..259fab7f24e8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -79,7 +79,7 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 82706) + skip.WithIssue(t, 83697) skip.UnderRace(t, "slow under race") diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index cd23cdff0c14..72ffb8d46d9f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -157,6 +158,8 @@ func TestStreamIngestionProcessor(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) @@ -409,6 +412,8 @@ func TestRandomClientGeneration(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) + ctx := context.Background() tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index e2a9e8ad11ae..fc9d900cb036 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -210,6 +210,7 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRaceWithIssue(t, 83867) skip.UnderStressRace(t, "slow under stressrace") dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6d13540e8a99..8ec65555b678 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -404,9 +404,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) - cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, - ) + // If the node id is already populated, we only need to create a placeholder + // instance provider without initializing the instance, since this is not a + // SQL pod server. + _, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID() + if isNotSQLPod { + cfg.sqlInstanceProvider = sqlinstance.NewFakeSQLProvider() + } else { + cfg.sqlInstanceProvider = instanceprovider.New( + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, + ) + } if !codec.ForSystemTenant() { // In a multi-tenant environment, use the sqlInstanceProvider to resolve @@ -1150,7 +1158,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return nil } -func (s *SQLServer) initInstanceID(ctx context.Context) error { +func (s *SQLServer) setInstanceID(ctx context.Context) error { if _, ok := s.sqlIDContainer.OptionalNodeID(); ok { // sqlIDContainer has already been initialized with a node ID, // we don't need to initialize a SQL instance ID in this case @@ -1179,8 +1187,8 @@ func (s *SQLServer) preStart( socketFile string, orphanedLeasesTimeThresholdNanos int64, ) error { - // The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is - // initialized prior to any other systems that need it. + // The sqlliveness and sqlinstance subsystem should be started first to ensure + // the instance ID is initialized prior to any other systems that need it. if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil { return err } @@ -1191,7 +1199,7 @@ func (s *SQLServer) preStart( if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { return err } - if err := s.initInstanceID(ctx); err != nil { + if err := s.setInstanceID(ctx); err != nil { return err } s.connManager = connManager diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index 363573bf6583..5cae12392b36 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -36,7 +36,8 @@ type writer interface { ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error } -// provider implements the sqlinstance.Provider interface for access to the sqlinstance subsystem. +// provider implements the sqlinstance.Provider interface for access to the +// sqlinstance subsystem. type provider struct { *instancestorage.Reader storage writer @@ -85,6 +86,12 @@ func (p *provider) Start(ctx context.Context) error { if p.started() { return p.initError } + // Initialize the instance. We need to do this before starting the reader, so + // that the reader sees the instance. + if err := p.initAndWait(ctx); err != nil { + return err + } + if err := p.Reader.Start(ctx); err != nil { p.initOnce.Do(func() { p.initError = err @@ -110,21 +117,31 @@ func (p *provider) Instance( if !p.started() { return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - - p.maybeInitialize() select { case <-ctx.Done(): return base.SQLInstanceID(0), "", ctx.Err() case <-p.stopper.ShouldQuiesce(): return base.SQLInstanceID(0), "", stop.ErrUnavailable + case <-p.initialized: + return p.instanceID, p.sessionID, p.initError + } +} + +func (p *provider) initAndWait(ctx context.Context) error { + p.maybeInitialize() + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopper.ShouldQuiesce(): + return stop.ErrUnavailable case <-p.initialized: if p.initError == nil { log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) } else { log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) } - return p.instanceID, p.sessionID, p.initError } + return p.initError } func (p *provider) maybeInitialize() { diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 1ccbcee894b0..15949afe64f7 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) + instanceProvider.InitAndWaitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) { instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") slInstance.Start(ctx) instanceProvider.ShutdownSQLInstanceForTest(ctx) + instanceProvider.InitAndWaitForTest(ctx) _, _, err := instanceProvider.Instance(ctx) require.Error(t, err) require.Equal(t, "instance never initialized", err.Error()) diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index dd83dd00b727..a1eb1c6ea42b 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -20,9 +20,10 @@ import ( ) // TestInstanceProvider exposes ShutdownSQLInstanceForTest -// method for testing purposes. +// and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider + InitAndWaitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -43,6 +44,11 @@ func NewTestInstanceProvider( return p } +// InitAndWaitForTest explicitly calls initAndWait for testing purposes. +func (p *provider) InitAndWaitForTest(ctx context.Context) { + _ = p.initAndWait(ctx) +} + // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) { p.shutdownSQLInstance(ctx) diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 62d726e414b3..c7412f7c9a17 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -49,13 +49,50 @@ type Provider interface { // Instance returns the instance ID and sqlliveness.SessionID for the // current SQL instance. Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error) - // Start starts the instanceprovider. This will block until - // the underlying instance data reader has been started. + // Start starts the instanceprovider and initializes the current SQL instance. + // This will block until the underlying instance data reader has been started. Start(context.Context) error } +// fakeSQLProvider implements the sqlinstance.Provider interface as a +// placeholder for an instance provider, when an instance provider must be +// instantiated for a non-SQL instance. It starts a Reader to provide the +// AddressResolver interface, but otherwise throws unsupported errors. +type fakeSQLProvider struct{} + +// NewFakeSQLProvider returns a new placeholder instance Provider. +func NewFakeSQLProvider() Provider { + return &fakeSQLProvider{} +} + +// Start implements the sqlinstance.Provider interface. +func (p *fakeSQLProvider) Start(ctx context.Context) error { + return nil +} + +// Instance implements the sqlinstance.Provider interface. +func (p *fakeSQLProvider) Instance( + ctx context.Context, +) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) { + return base.SQLInstanceID(0), "", NotASQLInstanceError +} + +// GetInstance implements the AddressResolver interface. +func (p *fakeSQLProvider) GetInstance(context.Context, base.SQLInstanceID) (InstanceInfo, error) { + return InstanceInfo{}, NotASQLInstanceError +} + +// GetAllInstances implements the AddressResolver interface. +func (p *fakeSQLProvider) GetAllInstances(context.Context) ([]InstanceInfo, error) { + return nil, NotASQLInstanceError +} + // NonExistentInstanceError can be returned if a SQL instance does not exist. var NonExistentInstanceError = errors.Errorf("non existent SQL instance") // NotStartedError can be returned if the sqlinstance subsystem has not been started yet. var NotStartedError = errors.Errorf("sqlinstance subsystem not started") + +// NotASQLInstanceError can be returned if a function is is not supported for +// non-SQL instances. +var NotASQLInstanceError = errors.Errorf("not supported for non-SQL instance") diff --git a/pkg/sql/sqlliveness/slinstance/BUILD.bazel b/pkg/sql/sqlliveness/slinstance/BUILD.bazel index 97359e7bf94e..f067148fa918 100644 --- a/pkg/sql/sqlliveness/slinstance/BUILD.bazel +++ b/pkg/sql/sqlliveness/slinstance/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/sqlliveness", + "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/retry", diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 4c04835cd155..d76e8ca5a1c1 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -123,6 +124,7 @@ type Instance struct { ttl func() time.Duration hb func() time.Duration testKnobs sqlliveness.TestingKnobs + startErr error mu struct { started bool syncutil.Mutex @@ -188,6 +190,11 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) { if everySecond.ShouldLog() { log.Errorf(ctx, "failed to create a session at %d-th attempt: %v", i, err) } + // Unauthenticated errors are unrecoverable, we should break instead + // of retrying. + if grpcutil.IsAuthError(err) { + break + } continue } break @@ -248,6 +255,15 @@ func (l *Instance) heartbeatLoop(ctx context.Context) { if s == nil { newSession, err := l.createSession(ctx) if err != nil { + func() { + l.mu.Lock() + defer l.mu.Unlock() + l.startErr = err + // There was an unrecoverable error when trying to + // create the session. Notify all calls to Session that + // the session failed. + close(l.mu.blockCh) + }() return } l.setSession(newSession) @@ -341,6 +357,15 @@ func (l *Instance) Session(ctx context.Context) (sqlliveness.Session, error) { case <-ctx.Done(): return nil, ctx.Err() case <-ch: + var err error + func() { + l.mu.Lock() + defer l.mu.Unlock() + err = l.startErr + }() + if err != nil { + return nil, err + } } } }