sql: add catalog.Partitioning interface
This commit aims to reduce direct usage of the
descpb.PartitioningDescriptor type in favor of the new
catalog.Partitioning interface. This refactoring is in line with the
recent virtualization work tracked under #56306.
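
The methods exercised throughout this diff (NumColumns, NumLists, NumRanges, ForEachList, ForEachRange, GetPartitioning) show how callers now traverse partitioning metadata through the interface instead of reading descpb.PartitioningDescriptor fields directly. The sketch below is illustrative only: the helper name collectPartitionNames is hypothetical, and the interface itself (defined in pkg/sql/catalog, not shown in these hunks) may expose more methods than the ones used here.

// Illustrative sketch, not part of this commit: traversing an index's
// partitioning via the catalog.Partitioning interface.
package example

import "github.com/cockroachdb/cockroach/pkg/sql/catalog"

// collectPartitionNames gathers the names of all list and range partitions
// of an index, recursing into subpartitionings of list partitions.
func collectPartitionNames(idx catalog.Index) ([]string, error) {
	var names []string
	var walk func(part catalog.Partitioning) error
	walk = func(part catalog.Partitioning) error {
		if part.NumColumns() == 0 {
			// Not partitioned (or an empty subpartitioning): nothing to do.
			return nil
		}
		if err := part.ForEachList(func(name string, _ [][]byte, sub catalog.Partitioning) error {
			names = append(names, name)
			return walk(sub) // list partitions may themselves be partitioned
		}); err != nil {
			return err
		}
		return part.ForEachRange(func(name string, _, _ []byte) error {
			names = append(names, name)
			return nil
		})
	}
	if err := walk(idx.GetPartitioning()); err != nil {
		return nil, err
	}
	return names, nil
}

The repartitioning test at the bottom of this diff does the same kind of traversal more directly with ForEachPartitionName.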

In an effort to improve readability and the ability to reason about
state changes, this commit changes the type signature of
sql.CreatePartitioning and, notably, removes its side effects on the
index descriptor. The latter is now modified using a new
tabledesc.UpdateIndexPartitioning function.

Release note: None
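
For context on the signature change: in the hunks below, the ccl-side createPartitioning now returns (newImplicitCols []catalog.Column, newPartitioning descpb.PartitioningDescriptor, error) instead of a mutated copy of the index descriptor. The sketch that follows is a minimal approximation of how a caller might fold that result back into the descriptor; the real tabledesc.UpdateIndexPartitioning is not shown in these hunks, so this is reconstructed from the prepending logic removed from detectImplicitPartitionColumns and labeled as an assumption.

// Illustrative sketch, not part of this commit: an approximation of what the
// new tabledesc.UpdateIndexPartitioning presumably does, reconstructed from
// the implicit-column prepending logic removed in this diff.
package example

import (
	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// updateIndexPartitioning replaces idx's implicit-column prefix and its
// partitioning descriptor in one place, instead of relying on
// CreatePartitioning to mutate the index as a side effect.
func updateIndexPartitioning(
	idx *descpb.IndexDescriptor,
	newImplicitCols []catalog.Column,
	newPartitioning descpb.PartitioningDescriptor,
) {
	// Drop the old implicit-column prefix, keeping the explicit index columns.
	old := int(idx.Partitioning.NumImplicitColumns)
	explicitIDs := idx.ColumnIDs[old:]
	explicitNames := idx.ColumnNames[old:]
	explicitDirs := idx.ColumnDirections[old:]

	// Prepend the new implicit columns; implicit columns are always ascending.
	n := len(newImplicitCols)
	ids := make([]descpb.ColumnID, 0, n+len(explicitIDs))
	names := make([]string, 0, n+len(explicitNames))
	dirs := make([]descpb.IndexDescriptor_Direction, 0, n+len(explicitDirs))
	for _, col := range newImplicitCols {
		ids = append(ids, col.GetID())
		names = append(names, col.GetName())
		dirs = append(dirs, descpb.IndexDescriptor_ASC)
	}
	idx.ColumnIDs = append(ids, explicitIDs...)
	idx.ColumnNames = append(names, explicitNames...)
	idx.ColumnDirections = append(dirs, explicitDirs...)
	idx.Partitioning = newPartitioning
}
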
Marius Posta committed May 5, 2021
1 parent 782d320 commit 696e9e4
Showing 27 changed files with 625 additions and 457 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/partitionccl/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/sql/types",
"//pkg/util/encoding",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
],
)
159 changes: 73 additions & 86 deletions pkg/ccl/partitionccl/partition.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

@@ -149,7 +150,7 @@ func createPartitioningImpl(
ctx context.Context,
evalCtx *tree.EvalContext,
tableDesc *tabledesc.Mutable,
indexDesc *descpb.IndexDescriptor,
newIdxColumnNames []string,
partBy *tree.PartitionBy,
allowedNewColumnNames []tree.Name,
numImplicitColumns int,
@@ -166,7 +167,7 @@
// We don't have the fields for our parent partitions handy, but we can use
// the names from the index we're partitioning. They must have matched or we
// would have already returned an error.
partCols := append([]string(nil), indexDesc.ColumnNames[:colOffset]...)
partCols := append([]string(nil), newIdxColumnNames[:colOffset]...)
for _, p := range partBy.Fields {
partCols = append(partCols, string(p))
}
@@ -175,16 +176,16 @@

var cols []catalog.Column
for i := 0; i < len(partBy.Fields); i++ {
if colOffset+i >= len(indexDesc.ColumnNames) {
if colOffset+i >= len(newIdxColumnNames) {
return partDesc, pgerror.Newf(pgcode.Syntax,
"declared partition columns (%s) exceed the number of columns in index being partitioned (%s)",
partitioningString(), strings.Join(indexDesc.ColumnNames, ", "))
partitioningString(), strings.Join(newIdxColumnNames, ", "))
}
// Search by name because some callsites of this method have not
// allocated ids yet (so they are still all the 0 value).
col, err := findColumnByNameOnTable(
tableDesc,
tree.Name(indexDesc.ColumnNames[colOffset+i]),
tree.Name(newIdxColumnNames[colOffset+i]),
allowedNewColumnNames,
)
if err != nil {
@@ -197,7 +198,7 @@ n := colOffset + i + 1
n := colOffset + i + 1
return partDesc, pgerror.Newf(pgcode.Syntax,
"declared partition columns (%s) do not match first %d columns in index being partitioned (%s)",
partitioningString(), n, strings.Join(indexDesc.ColumnNames[:n], ", "))
partitioningString(), n, strings.Join(newIdxColumnNames[:n], ", "))
}
}

@@ -225,7 +226,7 @@ func createPartitioningImpl(
ctx,
evalCtx,
tableDesc,
indexDesc,
newIdxColumnNames,
l.Subpartition,
allowedNewColumnNames,
0, /* implicitColumnNames */
@@ -263,25 +264,19 @@ func createPartitioningImpl(
return partDesc, nil
}

// detectImplicitPartitionColumns detects implicit partitioning columns
// and returns a new index descriptor with the implicit columns modified
// on the index descriptor and the number of implicit columns prepended.
func detectImplicitPartitionColumns(
evalCtx *tree.EvalContext,
// collectImplicitPartitionColumns collects implicit partitioning columns.
func collectImplicitPartitionColumns(
tableDesc *tabledesc.Mutable,
indexDesc descpb.IndexDescriptor,
indexFirstColumnName string,
partBy *tree.PartitionBy,
allowedNewColumnNames []tree.Name,
) (descpb.IndexDescriptor, int, error) {
) (implicitCols []catalog.Column, _ error) {
seenImplicitColumnNames := map[string]struct{}{}
var implicitColumnIDs []descpb.ColumnID
var implicitColumns []string
var implicitColumnDirections []descpb.IndexDescriptor_Direction
// Iterate over each field in the PARTITION BY until it matches the start
// of the actual explicitly indexed columns.
for _, field := range partBy.Fields {
// As soon as the fields match, we have no implicit columns to add.
if string(field) == indexDesc.ColumnNames[0] {
if string(field) == indexFirstColumnName {
break
}

@@ -291,27 +286,20 @@ func detectImplicitPartitionColumns(
allowedNewColumnNames,
)
if err != nil {
return indexDesc, 0, err
return nil, err
}
if _, ok := seenImplicitColumnNames[col.GetName()]; ok {
return indexDesc, 0, pgerror.Newf(
return nil, pgerror.Newf(
pgcode.InvalidObjectDefinition,
`found multiple definitions in partition using column "%s"`,
col.GetName(),
)
}
seenImplicitColumnNames[col.GetName()] = struct{}{}
implicitColumns = append(implicitColumns, col.GetName())
implicitColumnIDs = append(implicitColumnIDs, col.GetID())
implicitColumnDirections = append(implicitColumnDirections, descpb.IndexDescriptor_ASC)
implicitCols = append(implicitCols, col)
}

if len(implicitColumns) > 0 {
indexDesc.ColumnNames = append(implicitColumns, indexDesc.ColumnNames...)
indexDesc.ColumnIDs = append(implicitColumnIDs, indexDesc.ColumnIDs...)
indexDesc.ColumnDirections = append(implicitColumnDirections, indexDesc.ColumnDirections...)
}
return indexDesc, len(implicitColumns), nil
return implicitCols, nil
}

// findColumnByNameOnTable finds the given column from the table.
@@ -348,74 +336,72 @@ func createPartitioning(
partBy *tree.PartitionBy,
allowedNewColumnNames []tree.Name,
allowImplicitPartitioning bool,
) (descpb.IndexDescriptor, error) {
) (newImplicitCols []catalog.Column, newPartitioning descpb.PartitioningDescriptor, err error) {
org := sql.ClusterOrganization.Get(&st.SV)
if err := utilccl.CheckEnterpriseEnabled(st, evalCtx.ClusterID, org, "partitions"); err != nil {
return indexDesc, err
return nil, newPartitioning, err
}

// Truncate existing implicitly partitioned columns.
// Truncate existing implicitly partitioned column names.
oldNumImplicitColumns := int(indexDesc.Partitioning.NumImplicitColumns)
oldImplicitColumnIDs := indexDesc.ColumnIDs[:oldNumImplicitColumns]
newIdxColumnNames := indexDesc.ColumnNames[oldNumImplicitColumns:]

indexDesc.ColumnIDs = indexDesc.ColumnIDs[oldNumImplicitColumns:]
indexDesc.ColumnNames = indexDesc.ColumnNames[oldNumImplicitColumns:]
indexDesc.ColumnDirections = indexDesc.ColumnDirections[oldNumImplicitColumns:]

var numImplicitColumns int
var err error
if allowImplicitPartitioning {
indexDesc, numImplicitColumns, err = detectImplicitPartitionColumns(
evalCtx,
newImplicitCols, err = collectImplicitPartitionColumns(
tableDesc,
indexDesc,
newIdxColumnNames[0],
partBy,
allowedNewColumnNames,
)
if err != nil {
return indexDesc, err
return nil, newPartitioning, err
}
if numImplicitColumns > 0 {
if err := checkClusterSupportsImplicitPartitioning(evalCtx); err != nil {
return indexDesc, err
}
}
if len(newImplicitCols) > 0 {
if err := checkClusterSupportsImplicitPartitioning(evalCtx); err != nil {
return nil, newPartitioning, err
}
// Prepend with new implicit column names.
newIdxColumnNames = make([]string, len(newImplicitCols), len(newImplicitCols)+len(newIdxColumnNames))
for i, col := range newImplicitCols {
newIdxColumnNames[i] = col.GetName()
}
newIdxColumnNames = append(newIdxColumnNames, indexDesc.ColumnNames[oldNumImplicitColumns:]...)
}

// If we had implicit column partitioning beforehand, check we have the
// same implicitly partitioned columns.
// Having different implicitly partitioned columns requires rewrites,
// which is outside the scope of createPartitioning.
if oldNumImplicitColumns > 0 {
if numImplicitColumns != oldNumImplicitColumns {
return indexDesc, errors.AssertionFailedf(
if len(newImplicitCols) != oldNumImplicitColumns {
return nil, newPartitioning, errors.AssertionFailedf(
"mismatching number of implicit columns: old %d vs new %d",
oldNumImplicitColumns,
numImplicitColumns,
len(newImplicitCols),
)
}
for i, oldColID := range oldImplicitColumnIDs {
if oldColID != indexDesc.ColumnIDs[i] {
return indexDesc, errors.AssertionFailedf("found new implicit partitioning at index %d", i)
for i, col := range newImplicitCols {
if indexDesc.ColumnIDs[i] != col.GetID() {
return nil, newPartitioning, errors.AssertionFailedf("found new implicit partitioning at column ordinal %d", i)
}
}
}

partitioning, err := createPartitioningImpl(
newPartitioning, err = createPartitioningImpl(
ctx,
evalCtx,
tableDesc,
&indexDesc,
newIdxColumnNames,
partBy,
allowedNewColumnNames,
numImplicitColumns,
len(newImplicitCols),
0, /* colOffset */
)
if err != nil {
return indexDesc, err
return nil, descpb.PartitioningDescriptor{}, err
}
indexDesc.Partitioning = partitioning
return indexDesc, err
return newImplicitCols, newPartitioning, err
}

// selectPartitionExprs constructs an expression for selecting all rows in the
@@ -434,7 +420,7 @@ func selectPartitionExprs(
AddMutations: true,
}, func(idx catalog.Index) error {
return selectPartitionExprsByName(
a, evalCtx, tableDesc, idx, &idx.IndexDesc().Partitioning, prefixDatums, exprsByPartName, true /* genExpr */)
a, evalCtx, tableDesc, idx, idx.GetPartitioning(), prefixDatums, exprsByPartName, true /* genExpr */)
}); err != nil {
return nil, err
}
@@ -488,38 +474,37 @@ func selectPartitionExprsByName(
evalCtx *tree.EvalContext,
tableDesc catalog.TableDescriptor,
idx catalog.Index,
partDesc *descpb.PartitioningDescriptor,
part catalog.Partitioning,
prefixDatums tree.Datums,
exprsByPartName map[string]tree.TypedExpr,
genExpr bool,
) error {
if partDesc.NumColumns == 0 {
if part.NumColumns() == 0 {
return nil
}

// Setting genExpr to false skips the expression generation and only
// registers each descendent partition in the map with a placeholder entry.
if !genExpr {
for _, l := range partDesc.List {
exprsByPartName[l.Name] = tree.DBoolFalse
err := part.ForEachList(func(name string, _ [][]byte, subPartitioning catalog.Partitioning) error {
exprsByPartName[name] = tree.DBoolFalse
var fakeDatums tree.Datums
if err := selectPartitionExprsByName(
a, evalCtx, tableDesc, idx, &l.Subpartitioning, fakeDatums, exprsByPartName, genExpr,
); err != nil {
return err
}
}
for _, r := range partDesc.Range {
exprsByPartName[r.Name] = tree.DBoolFalse
return selectPartitionExprsByName(a, evalCtx, tableDesc, idx, subPartitioning, fakeDatums, exprsByPartName, genExpr)
})
if err != nil {
return err
}
return nil
return part.ForEachRange(func(name string, _, _ []byte) error {
exprsByPartName[name] = tree.DBoolFalse
return nil
})
}

var colVars tree.Exprs
{
// The recursive calls of selectPartitionExprsByName don't pass through
// the column ordinal references, so reconstruct them here.
colVars = make(tree.Exprs, len(prefixDatums)+int(partDesc.NumColumns))
colVars = make(tree.Exprs, len(prefixDatums)+part.NumColumns())
for i := range colVars {
col, err := tabledesc.FindPublicColumnWithID(tableDesc, idx.GetColumnID(i))
if err != nil {
@@ -529,7 +514,7 @@ func selectPartitionExprsByName(
}
}

if len(partDesc.List) > 0 {
if part.NumLists() > 0 {
type exprAndPartName struct {
expr tree.TypedExpr
name string
@@ -539,12 +524,11 @@ func selectPartitionExprsByName(
// `(1, 2)`, the expr for the former must exclude the latter. This is
// done by bucketing the expression for each partition value by the
// number of DEFAULTs it involves.
partValueExprs := make([][]exprAndPartName, int(partDesc.NumColumns)+1)
partValueExprs := make([][]exprAndPartName, part.NumColumns()+1)

for _, l := range partDesc.List {
for _, valueEncBuf := range l.Values {
t, _, err := rowenc.DecodePartitionTuple(
a, evalCtx.Codec, tableDesc, idx, partDesc, valueEncBuf, prefixDatums)
err := part.ForEachList(func(name string, values [][]byte, subPartitioning catalog.Partitioning) error {
for _, valueEncBuf := range values {
t, _, err := rowenc.DecodePartitionTuple(a, evalCtx.Codec, tableDesc, idx, part, valueEncBuf, prefixDatums)
if err != nil {
return err
}
@@ -562,11 +546,11 @@ func selectPartitionExprsByName(
tree.NewDTuple(tupleTyp, allDatums...))
partValueExprs[len(t.Datums)] = append(partValueExprs[len(t.Datums)], exprAndPartName{
expr: partValueExpr,
name: l.Name,
name: name,
})

genExpr := true
if _, ok := exprsByPartName[l.Name]; ok {
if _, ok := exprsByPartName[name]; ok {
// Presence of a partition name in the exprsByPartName map
// means the caller has expressed an interested in this
// partition, which means any subpartitions can be skipped
@@ -578,11 +562,15 @@ func selectPartitionExprsByName(
genExpr = false
}
if err := selectPartitionExprsByName(
a, evalCtx, tableDesc, idx, &l.Subpartitioning, allDatums, exprsByPartName, genExpr,
a, evalCtx, tableDesc, idx, subPartitioning, allDatums, exprsByPartName, genExpr,
); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}

// Walk backward through partValueExprs, so partition values with fewest
Expand Down Expand Up @@ -617,10 +605,9 @@ func selectPartitionExprsByName(
}
}

for range partDesc.Range {
return errors.New("TODO(dan): unsupported for range partitionings")
if part.NumRanges() > 0 {
log.Fatal(evalCtx.Context, "TODO(dan): unsupported for range partitionings")
}

return nil
}

11 changes: 7 additions & 4 deletions pkg/ccl/partitionccl/partition_test.go
@@ -1342,12 +1342,12 @@ func TestRepartitioning(t *testing.T) {
} else {
fmt.Fprintf(&repartition, `ALTER INDEX %s@%s `, test.new.parsed.tableName, testIndex.GetName())
}
if testIndex.GetPartitioning().NumColumns == 0 {
if testIndex.GetPartitioning().NumColumns() == 0 {
repartition.WriteString(`PARTITION BY NOTHING`)
} else {
if err := sql.ShowCreatePartitioning(
&rowenc.DatumAlloc{}, keys.SystemSQLCodec, test.new.parsed.tableDesc, testIndex,
&testIndex.IndexDesc().Partitioning, &repartition, 0 /* indent */, 0, /* colOffset */
testIndex.GetPartitioning(), &repartition, 0 /* indent */, 0, /* colOffset */
); err != nil {
t.Fatalf("%+v", err)
}
@@ -1357,8 +1357,11 @@
// Verify that repartitioning removes zone configs for partitions that
// have been removed.
newPartitionNames := map[string]struct{}{}
for _, name := range test.new.parsed.tableDesc.PartitionNames() {
newPartitionNames[name] = struct{}{}
for _, index := range test.new.parsed.tableDesc.NonDropIndexes() {
_ = index.GetPartitioning().ForEachPartitionName(func(name string) error {
newPartitionNames[name] = struct{}{}
return nil
})
}
for _, row := range sqlDB.QueryStr(
t, "SELECT partition_name FROM crdb_internal.zones WHERE partition_name IS NOT NULL") {