diff --git a/README.md b/README.md
index ad1b9b0..825c282 100644
--- a/README.md
+++ b/README.md
@@ -176,23 +176,18 @@ One of exponential/constant backoff policies can be provided for the Kafka commit
 
 Search Batch Indexer
 
-| Environment Variable                                         | Default | Required | Description                                                                                                     |
-| ------------------------------------------------------------ | ------- | -------- | --------------------------------------------------------------------------------------------------------------- |
-| PGSTREAM_OPENSEARCH_STORE_URL                                | N/A     | Yes      | URL for the opensearch store to connect to (at least one of the URLs must be provided).                        |
-| PGSTREAM_ELASTICSEARCH_STORE_URL                             | N/A     | Yes      | URL for the elasticsearch store to connect to (at least one of the URLs must be provided).                     |
-| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT                        | 1s      | No       | Max time interval at which the batch sending to the search store is triggered.                                 |
-| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE                           | 100     | No       | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
-| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES                      | 100MiB  | No       | Max memory used by the search batch indexer for inflight batches.                                              |
-| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_INITIAL_INTERVAL | 0       | No       | Initial interval for the exponential backoff policy to be applied to the search indexer cleanup retries.       |
-| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_INTERVAL     | 0       | No       | Max interval for the exponential backoff policy to be applied to the search indexer cleanup retries.           |
-| PGSTREAM_SEARCH_INDEXER_CLEANUP_EXP_BACKOFF_MAX_RETRIES      | 0       | No       | Max retries for the exponential backoff policy to be applied to the search indexer cleanup retries.            |
-| PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_INTERVAL             | 0       | No       | Constant interval for the backoff policy to be applied to the search indexer cleanup retries.                  |
-| PGSTREAM_SEARCH_INDEXER_CLEANUP_BACKOFF_MAX_RETRIES          | 0       | No       | Max retries for the backoff policy to be applied to the search indexer cleanup retries.                        |
-| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL           | 1s      | No       | Initial interval for the exponential backoff policy to be applied to the search store operation retries.       |
-| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL               | 1min    | No       | Max interval for the exponential backoff policy to be applied to the search store operation retries.           |
-| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES                | 0       | No       | Max retries for the exponential backoff policy to be applied to the search store operation retries.            |
-| PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL                       | 0       | No       | Constant interval for the backoff policy to be applied to the search store operation retries.                  |
-| PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES                    | 0       | No       | Max retries for the backoff policy to be applied to the search store operation retries.                        |
+| Environment Variable                               | Default | Required | Description                                                                                                     |
+| -------------------------------------------------- | ------- | -------- | --------------------------------------------------------------------------------------------------------------- |
+| PGSTREAM_OPENSEARCH_STORE_URL                      | N/A     | Yes      | URL for the opensearch store to connect to (at least one of the URLs must be provided).                        |
+| PGSTREAM_ELASTICSEARCH_STORE_URL                   | N/A     | Yes      | URL for the elasticsearch store to connect to (at least one of the URLs must be provided).                     |
+| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT              | 1s      | No       | Max time interval at which the batch sending to the search store is triggered.                                 |
+| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE                 | 100     | No       | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
+| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES            | 100MiB  | No       | Max memory used by the search batch indexer for inflight batches.                                              |
+| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s      | No       | Initial interval for the exponential backoff policy to be applied to the search store operation retries.       |
+| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL     | 1min    | No       | Max interval for the exponential backoff policy to be applied to the search store operation retries.           |
+| PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES      | 0       | No       | Max retries for the exponential backoff policy to be applied to the search store operation retries.            |
+| PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL             | 0       | No       | Constant interval for the backoff policy to be applied to the search store operation retries.                  |
+| PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES          | 0       | No       | Max retries for the backoff policy to be applied to the search store operation retries.                        |
 
-One of exponential/constant backoff policies can be provided for the search indexer cleanup retry strategy. If none is provided, no retries apply.
+One of exponential/constant backoff policies can be provided for the search store operation retries. If none is provided, no retries apply.
 
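A note on the retry settings above (illustration, not part of the diff): the exponential policy grows the wait between attempts from the initial interval up to the max interval, while the constant policy waits a fixed interval every time. A minimal sketch of that arithmetic, assuming the common doubling multiplier (the multiplier is not documented in this table):

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative only: pgstream's backoff package is not shown in this diff.
// This reproduces the arithmetic implied by the README defaults: exponential
// retries start at PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL (1s)
// and are capped at PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL (1min),
// while the constant policy waits PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL
// every time.
func main() {
	initial := time.Second
	maxInterval := time.Minute

	interval := initial
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("exponential attempt %d: wait %s\n", attempt, interval)
		interval *= 2 // assumed multiplier of 2
		if interval > maxInterval {
			interval = maxInterval
		}
	}

	constant := 5 * time.Second
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("constant attempt %d: wait %s\n", attempt, constant)
	}
}
```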
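The env files above feed these settings into the process. For reference, a minimal sketch of the viper lookups used by the parseSearchProcessorConfig hunk above; the AutomaticEnv binding and the defaults here are assumptions for the sake of a runnable example, not pgstream's actual binding code:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// A standalone sketch of the viper lookups in parseSearchProcessorConfig.
// Assumption: env variables are bound with AutomaticEnv; the defaults below
// mirror the README table and are illustrative.
func main() {
	viper.AutomaticEnv()
	viper.SetDefault("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE", 100)
	viper.SetDefault("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT", "1s")

	fmt.Println("batch size:", viper.GetInt("PGSTREAM_SEARCH_INDEXER_BATCH_SIZE"))
	fmt.Println("batch timeout:", viper.GetDuration("PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT"))
	fmt.Println("max queue bytes:", viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"))
}
```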
diff --git a/pkg/wal/processor/search/config.go b/pkg/wal/processor/search/config.go
index 16db131..1d52ab4 100644
--- a/pkg/wal/processor/search/config.go
+++ b/pkg/wal/processor/search/config.go
@@ -4,8 +4,6 @@ package search
 
 import (
 	"time"
-
-	"github.com/xataio/pgstream/pkg/backoff"
 )
 
 type IndexerConfig struct {
@@ -18,9 +16,6 @@ type IndexerConfig struct {
 	// MaxQueueBytes is the max memory used by the batch indexer for inflight
 	// batches. Defaults to 100MiB
 	MaxQueueBytes int64
-	// CleanupBackoff is the retry policy to follow for the async index
-	// deletion. If no config is provided, no retry policy is applied.
-	CleanupBackoff backoff.Config
 }
 
 const (
diff --git a/pkg/wal/processor/search/helper_test.go b/pkg/wal/processor/search/helper_test.go
index ba4b1a5..e5e46d2 100644
--- a/pkg/wal/processor/search/helper_test.go
+++ b/pkg/wal/processor/search/helper_test.go
@@ -24,10 +24,11 @@ func (m *mockAdapter) walEventToMsg(e *wal.Event) (*msg, error) {
 type mockStore struct {
 	getMapperFn            func() Mapper
 	applySchemaChangeFn    func(ctx context.Context, le *schemalog.LogEntry) error
-	deleteSchemaFn         func(ctx context.Context, schemaName string) error
+	deleteSchemaFn         func(ctx context.Context, i uint, schemaName string) error
 	deleteTableDocumentsFn func(ctx context.Context, schemaName string, tableIDs []string) error
 	sendDocumentsFn        func(ctx context.Context, i uint, docs []Document) ([]DocumentError, error)
 	sendDocumentsCalls     uint
+	deleteSchemaCalls      uint
 }
 
 func (m *mockStore) GetMapper() Mapper {
@@ -39,7 +40,8 @@ func (m *mockStore) ApplySchemaChange(ctx context.Context, le *schemalog.LogEntry) error {
 }
 
 func (m *mockStore) DeleteSchema(ctx context.Context, schemaName string) error {
-	return m.deleteSchemaFn(ctx, schemaName)
+	m.deleteSchemaCalls++
+	return m.deleteSchemaFn(ctx, m.deleteSchemaCalls, schemaName)
 }
 
 func (m *mockStore) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
@@ -51,24 +53,6 @@ func (m *mockStore) SendDocuments(ctx context.Context, docs []Document) ([]DocumentError, error) {
 	return m.sendDocumentsFn(ctx, m.sendDocumentsCalls, docs)
 }
 
-type mockCleaner struct {
-	deleteSchemaFn func(context.Context, string) error
-	startFn        func(context.Context)
-	stopFn         func()
-}
-
-func (m *mockCleaner) deleteSchema(ctx context.Context, schema string) error {
-	return m.deleteSchemaFn(ctx, schema)
-}
-
-func (m *mockCleaner) start(ctx context.Context) {
-	m.startFn(ctx)
-}
-
-func (m *mockCleaner) stop() {
-	m.stopFn()
-}
-
 const (
 	testSchemaName = "test_schema"
 	testTableName  = "test_table"
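The mockStore change above introduces a call-counting pattern: DeleteSchema increments deleteSchemaCalls and passes the count to the stub, so each test case can script per-attempt behavior (fail on the first call, succeed on the second). A standalone sketch of the same pattern, with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// countingMock is a hypothetical stand-in for the pattern used by mockStore:
// the wrapper increments a counter and hands it to the stub, letting a test
// script different results per attempt.
type countingMock struct {
	calls uint
	fn    func(i uint) error
}

func (m *countingMock) Do() error {
	m.calls++
	return m.fn(m.calls)
}

func main() {
	m := &countingMock{fn: func(i uint) error {
		if i == 1 {
			return errors.New("transient failure")
		}
		return nil
	}}
	fmt.Println(m.Do()) // transient failure
	fmt.Println(m.Do()) // <nil>
}
```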
diff --git a/pkg/wal/processor/search/search_batch_indexer.go b/pkg/wal/processor/search/search_batch_indexer.go
index 98214be..cdc0272 100644
--- a/pkg/wal/processor/search/search_batch_indexer.go
+++ b/pkg/wal/processor/search/search_batch_indexer.go
@@ -38,8 +38,6 @@ type BatchIndexer struct {
 
 	// checkpoint callback to mark what was safely stored
 	checkpoint checkpointer.Checkpoint
-
-	cleaner cleaner
 }
 
 type Option func(*BatchIndexer)
@@ -66,10 +64,6 @@ func NewBatchIndexer(ctx context.Context, config IndexerConfig, store Store, lsn
 		opt(indexer)
 	}
 
-	indexer.cleaner = newSchemaCleaner(&config.CleanupBackoff, store, indexer.logger)
-	// start a goroutine for processing schema deletes asynchronously.
-	// routine ends when the internal channel is closed.
-	go indexer.cleaner.start(ctx)
 	return indexer
 }
 
@@ -191,7 +185,6 @@ func (i *BatchIndexer) Name() string {
 
 func (i *BatchIndexer) Close() error {
 	close(i.msgChan)
-	i.cleaner.stop()
 	return nil
 }
 
@@ -270,8 +263,8 @@ func (i *BatchIndexer) applySchemaChange(ctx context.Context, new *schemalog.LogEntry) error {
 	}
 
 	if new.Schema.Dropped {
-		if err := i.cleaner.deleteSchema(ctx, new.SchemaName); err != nil {
-			return fmt.Errorf("register schema for delete: %w", err)
+		if err := i.store.DeleteSchema(ctx, new.SchemaName); err != nil {
+			return fmt.Errorf("deleting schema: %w", err)
 		}
 		return nil
 	}
diff --git a/pkg/wal/processor/search/search_batch_indexer_test.go b/pkg/wal/processor/search/search_batch_indexer_test.go
index 4ea67eb..e274ac8 100644
--- a/pkg/wal/processor/search/search_batch_indexer_test.go
+++ b/pkg/wal/processor/search/search_batch_indexer_test.go
@@ -401,7 +401,6 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
 		checkpoint checkpointer.Checkpoint
 		batch      *msgBatch
 		skipSchema func(string) bool
-		cleaner    cleaner
 
 		wantErr error
 	}{
@@ -517,9 +516,8 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
 				},
 				positions: []wal.CommitPosition{testCommitPos},
 			},
-			store: &mockStore{},
-			cleaner: &mockCleaner{
-				deleteSchemaFn: func(ctx context.Context, s string) error {
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
 					require.Equal(t, testSchemaName, s)
 					return nil
 				},
@@ -557,9 +555,8 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
 				},
 				positions: []wal.CommitPosition{testCommitPos},
 			},
-			store: &mockStore{},
-			cleaner: &mockCleaner{
-				deleteSchemaFn: func(ctx context.Context, s string) error {
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, _ uint, s string) error {
 					return errTest
 				},
 			},
@@ -680,10 +677,6 @@ func TestBatchIndexer_sendBatch(t *testing.T) {
 				indexer.skipSchema = tc.skipSchema
 			}
 
-			if tc.cleaner != nil {
-				indexer.cleaner = tc.cleaner
-			}
-
 			err := indexer.sendBatch(context.Background(), tc.batch)
 			require.ErrorIs(t, err, tc.wantErr)
 		})
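With the cleaner gone, a dropped schema is now deleted inline while the batch is processed, and retry behavior moves into the store implementation (see the StoreRetrier hunk below). A simplified sketch of the new control flow, with the pgstream types trimmed to hypothetical stand-ins:

```go
package main

import (
	"context"
	"fmt"
)

// logEntry is a stand-in for schemalog.LogEntry, reduced to the fields the
// diff touches (the real diff names the parameter "new").
type logEntry struct {
	SchemaName string
	Dropped    bool
}

type store interface {
	DeleteSchema(ctx context.Context, schemaName string) error
}

// applySchemaChange mirrors the new synchronous path: a dropped schema is
// deleted through the store directly, so any retrying now happens inside the
// store implementation instead of in a background cleaner goroutine.
func applySchemaChange(ctx context.Context, s store, entry *logEntry) error {
	if entry.Dropped {
		if err := s.DeleteSchema(ctx, entry.SchemaName); err != nil {
			return fmt.Errorf("deleting schema: %w", err)
		}
		return nil
	}
	// non-dropped schemas follow the usual schema-change path (omitted here)
	return nil
}

type noopStore struct{}

func (noopStore) DeleteSchema(ctx context.Context, schemaName string) error { return nil }

func main() {
	err := applySchemaChange(context.Background(), noopStore{}, &logEntry{SchemaName: "test", Dropped: true})
	fmt.Println(err) // <nil>
}
```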
diff --git a/pkg/wal/processor/search/search_schema_cleaner.go b/pkg/wal/processor/search/search_schema_cleaner.go
deleted file mode 100644
index e318800..0000000
--- a/pkg/wal/processor/search/search_schema_cleaner.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-package search
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"time"
-
-	"github.com/xataio/pgstream/pkg/backoff"
-	loglib "github.com/xataio/pgstream/pkg/log"
-)
-
-type cleaner interface {
-	deleteSchema(context.Context, string) error
-	start(context.Context)
-	stop()
-}
-
-type store interface {
-	DeleteSchema(ctx context.Context, schemaName string) error
-}
-
-// schemaCleaner takes care of deleting schemas from the search store
-// asynchronously
-type schemaCleaner struct {
-	logger              loglib.Logger
-	deleteSchemaQueue   chan string
-	store               store
-	backoffProvider     backoff.Provider
-	registrationTimeout time.Duration
-}
-
-const (
-	maxDeleteQueueSize         = 5000
-	defaultRegistrationTimeout = 5 * time.Second
-)
-
-var errRegistrationTimeout = errors.New("timeout registering schema for clean up")
-
-func newSchemaCleaner(cfg *backoff.Config, store store, logger loglib.Logger) *schemaCleaner {
-	return &schemaCleaner{
-		logger:              logger,
-		deleteSchemaQueue:   make(chan string, maxDeleteQueueSize),
-		store:               store,
-		registrationTimeout: defaultRegistrationTimeout,
-		backoffProvider:     backoff.NewProvider(cfg),
-	}
-}
-
-// deleteSchema writes a delete schema item to the delete queue. Times out and returns an error after 5 seconds.
-func (sc *schemaCleaner) deleteSchema(_ context.Context, schemaName string) error {
-	select {
-	case sc.deleteSchemaQueue <- schemaName:
-		return nil
-	case <-time.After(sc.registrationTimeout):
-		return errRegistrationTimeout
-	}
-}
-
-// start will continuously process schema items from the local delete queue
-func (sc *schemaCleaner) start(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case schema := <-sc.deleteSchemaQueue:
-			bo := sc.backoffProvider(ctx)
-			err := bo.RetryNotify(
-				func() error {
-					return getRetryError(sc.store.DeleteSchema(ctx, schema))
-				},
-				func(err error, duration time.Duration) {
-					sc.logger.Warn(err, "search schema cleaner: delete schema retry failed", loglib.Fields{
-						"backoff": duration,
-						"schema":  schema,
-					})
-				})
-			if err != nil {
-				sc.logger.Error(err, "search schema cleaner: delete schema", loglib.Fields{"schema": schema})
-			}
-		}
-	}
-}
-
-// stop will stop the processing of delete items from the queue and release
-// internal resources
-func (sc schemaCleaner) stop() {
-	close(sc.deleteSchemaQueue)
-}
-
-// getRetryError returns a backoff permanent error if the given error is not
-// retryable
-func getRetryError(err error) error {
-	if err != nil {
-		if errors.Is(err, ErrRetriable) {
-			return err
-		}
-		return fmt.Errorf("%w: %w", err, backoff.ErrPermanent)
-	}
-	return nil
-}
diff --git a/pkg/wal/processor/search/search_schema_cleaner_test.go b/pkg/wal/processor/search/search_schema_cleaner_test.go
deleted file mode 100644
index 7809e05..0000000
--- a/pkg/wal/processor/search/search_schema_cleaner_test.go
+++ /dev/null
@@ -1,160 +0,0 @@
-// SPDX-License-Identifier: Apache-2.0
-
-package search
-
-import (
-	"context"
-	"errors"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-	"github.com/xataio/pgstream/pkg/backoff"
-	"github.com/xataio/pgstream/pkg/backoff/mocks"
-	loglib "github.com/xataio/pgstream/pkg/log"
-)
-
-func TestSchemaCleaner_deleteSchema(t *testing.T) {
-	t.Parallel()
-
-	testSchemaName := "test_schema"
-
-	tests := []struct {
-		name      string
-		queueSize uint
-
-		wantErr error
-	}{
-		{
-			name:      "ok",
-			queueSize: 10,
-
-			wantErr: nil,
-		},
-		{
-			name:      "error - registration timeout",
-			queueSize: 0,
-
-			wantErr: errRegistrationTimeout,
-		},
-	}
-
-	for _, tc := range tests {
-		tc := tc
-		t.Run(tc.name, func(t *testing.T) {
-			t.Parallel()
-
-			schemaCleaner := &schemaCleaner{
-				logger:              loglib.NewNoopLogger(),
-				registrationTimeout: time.Second,
-				deleteSchemaQueue:   make(chan string, tc.queueSize),
-			}
-			defer schemaCleaner.stop()
-
-			err := schemaCleaner.deleteSchema(context.Background(), testSchemaName)
-			require.ErrorIs(t, err, tc.wantErr)
-		})
-	}
-}
-
-func TestSchemaCleaner_start(t *testing.T) {
-	t.Parallel()
-
-	testSchemaName := "test_schema"
-	errTest := errors.New("oh noes")
-
-	tests := []struct {
-		name            string
-		store           store
-		backoffProvider func(doneChan chan struct{}) backoff.Provider
-	}{
-		{
-			name: "ok",
-			store: &mockStore{
-				deleteSchemaFn: func(ctx context.Context, schemaName string) error {
-					require.Equal(t, testSchemaName, schemaName)
-					return nil
-				},
-			},
-			backoffProvider: func(doneChan chan struct{}) backoff.Provider {
-				once := sync.Once{}
-				return func(ctx context.Context) backoff.Backoff {
-					return &mocks.Backoff{
-						RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error {
-							defer once.Do(func() { doneChan <- struct{}{} })
-							return o()
-						},
-					}
-				}
-			},
-		},
-		{
-			name: "error deleting schema",
-			store: &mockStore{
-				deleteSchemaFn: func(ctx context.Context, schemaName string) error {
-					return errTest
-				},
-			},
-			backoffProvider: func(doneChan chan struct{}) backoff.Provider {
-				once := sync.Once{}
-				return func(ctx context.Context) backoff.Backoff {
-					return &mocks.Backoff{
-						RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error {
-							defer once.Do(func() { doneChan <- struct{}{} })
-							err := o()
-							if err != nil {
-								n(err, 50*time.Millisecond)
-							}
-							return err
-						},
-					}
-				}
-			},
-		},
-	}
-
-	for _, tc := range tests {
-		tc := tc
-		t.Run(tc.name, func(t *testing.T) {
-			t.Parallel()
-
-			doneChan := make(chan struct{}, 1)
-			defer close(doneChan)
-
-			schemaCleaner := &schemaCleaner{
-				logger:              loglib.NewNoopLogger(),
-				store:               tc.store,
-				backoffProvider:     tc.backoffProvider(doneChan),
-				registrationTimeout: defaultRegistrationTimeout,
-				deleteSchemaQueue:   make(chan string, 100),
-			}
-			defer schemaCleaner.stop()
-
-			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-			defer cancel()
-
-			wg := sync.WaitGroup{}
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				schemaCleaner.start(ctx)
-			}()
-
-			schemaCleaner.deleteSchemaQueue <- testSchemaName
-
-			for {
-				select {
-				case <-ctx.Done():
-					t.Errorf("test timeout reached")
-					wg.Wait()
-					return
-				case <-doneChan:
-					cancel()
-					wg.Wait()
-					return
-				}
-			}
-		})
-	}
-}
diff --git a/pkg/wal/processor/search/search_store_retrier.go b/pkg/wal/processor/search/search_store_retrier.go
index ddcf455..2ecccab 100644
--- a/pkg/wal/processor/search/search_store_retrier.go
+++ b/pkg/wal/processor/search/search_store_retrier.go
@@ -66,7 +66,21 @@ func (s *StoreRetrier) ApplySchemaChange(ctx context.Context, logEntry *schemalog.LogEntry) error {
 }
 
 func (s *StoreRetrier) DeleteSchema(ctx context.Context, schemaName string) error {
-	return s.inner.DeleteSchema(ctx, schemaName)
+	bo := s.backoffProvider(ctx)
+	err := bo.RetryNotify(
+		func() error {
+			return getRetryError(s.inner.DeleteSchema(ctx, schemaName))
+		},
+		func(err error, duration time.Duration) {
+			s.logger.Warn(err, "delete schema retry failed", loglib.Fields{
+				"backoff": duration,
+				"schema":  schemaName,
+			})
+		})
+	if err != nil {
+		s.logger.Error(err, "delete schema", loglib.Fields{"schema": schemaName})
+	}
+	return err
 }
 
 func (s *StoreRetrier) DeleteTableDocuments(ctx context.Context, schemaName string, tableIDs []string) error {
@@ -187,3 +201,15 @@ func (c *StoreRetryConfig) backoffConfig() *backoff.Config {
 		},
 	}
 }
+
+// getRetryError returns a backoff permanent error if the given error is not
+// retryable
+func getRetryError(err error) error {
+	if err != nil {
+		if errors.Is(err, ErrRetriable) {
+			return err
+		}
+		return fmt.Errorf("%w: %w", err, backoff.ErrPermanent)
+	}
+	return nil
+}
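The retry loop above hinges on getRetryError's classification: ErrRetriable passes through untouched so RetryNotify keeps going, while any other error is wrapped with backoff.ErrPermanent so the backoff gives up immediately. The same logic in isolation, with stand-in sentinel errors since the real ones are defined elsewhere in pgstream:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the real sentinels: search.ErrRetriable and
// backoff.ErrPermanent live elsewhere in pgstream; only the classification
// logic below is copied from the diff.
var (
	errRetriable = errors.New("retriable")
	errPermanent = errors.New("permanent")
)

// getRetryError mirrors the helper added to search_store_retrier.go: a
// retriable error is returned as-is so the backoff keeps retrying; anything
// else is wrapped as permanent so the retry loop stops immediately.
func getRetryError(err error) error {
	if err != nil {
		if errors.Is(err, errRetriable) {
			return err
		}
		return fmt.Errorf("%w: %w", err, errPermanent)
	}
	return nil
}

func main() {
	fmt.Println(getRetryError(nil))                          // <nil>
	fmt.Println(getRetryError(errRetriable))                 // retriable
	fmt.Println(getRetryError(errors.New("mapping failed"))) // mapping failed: permanent
}
```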
diff --git a/pkg/wal/processor/search/search_store_retrier_test.go b/pkg/wal/processor/search/search_store_retrier_test.go
index 4e03620..d471a1c 100644
--- a/pkg/wal/processor/search/search_store_retrier_test.go
+++ b/pkg/wal/processor/search/search_store_retrier_test.go
@@ -12,6 +12,91 @@ import (
 	loglib "github.com/xataio/pgstream/pkg/log"
 )
 
+func TestStoreRetrier_DeleteSchema(t *testing.T) {
+	t.Parallel()
+
+	const testSchema = "test-schema"
+	tests := []struct {
+		name  string
+		store *mockStore
+
+		wantErr error
+	}{
+		{
+			name: "ok",
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, _ uint, schemaName string) error {
+					require.Equal(t, testSchema, schemaName)
+					return nil
+				},
+			},
+			wantErr: nil,
+		},
+		{
+			name: "ok - retriable error",
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, i uint, schemaName string) error {
+					require.Equal(t, testSchema, schemaName)
+					switch i {
+					case 1:
+						return ErrRetriable
+					case 2:
+						return nil
+					default:
+						return fmt.Errorf("unexpected call to deleteSchema: %d", i)
+					}
+				},
+			},
+			wantErr: nil,
+		},
+		{
+			name: "err - retriable error backoff exhausted",
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, i uint, schemaName string) error {
+					require.Equal(t, testSchema, schemaName)
+					switch i {
+					case 1, 2, 3:
+						return ErrRetriable
+					default:
+						return fmt.Errorf("unexpected call to deleteSchema: %d", i)
+					}
+				},
+			},
+			wantErr: ErrRetriable,
+		},
+		{
+			name: "err - permanent error",
+			store: &mockStore{
+				deleteSchemaFn: func(ctx context.Context, i uint, schemaName string) error {
+					require.Equal(t, testSchema, schemaName)
+					switch i {
+					case 1:
+						return errTest
+					default:
+						return fmt.Errorf("unexpected call to deleteSchema: %d", i)
+					}
+				},
+			},
+			wantErr: errTest,
+		},
+	}
+
+	for _, tc := range tests {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			retrier := StoreRetrier{
+				inner:           tc.store,
+				logger:          loglib.NewNoopLogger(),
+				backoffProvider: newMockBackoffProvider(),
+			}
+			err := retrier.DeleteSchema(context.Background(), testSchema)
+			require.ErrorIs(t, err, tc.wantErr)
+		})
+	}
+}
+
 func TestStoreRetrier_SendDocuments(t *testing.T) {
 	t.Parallel()
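newMockBackoffProvider is not part of this diff (it already exists in this test file, shared with TestStoreRetrier_SendDocuments). For the cases above to pass, it must allow at most three attempts and stop early on success or on a permanent error; a hypothetical reconstruction consistent with that behavior:

```go
package search

import (
	"context"
	"errors"

	"github.com/xataio/pgstream/pkg/backoff"
	"github.com/xataio/pgstream/pkg/backoff/mocks"
)

// Hypothetical reconstruction — the real newMockBackoffProvider is defined
// elsewhere in search_store_retrier_test.go and is not shown in this diff.
// It runs the operation at most three times (one attempt plus two retries),
// returning early on success or when getRetryError marked the error
// permanent, which is what the "exhausted" and "permanent" cases exercise.
func newMockBackoffProvider() backoff.Provider {
	return func(ctx context.Context) backoff.Backoff {
		return &mocks.Backoff{
			RetryNotifyFn: func(op backoff.Operation, n backoff.Notify) error {
				var err error
				for attempt := 0; attempt < 3; attempt++ {
					err = op()
					if err == nil || errors.Is(err, backoff.ErrPermanent) {
						return err
					}
				}
				return err
			},
		}
	}
}
```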