68467: storage: add limits to skipped data iteration r=sumeerbhola a=aliher1911

Previously, when iterating over the engine using MVCCIncrementalIterator,
a caller could skip large amounts of non-matching data, which would
result in "unbounded" resource usage.
This is becoming a problem for resource-constrained clusters, where
low-priority requests, such as the exports used by backups, can
interfere with high-priority workloads. If we want to throttle backups,
we need to be able to limit how many underlying operations we perform
per request.
This change adds an optional iteration limit to the iterator. Once the
limit is reached, the iterator returns an error. The error provides a
resume key so the caller can continue iteration in the next request.
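
As an illustration, a caller-side resume loop might look like the sketch below; the `iterLimitError` type and `doExport` helper are hypothetical stand-ins, since the exact error and API names aren't spelled out in this message:

```go
package exportutil

import (
	"context"
	"errors"
)

// iterLimitError is an illustrative stand-in for the error the iterator
// returns when the iteration limit is reached; the real type may differ.
type iterLimitError struct{ resumeKey []byte }

func (e *iterLimitError) Error() string { return "iteration limit reached" }

// exportAll drives a limited export to completion across multiple requests,
// restarting from the resume key whenever the limit error is returned.
func exportAll(
	ctx context.Context,
	start, end []byte,
	doExport func(ctx context.Context, start, end []byte) error,
) error {
	for start != nil {
		err := doExport(ctx, start, end)
		if err == nil {
			return nil // finished within the limit
		}
		var limitErr *iterLimitError
		if !errors.As(err, &limitErr) {
			return err // a real failure, not a resource limit
		}
		// Resume from where the previous request stopped.
		start = limitErr.resumeKey
	}
	return nil
}
```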

Release note: None

Fixes #68234

70881: sql: implement information_schema.{_pg_numeric_precision,_pg_numeric_precision_radix,_pg_numeric_scale} r=rafiss a=e-mbrown

fixes #70872 

This commit adds implementations of the `information_schema._pg_numeric_precision`, `information_schema._pg_numeric_precision_radix`, and `information_schema._pg_numeric_scale` builtin functions.

These functions return the precision and scale of an exact numeric type, as well as the radix (base) in which the precision and scale are expressed.
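
Concretely, Postgres packs NUMERIC precision and scale into a single int32 type modifier, and these builtins reverse that packing. A minimal self-contained sketch of the round trip (the `numericTypmod` helper is ours for illustration, not part of this change):

```go
package main

import "fmt"

// numericTypmod packs NUMERIC(precision, scale) the way Postgres does:
// ((precision << 16) | scale) + 4, where 4 is the varlena header size.
func numericTypmod(precision, scale int32) int32 {
	return (precision<<16 | scale) + 4
}

func main() {
	tm := numericTypmod(12, 2)            // the typmod for DECIMAL(12,2)
	fmt.Println(((tm - 4) >> 16) & 65535) // precision: 12
	fmt.Println((tm - 4) & 65535)         // scale: 2
}
```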

Release note (sql change): The `information_schema._pg_numeric_precision`, `information_schema._pg_numeric_precision_radix`, and `information_schema._pg_numeric_scale` builtin functions are now supported, which improves compatibility with PostgreSQL.

70947: workload: use the same ctx for ramp and main load r=otan,tbg a=rafiss

fixes #70019
fixes #69545
fixes #69523

We have seen issues where a worker returns ErrBadConn immediately after
the ramp period is done. It comes down to the fact that connections in
the pool are marked as "bad" when a context is canceled.

The underlying issue might be a race condition in how lib/pq cancels the
context and then marks the connection as "bad." But I'm not really sure
how to fix that and it's hard to reproduce, so I'm working around the
problem instead.
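
A minimal sketch of the shape of the fix, with `runPhase` as an illustrative stand-in for the workload's actual ramp and main-load helpers, which aren't shown here:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPhase stands in for the workload's ramp/main-load helpers.
func runPhase(ctx context.Context, name string, d time.Duration) error {
	select {
	case <-time.After(d):
		fmt.Println(name, "done")
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// After the fix, ramp and main load share one context. Previously the
	// ramp ran under a context that was canceled when the ramp finished,
	// which marked pooled connections as "bad" just as the main load began.
	ctx := context.Background()
	if err := runPhase(ctx, "ramp", 100*time.Millisecond); err != nil {
		return
	}
	_ = runPhase(ctx, "main load", 100*time.Millisecond)
}
```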

Release note: None

Co-authored-by: Oleg Afanasyev <oleg@cockroachlabs.com>
Co-authored-by: e-mbrown <ebsonari@gmail.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
4 people committed Oct 5, 2021
4 parents f811374 + e608905 + 4071762 + a3ff7d5 commit cf4fe62
Showing 21 changed files with 780 additions and 203 deletions.
6 changes: 6 additions & 0 deletions docs/generated/sql/functions.md
@@ -3152,6 +3152,12 @@ SELECT * FROM crdb_internal.check_consistency(true, '\x02', '\x04')</p>
</span></td></tr>
<tr><td><a name="has_type_privilege"></a><code>has_type_privilege(user: oid, type: oid, privilege: <a href="string.html">string</a>) &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>Returns whether or not the user has privileges for type.</p>
</span></td></tr>
<tr><td><a name="information_schema._pg_numeric_precision"></a><code>information_schema._pg_numeric_precision(typid: oid, typmod: int4) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Returns the precision of the given type with type modifier</p>
</span></td></tr>
<tr><td><a name="information_schema._pg_numeric_precision_radix"></a><code>information_schema._pg_numeric_precision_radix(typid: oid, typmod: int4) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Returns the radix of the given type with type modifier</p>
</span></td></tr>
<tr><td><a name="information_schema._pg_numeric_scale"></a><code>information_schema._pg_numeric_scale(typid: oid, typmod: int4) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Returns the scale of the given type with type modifier</p>
</span></td></tr>
<tr><td><a name="oid"></a><code>oid(int: <a href="int.html">int</a>) &rarr; oid</code></td><td><span class="funcdesc"><p>Converts an integer to an OID.</p>
</span></td></tr>
<tr><td><a name="pg_collation_for"></a><code>pg_collation_for(str: anyelement) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the collation of the argument</p>
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -80,6 +80,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
31 changes: 29 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
@@ -58,6 +59,20 @@ var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
64<<20, /* 64 MiB */
).WithPublic()

// exportRequestMaxIterationTime controls how much time an export request may
// spend iterating over data in the underlying storage. This threshold prevents
// an export request from holding locks for too long and thereby blocking
// non-MVCC operations from progressing. If a request takes longer than this
// threshold, it stops, returns the data collected so far, and allows the
// caller to use the resume span to continue.
var exportRequestMaxIterationTime = settings.RegisterDurationSetting(
"kv.bulk_sst.max_request_time",
"if set, limits amount of time spent in export requests; "+
"if export request can not finish within allocated time it will resume from the point it stopped in "+
"subsequent request",
// The feature is disabled by default.
0,
)

func init() {
RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
}
@@ -134,6 +149,8 @@ func evalExport(
maxSize = targetSize + uint64(allowedOverage)
}

maxRunTime := exportRequestMaxIterationTime.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// Time-bound iterators only make sense to use if the start time is set.
useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
// Only use resume timestamp if splitting mid key is enabled.
@@ -145,8 +162,18 @@
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, resumeTS, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, resumeKeyTS, exportAllRevisions, targetSize, maxSize, args.SplitMidKey, useTBI, destFile)
summary, resume, resumeTS, err := reader.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: args.SplitMidKey,
UseTBI: useTBI,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}, destFile)
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
26 changes: 22 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
@@ -594,8 +594,17 @@ func assertEqualKVs(
prevStart := start
prevTs := resumeTs
sstFile := &storage.MemFile{}
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime, resumeTs,
bool(exportAllRevisions), targetSize, maxSize, bool(stopMidKey), bool(enableTimeBoundIteratorOptimization), sstFile)
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeTs},
EndKey: endKey,
StartTS: startTime,
EndTS: endTime,
ExportAllRevisions: bool(exportAllRevisions),
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: bool(stopMidKey),
UseTBI: bool(enableTimeBoundIteratorOptimization),
}, sstFile)
require.NoError(t, err)
sst = sstFile.Data()
loaded := loadSST(t, sst, startKey, endKey)
@@ -634,8 +643,17 @@ func assertEqualKVs(
if dataSizeWhenExceeded == maxSize {
maxSize--
}
_, _, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime, prevTs,
bool(exportAllRevisions), targetSize, maxSize, false, bool(enableTimeBoundIteratorOptimization), &storage.MemFile{})
_, _, _, err = e.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: prevStart, Timestamp: prevTs},
EndKey: endKey,
StartTS: startTime,
EndTS: endTime,
ExportAllRevisions: bool(exportAllRevisions),
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: false,
UseTBI: bool(enableTimeBoundIteratorOptimization),
}, &storage.MemFile{})
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
11 changes: 2 additions & 9 deletions pkg/kv/kvserver/spanset/batch.go
@@ -418,16 +418,9 @@ func (s spanSetReader) Closed() bool {

// ExportMVCCToSst is part of the storage.Reader interface.
func (s spanSetReader) ExportMVCCToSst(
ctx context.Context,
startKey, endKey roachpb.Key,
startTS, endTS, firstKeyTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
stopMidKey, useTBI bool,
dest io.Writer,
ctx context.Context, exportOptions storage.ExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, firstKeyTS, exportAllRevisions, targetSize,
maxSize, stopMidKey, useTBI, dest)
return s.r.ExportMVCCToSst(ctx, exportOptions, dest)
}

func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
41 changes: 41 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_builtins
@@ -537,3 +537,44 @@ indexed_b_d_idx {2,4} 1 NULL
indexed_b_d_idx {2,4} 2 1
indexed_b_d_idx {2,4} 3 NULL
indexed_b_d_idx {2,4} 4 2

# information_schema._pg_numeric_precision, information_schema._pg_numeric_precision_radix,
# and information_schema._pg_numeric_scale

statement ok
CREATE TABLE numeric (
a SMALLINT,
b INT4,
c BIGINT,
d FLOAT(1),
e FLOAT4,
f FLOAT8,
g FLOAT(40),
h FLOAT,
i DECIMAL(12,2),
j DECIMAL(4,4)
);

query TTIII
SELECT a.attname,
t.typname,
information_schema._pg_numeric_precision(a.atttypid,a.atttypmod),
information_schema._pg_numeric_precision_radix(a.atttypid,a.atttypmod),
information_schema._pg_numeric_scale(a.atttypid,a.atttypmod)
FROM pg_attribute a
JOIN pg_type t
ON a.atttypid = t.oid
WHERE a.attrelid = 'numeric'::regclass
ORDER BY a.attname
----
a int2 16 2 0
b int4 32 2 0
c int8 64 2 0
d float4 24 2 NULL
e float4 24 2 NULL
f float8 53 2 NULL
g float8 53 2 NULL
h float8 53 2 NULL
i numeric 12 10 2
j numeric 4 10 4
rowid int8 64 2 0
87 changes: 87 additions & 0 deletions pkg/sql/sem/builtins/pg_builtins.go
@@ -2268,6 +2268,93 @@ SELECT description
Volatility: tree.VolatilityStable,
},
),

"information_schema._pg_numeric_precision": makeBuiltin(tree.FunctionProperties{Category: categorySystemInfo},
tree.Overload{
Types: tree.ArgTypes{
{"typid", types.Oid},
{"typmod", types.Int4},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
typid := oid.Oid(tree.MustBeDOid(args[0]).DInt)
typmod := tree.MustBeDInt(args[1])
switch typid {
case oid.T_int2:
return tree.NewDInt(16), nil
case oid.T_int4:
return tree.NewDInt(32), nil
case oid.T_int8:
return tree.NewDInt(64), nil
case oid.T_numeric:
if typmod != -1 {
// This logic matches the Postgres implementation
// of how to calculate the precision based on the typmod.
// https://github.com/postgres/postgres/blob/d84ffffe582b8e036a14c6bc2378df29167f3a00/src/backend/catalog/information_schema.sql#L109
return tree.NewDInt(((typmod - 4) >> 16) & 65535), nil
}
return tree.DNull, nil
case oid.T_float4:
return tree.NewDInt(24), nil
case oid.T_float8:
return tree.NewDInt(53), nil
}
return tree.DNull, nil
},
Info: "Returns the precision of the given type with type modifier",
Volatility: tree.VolatilityImmutable,
},
),

"information_schema._pg_numeric_precision_radix": makeBuiltin(tree.FunctionProperties{Category: categorySystemInfo},
tree.Overload{
Types: tree.ArgTypes{
{"typid", types.Oid},
{"typmod", types.Int4},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
typid := oid.Oid(tree.MustBeDOid(args[0]).DInt)
if typid == oid.T_int2 || typid == oid.T_int4 || typid == oid.T_int8 || typid == oid.T_float4 || typid == oid.T_float8 {
return tree.NewDInt(2), nil
} else if typid == oid.T_numeric {
return tree.NewDInt(10), nil
} else {
return tree.DNull, nil
}
},
Info: "Returns the radix of the given type with type modifier",
Volatility: tree.VolatilityImmutable,
},
),

"information_schema._pg_numeric_scale": makeBuiltin(tree.FunctionProperties{Category: categorySystemInfo},
tree.Overload{
Types: tree.ArgTypes{
{"typid", types.Oid},
{"typmod", types.Int4},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
typid := oid.Oid(tree.MustBeDOid(args[0]).DInt)
typmod := tree.MustBeDInt(args[1])
if typid == oid.T_int2 || typid == oid.T_int4 || typid == oid.T_int8 {
return tree.NewDInt(0), nil
} else if typid == oid.T_numeric {
if typmod == -1 {
return tree.DNull, nil
}
// This logic matches the Postgres implementation
// of how to calculate the scale based on the typmod.
// https://github.com/postgres/postgres/blob/d84ffffe582b8e036a14c6bc2378df29167f3a00/src/backend/catalog/information_schema.sql#L140
return tree.NewDInt((typmod - 4) & 65535), nil
}
return tree.DNull, nil
},
Info: "Returns the scale of the given type with type modifier",
Volatility: tree.VolatilityImmutable,
},
),
}

func getSessionVar(ctx *tree.EvalContext, settingName string, missingOk bool) (tree.Datum, error) {
10 changes: 10 additions & 0 deletions pkg/storage/BUILD.bazel
@@ -1,4 +1,5 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
name = "storage",
Expand Down Expand Up @@ -28,6 +29,7 @@ go_library(
"pebble_iterator.go",
"pebble_merge.go",
"pebble_mvcc_scanner.go",
"resource_limiter.go",
"row_counter.go",
"slice.go",
"slice_go1.9.go",
Expand All @@ -37,6 +39,7 @@ go_library(
"temp_dir.go",
"temp_engine.go",
"testing_knobs.go",
":gen-resourcelimitreached-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/storage",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -109,6 +112,7 @@ go_test(
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"resource_limiter_test.go",
"sst_info_test.go",
"sst_iterator_test.go",
"sst_writer_test.go",
Expand Down Expand Up @@ -160,3 +164,9 @@ go_test(
"@org_golang_x_sync//errgroup",
],
)

stringer(
name = "gen-resourcelimitreached-stringer",
src = "resource_limiter.go",
typ = "ResourceLimitReached",
)
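
The new resource_limiter.go itself is not shown in this excerpt; under that caveat, a time-based limiter in its spirit might look roughly like the sketch below (names and behavior are assumptions, guided by the `ResourceLimiterOptions{MaxRunTime: ...}` call in cmd_export.go above):

```go
package storageutil

import "time"

// resourceLimiter is an illustrative time-based limiter in the spirit of
// storage.ResourceLimiter; the real implementation may differ.
type resourceLimiter struct {
	deadline time.Time
}

// newResourceLimiter returns nil when maxRunTime is zero, mirroring how the
// kv.bulk_sst.max_request_time setting disables the feature by default.
func newResourceLimiter(maxRunTime time.Duration) *resourceLimiter {
	if maxRunTime == 0 {
		return nil
	}
	return &resourceLimiter{deadline: time.Now().Add(maxRunTime)}
}

// isExhausted would be consulted periodically during iteration; once it
// returns true, the caller stops and reports a resume key.
func (l *resourceLimiter) isExhausted() bool {
	return l != nil && time.Now().After(l.deadline)
}
```
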
20 changes: 15 additions & 5 deletions pkg/storage/bench_test.go
@@ -653,10 +653,11 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int

var batch Batch
var minWallTime int64
batchSize := len(keys) / numBatches
for i, key := range keys {
if scaled := len(keys) / numBatches; (i % scaled) == 0 {
if (i % batchSize) == 0 {
if i > 0 {
log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches)
log.Infof(ctx, "committing (%d/~%d)", i/batchSize, numBatches)
if err := batch.Commit(false /* sync */); err != nil {
return nil, err
}
@@ -666,7 +667,7 @@ }
}
}
batch = eng.NewBatch()
minWallTime = sstTimestamps[i/scaled]
minWallTime = sstTimestamps[i/batchSize]
}
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
@@ -1521,8 +1522,17 @@ func runExportToSst(
for i := 0; i < b.N; i++ {
startTS := hlc.Timestamp{WallTime: int64(numRevisions / 2)}
endTS := hlc.Timestamp{WallTime: int64(numRevisions + 2)}
_, _, _, err := engine.ExportMVCCToSst(context.Background(), keys.LocalMax, roachpb.KeyMax, startTS, endTS, hlc.Timestamp{},
exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, false, useTBI, noopWriter{})
_, _, _, err := engine.ExportMVCCToSst(context.Background(), ExportOptions{
StartKey: MVCCKey{Key: keys.LocalMax},
EndKey: roachpb.KeyMax,
StartTS: startTS,
EndTS: endTS,
ExportAllRevisions: exportAllRevisions,
TargetSize: 0,
MaxSize: 0,
StopMidKey: false,
UseTBI: useTBI,
}, noopWriter{})
if err != nil {
b.Fatal(err)
}