Skip to content

Commit

Permalink
streamingccl: add IngestionFrontierProcessor
Browse files Browse the repository at this point in the history
The IngestionFrontierProcessor (IFP) is fed rows from the IngestionProcessor.
Each row represents a partition and the latest resolved ts event
received for that partition. The IFP uses the existing util
`spanFrontier` (used by changefeeds) to keep track of all the rows
returned by the several ingestion procs, and resolves the minimum
resolved ts for a particular partition across all procs. This is then
written to the ingestion job progress indicating that all processors
have ingested a stream up to a particular resolved ts.

Release note: None
  • Loading branch information
adityamaru committed Jan 26, 2021
1 parent aaa44bd commit dfdfa8b
Show file tree
Hide file tree
Showing 11 changed files with 1,119 additions and 292 deletions.
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "streamingest",
srcs = [
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
Expand All @@ -28,17 +29,21 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "streamingest_test",
srcs = [
"main_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
Expand Down Expand Up @@ -72,6 +77,7 @@ go_test(
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// streamIngestionFrontierProcName is the processor name handed to
// StartInternal when the frontier processor is started.
const streamIngestionFrontierProcName = `ingestfntr`

// streamIngestionFrontier is a processor that consumes rows carrying resolved
// span updates from its input, tracks them in a span frontier, and emits a row
// containing the marshalled frontier timestamp each time that frontier
// advances.
type streamIngestionFrontier struct {
// Embedded execinfra helpers. NOTE(review): presumably these supply the
// Init/StartInternal/MoveToDraining/DrainHelper/ProcessRowHelper machinery
// used by the methods on this type — confirm against execinfra.
execinfra.ProcessorBase
execinfra.StreamingProcessor

// flowCtx is the flow context this processor was created with.
flowCtx *execinfra.FlowCtx
// spec is the processor spec; it supplies the tracked spans and the
// starting high-water used at construction time.
spec execinfrapb.StreamIngestionFrontierSpec
// a is the datum allocator passed to EnsureDecoded when decoding input rows.
a rowenc.DatumAlloc

// input returns rows from one or more streamIngestion processors.
input execinfra.RowSource
// highWaterAtStart is the job high-water. It's used in an assertion that we
// never regress the job high-water: a non-empty resolved timestamp below
// this value is rejected with an assertion error.
highWaterAtStart hlc.Timestamp

// frontier contains the current resolved timestamp high-water for the tracked
// span set. It is created from spec.TrackedSpans.
frontier *span.Frontier
}

// Compile-time assertions that *streamIngestionFrontier satisfies both the
// execinfra.Processor and execinfra.RowSource interfaces.
var _ execinfra.Processor = &streamIngestionFrontier{}
var _ execinfra.RowSource = &streamIngestionFrontier{}

// newStreamIngestionFrontierProcessor builds a streamIngestionFrontier that
// reads from input and writes to output.
//
// The span frontier is seeded with spec.TrackedSpans and the starting
// high-water is taken from spec.HighWaterAtStart. The processor is initialised
// with the input's output types, and the input is registered as a source to
// drain. Any error from initialisation is returned and no processor is
// produced.
func newStreamIngestionFrontierProcessor(
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	spec execinfrapb.StreamIngestionFrontierSpec,
	input execinfra.RowSource,
	post *execinfrapb.PostProcessSpec,
	output execinfra.RowReceiver,
) (execinfra.Processor, error) {
	proc := &streamIngestionFrontier{
		flowCtx:          flowCtx,
		spec:             spec,
		input:            input,
		highWaterAtStart: spec.HighWaterAtStart,
		frontier:         span.MakeFrontier(spec.TrackedSpans...),
	}

	// The upstream input must be drained when this processor drains.
	stateOpts := execinfra.ProcStateOpts{
		InputsToDrain: []execinfra.RowSource{proc.input},
	}
	err := proc.Init(
		proc,
		post,
		input.OutputTypes(),
		flowCtx,
		processorID,
		output,
		nil, /* memMonitor */
		stateOpts,
	)
	if err != nil {
		return nil, err
	}
	return proc, nil
}

// Start is part of the RowSource interface. It starts the input first, then
// starts this processor under streamIngestionFrontierProcName and returns the
// context produced by StartInternal.
func (sf *streamIngestionFrontier) Start(ctx context.Context) context.Context {
	sf.input.Start(ctx)
	return sf.StartInternal(ctx, streamIngestionFrontierProcName)
}

// Next is part of the RowSource interface.
//
// While the processor is running it pulls from the input. Metadata is
// forwarded to the caller as-is (metadata carrying an error also moves the
// processor to draining). Each data row's first column is fed to
// noteResolvedTimestamp; whenever that advances the frontier, a single-column
// row holding the marshalled frontier timestamp is pushed through
// ProcessRowHelper and, if it yields a row, returned. Once the input is
// exhausted or an error occurs, the processor drains.
func (sf *streamIngestionFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
	for sf.State == execinfra.StateRunning {
		inRow, inMeta := sf.input.Next()
		if inMeta != nil {
			if inMeta.Err != nil {
				sf.MoveToDraining(nil /* err */)
			}
			return nil, inMeta
		}
		if inRow == nil {
			// No row and no metadata: the input has nothing more to give.
			sf.MoveToDraining(nil /* err */)
			break
		}

		advanced, err := sf.noteResolvedTimestamp(inRow[0])
		if err != nil {
			sf.MoveToDraining(err)
			break
		}
		if !advanced {
			// Frontier did not move, so there is nothing to report yet.
			continue
		}

		// Send back a row to the job so that it can update the progress.
		frontierTS := sf.frontier.Frontier()
		encoded, err := protoutil.Marshal(&frontierTS)
		if err != nil {
			sf.MoveToDraining(err)
			break
		}
		progressRow := rowenc.EncDatumRow{
			rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(encoded))),
		}
		if emitted := sf.ProcessRowHelper(progressRow); emitted != nil {
			return emitted, nil
		}
	}
	return nil, sf.DrainHelper()
}

// noteResolvedTimestamp decodes d as a bytes datum holding a marshalled
// jobspb.ResolvedSpan and forwards the frontier for that span.
//
// It returns true when the update advanced the overall frontier. An error is
// returned if the datum cannot be decoded, is not a *tree.DBytes, cannot be
// unmarshalled, or carries a non-empty timestamp below highWaterAtStart; in
// all error cases the boolean is false.
func (sf *streamIngestionFrontier) noteResolvedTimestamp(d rowenc.EncDatum) (bool, error) {
	if err := d.EnsureDecoded(streamIngestionResultTypes[0], &sf.a); err != nil {
		return false, err
	}

	encoded, isBytes := d.Datum.(*tree.DBytes)
	if !isBytes {
		return false, errors.AssertionFailedf(`unexpected datum type %T: %s`, d.Datum,
			d.Datum)
	}

	var update jobspb.ResolvedSpan
	if err := protoutil.Unmarshal([]byte(*encoded), &update); err != nil {
		return false, errors.NewAssertionErrorWithWrappedErrf(err,
			`unmarshalling resolved timestamp: %x`, encoded)
	}

	// A timestamp older than the one the ingestion flow started at could
	// regress the job progress. That is never expected, so it is surfaced as an
	// assertion failure rather than silently applied.
	if ts := update.Timestamp; !ts.IsEmpty() && ts.Less(sf.highWaterAtStart) {
		return false, errors.AssertionFailedf(
			`got a resolved timestamp %s that is less than the frontier processor start time %s`,
			redact.Safe(ts), redact.Safe(sf.highWaterAtStart))
	}

	return sf.maybeMoveFrontier(update.Span, update.Timestamp), nil
}

// maybeMoveFrontier forwards the resolved timestamp recorded for the given
// span and reports whether the overall frontier is strictly higher afterwards
// than it was before the update.
func (sf *streamIngestionFrontier) maybeMoveFrontier(
	span roachpb.Span, resolved hlc.Timestamp,
) bool {
	before := sf.frontier.Frontier()
	sf.frontier.Forward(span, resolved)
	after := sf.frontier.Frontier()
	return before.Less(after)
}

// ConsumerClosed is part of the RowSource interface. It closes the processor
// by calling InternalClose.
func (sf *streamIngestionFrontier) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
sf.InternalClose()
}
Loading

0 comments on commit dfdfa8b

Please sign in to comment.