Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
121918: streamingest: increase initial split TTLs r=dt a=dt

Release note: none.
Epic: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Apr 11, 2024
2 parents cead8be + f39a0a4 commit 8c40c18
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,21 @@ func createInitialSplits(
return grp.Wait()
}

// Spans are filled from left to right during the initial scan. Each might cover
// many (10-100+) source ranges merged to a single span by PartitionSpans, that
// could take multiple minutes to fill, so a span later in spans might be filled
// until minutes or hours later than one earlier in spans. Thus we want to give
// later splits longer enforcement times as we know that they may not be filled
// for longer, and we do not want them being merged away before we fill them.
const baseSplitExpiration = time.Hour * 6
const extraExpirationPerSpan = time.Minute * 10
const maxSplitExpiration = time.Hour * 24 * 7

func splitAndScatterWorker(
spans []roachpb.Span, rekeyer *backupccl.KeyRewriter, splitter splitAndScatterer,
) func(ctx context.Context) error {
return func(ctx context.Context) error {
for _, span := range spans {
for spanNum, span := range spans {
startKey := span.Key.Clone()
splitKey, _, err := rekeyer.RewriteKey(startKey, 0 /* walltimeForImportElision */)
if err != nil {
Expand Down Expand Up @@ -402,7 +412,8 @@ func splitAndScatterWorker(
// splitKey = newSplitKey
// }
//
if err := splitAndScatter(ctx, splitKey, splitter); err != nil {
addedExpiration := time.Duration(spanNum) * extraExpirationPerSpan
if err := splitAndScatter(ctx, splitKey, splitter, addedExpiration); err != nil {
return err
}

Expand All @@ -411,13 +422,11 @@ func splitAndScatterWorker(
}
}

var splitAndScatterSitckyBitDuration = time.Hour

func splitAndScatter(
ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer,
ctx context.Context, splitAndScatterKey roachpb.Key, s splitAndScatterer, extra time.Duration,
) error {
log.VInfof(ctx, 1, "splitting and scattering at %s", splitAndScatterKey)
expirationTime := s.now().AddDuration(splitAndScatterSitckyBitDuration)
expirationTime := s.now().AddDuration(min(baseSplitExpiration+extra, maxSplitExpiration))
if err := s.split(ctx, splitAndScatterKey, expirationTime); err != nil {
return err
}
Expand Down

0 comments on commit 8c40c18

Please sign in to comment.