-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
58665: cli: support `cockroach --version`, `cockroach version --build-tag` r=bdarnell a=knz Requested by the CC SRE team: we want a guaranteed machine-readable way to extract the version of a crdb executable. `cockroach version` did not provide this. Release note (cli change): The `cockroach` command now supports the command-line parameter `--version` which reports its version parameters. This makes `cockroach --version` equivalent to `cockroach version`. Release note (ops change); The `cockroach version` command now supports a new parameter `--build-tag`; when specified, it displays the technical build tag, which makes it possible to integrate with automated deployment tools. 58672: streamingccl: add IngestionFrontierProcessor r=pbardea a=adityamaru The IngestionFrontierProcessor (IFP) is fed rows from the IngestionProcessor. Each row represents a partition and the latest resolved ts event received for that partition. The IFP uses the existing util `spanFrontier` (used by changefeeds) to keep track of all the rows returned by the several ingestion procs, and resolves the minimum resolved ts for a particular partition across all procs. This is then written to the ingestion job progress indicating that all processors have ingested a stream up to a particular resolved ts. TODO: Job resumption from the resolved ts reported in its progress. Will do this in a follow up PR, along with a test. Informs: #57400 Release note: None 59051: githooks: new release note category `ops changes` r=jseldess,taroface a=knz Fixes #57898 This new release note category is meant to cover changes that interest primarily the folk responsible for running production clusters and integrating CockroachDB into a larger IT deployment. Note that this new category takes over certain things that would previously be attributed to "cli changes". From now on "cli changes" covers changes that primarily interest app developers, crdb developers and operators that are currently trying out features or setting up experiments in development/staging environments. See this page for more details: https://wiki.crdb.io/wiki/spaces/CRDB/pages/186548364/Release+notes Release note: None Co-authored-by: Raphael 'kena' Poss <knz@thaumogen.net> Co-authored-by: Aditya Maru <adityamaru@gmail.com>
- Loading branch information
Showing
21 changed files
with
1,217 additions
and
316 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
// Copyright 2020 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package streamingest | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/sql/execinfra" | ||
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" | ||
"github.com/cockroachdb/cockroach/pkg/sql/rowenc" | ||
"github.com/cockroachdb/cockroach/pkg/sql/rowexec" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" | ||
"github.com/cockroachdb/cockroach/pkg/sql/types" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/cockroach/pkg/util/protoutil" | ||
"github.com/cockroachdb/cockroach/pkg/util/span" | ||
"github.com/cockroachdb/errors" | ||
"github.com/cockroachdb/redact" | ||
) | ||
|
||
const streamIngestionFrontierProcName = `ingestfntr` | ||
|
||
type streamIngestionFrontier struct { | ||
execinfra.ProcessorBase | ||
execinfra.StreamingProcessor | ||
|
||
flowCtx *execinfra.FlowCtx | ||
spec execinfrapb.StreamIngestionFrontierSpec | ||
a rowenc.DatumAlloc | ||
|
||
// input returns rows from one or more streamIngestion processors. | ||
input execinfra.RowSource | ||
// highWaterAtStart is the job high-water. It's used in an assertion that we | ||
// never regress the job high-water. | ||
highWaterAtStart hlc.Timestamp | ||
|
||
// frontier contains the current resolved timestamp high-water for the tracked | ||
// span set. | ||
frontier *span.Frontier | ||
} | ||
|
||
var _ execinfra.Processor = &streamIngestionFrontier{} | ||
var _ execinfra.RowSource = &streamIngestionFrontier{} | ||
|
||
func init() { | ||
rowexec.NewStreamIngestionFrontierProcessor = newStreamIngestionFrontierProcessor | ||
} | ||
|
||
func newStreamIngestionFrontierProcessor( | ||
flowCtx *execinfra.FlowCtx, | ||
processorID int32, | ||
spec execinfrapb.StreamIngestionFrontierSpec, | ||
input execinfra.RowSource, | ||
post *execinfrapb.PostProcessSpec, | ||
output execinfra.RowReceiver, | ||
) (execinfra.Processor, error) { | ||
sf := &streamIngestionFrontier{ | ||
flowCtx: flowCtx, | ||
spec: spec, | ||
input: input, | ||
highWaterAtStart: spec.HighWaterAtStart, | ||
frontier: span.MakeFrontier(spec.TrackedSpans...), | ||
} | ||
if err := sf.Init( | ||
sf, | ||
post, | ||
input.OutputTypes(), | ||
flowCtx, | ||
processorID, | ||
output, | ||
nil, /* memMonitor */ | ||
execinfra.ProcStateOpts{ | ||
InputsToDrain: []execinfra.RowSource{sf.input}, | ||
}, | ||
); err != nil { | ||
return nil, err | ||
} | ||
return sf, nil | ||
} | ||
|
||
// Start is part of the RowSource interface. | ||
func (sf *streamIngestionFrontier) Start(ctx context.Context) context.Context { | ||
sf.input.Start(ctx) | ||
return sf.StartInternal(ctx, streamIngestionFrontierProcName) | ||
} | ||
|
||
// Next is part of the RowSource interface. | ||
func (sf *streamIngestionFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { | ||
for sf.State == execinfra.StateRunning { | ||
row, meta := sf.input.Next() | ||
if meta != nil { | ||
if meta.Err != nil { | ||
sf.MoveToDraining(nil /* err */) | ||
} | ||
return nil, meta | ||
} | ||
if row == nil { | ||
sf.MoveToDraining(nil /* err */) | ||
break | ||
} | ||
|
||
var frontierChanged bool | ||
var err error | ||
if frontierChanged, err = sf.noteResolvedTimestamp(row[0]); err != nil { | ||
sf.MoveToDraining(err) | ||
break | ||
} | ||
if frontierChanged { | ||
// Send back a row to the job so that it can update the progress. | ||
newResolvedTS := sf.frontier.Frontier() | ||
progressBytes, err := protoutil.Marshal(&newResolvedTS) | ||
if err != nil { | ||
sf.MoveToDraining(err) | ||
break | ||
} | ||
pushRow := rowenc.EncDatumRow{ | ||
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(progressBytes))), | ||
} | ||
if outRow := sf.ProcessRowHelper(pushRow); outRow != nil { | ||
return outRow, nil | ||
} | ||
} | ||
} | ||
return nil, sf.DrainHelper() | ||
} | ||
|
||
func (sf *streamIngestionFrontier) noteResolvedTimestamp(d rowenc.EncDatum) (bool, error) { | ||
var frontierChanged bool | ||
if err := d.EnsureDecoded(streamIngestionResultTypes[0], &sf.a); err != nil { | ||
return frontierChanged, err | ||
} | ||
raw, ok := d.Datum.(*tree.DBytes) | ||
if !ok { | ||
return frontierChanged, errors.AssertionFailedf(`unexpected datum type %T: %s`, d.Datum, | ||
d.Datum) | ||
} | ||
var resolved jobspb.ResolvedSpan | ||
if err := protoutil.Unmarshal([]byte(*raw), &resolved); err != nil { | ||
return frontierChanged, errors.NewAssertionErrorWithWrappedErrf(err, | ||
`unmarshalling resolved timestamp: %x`, raw) | ||
} | ||
|
||
// Inserting a timestamp less than the one the ingestion flow started at could | ||
// potentially regress the job progress. This is not expected and thus we | ||
// assert to catch such unexpected behavior. | ||
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.highWaterAtStart) { | ||
return frontierChanged, errors.AssertionFailedf( | ||
`got a resolved timestamp %s that is less than the frontier processor start time %s`, | ||
redact.Safe(resolved.Timestamp), redact.Safe(sf.highWaterAtStart)) | ||
} | ||
|
||
return sf.maybeMoveFrontier(resolved.Span, resolved.Timestamp), nil | ||
} | ||
|
||
// maybeMoveFrontier updates the resolved ts for the provided span, and returns | ||
// true if the update causes the frontier to move to higher resolved ts. | ||
func (sf *streamIngestionFrontier) maybeMoveFrontier( | ||
span roachpb.Span, resolved hlc.Timestamp, | ||
) bool { | ||
prevResolved := sf.frontier.Frontier() | ||
sf.frontier.Forward(span, resolved) | ||
return prevResolved.Less(sf.frontier.Frontier()) | ||
} | ||
|
||
// ConsumerClosed is part of the RowSource interface. | ||
func (sf *streamIngestionFrontier) ConsumerClosed() { | ||
// The consumer is done, Next() will not be called again. | ||
sf.InternalClose() | ||
} |
Oops, something went wrong.