Skip to content

Commit

Permalink
Merge pull request #99989 from dt/backport23.1-99508-99878
Browse files Browse the repository at this point in the history
release-23.1: jobs: change job_info.info_key to string
  • Loading branch information
dt authored Mar 29, 2023
2 parents 4a208d9 + cc8e633 commit aec78f3
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 65 deletions.
3 changes: 3 additions & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ go_test(
shard_count = 16,
deps = [
"//pkg/base",
"//pkg/ccl/backupccl",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
Expand Down Expand Up @@ -137,6 +139,7 @@ go_test(
"//pkg/sql/sqlliveness",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
Expand Down
56 changes: 28 additions & 28 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (i InfoStorage) checkClaimSession(ctx context.Context) error {
return nil
}

func (i InfoStorage) get(ctx context.Context, infoKey []byte) ([]byte, bool, error) {
func (i InfoStorage) get(ctx context.Context, infoKey string) ([]byte, bool, error) {
if i.txn == nil {
return nil, false, errors.New("cannot access the job info table without an associated txn")
}
Expand All @@ -82,7 +82,7 @@ func (i InfoStorage) get(ctx context.Context, infoKey []byte) ([]byte, bool, err
row, err := i.txn.QueryRowEx(
ctx, "job-info-get", i.txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key = $2",
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key::string = $2 ORDER BY written LIMIT 1",
j.ID(), infoKey,
)

Expand All @@ -102,13 +102,13 @@ func (i InfoStorage) get(ctx context.Context, infoKey []byte) ([]byte, bool, err
return []byte(*value), true, nil
}

func (i InfoStorage) write(ctx context.Context, infoKey, value []byte) error {
func (i InfoStorage) write(ctx context.Context, infoKey string, value []byte) error {
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
// First clear out any older revisions of this info.
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
sessiondata.NodeUserSessionDataOverride,
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key = $2",
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key::string = $2",
j.ID(), infoKey,
)
if err != nil {
Expand Down Expand Up @@ -156,8 +156,8 @@ func (i InfoStorage) doWrite(
func (i InfoStorage) iterate(
ctx context.Context,
iterMode iterateMode,
infoPrefix []byte,
fn func(infoKey, value []byte) error,
infoPrefix string,
fn func(infoKey string, value []byte) error,
) (retErr error) {
if i.txn == nil {
return errors.New("cannot iterate over the job info table without an associated txn")
Expand All @@ -184,31 +184,31 @@ func (i InfoStorage) iterate(
FROM system.job_info
WHERE job_id = $1 AND info_key >= $2 AND info_key < $3
`+iterConfig,
i.j.ID(), infoPrefix, roachpb.Key(infoPrefix).PrefixEnd(),
i.j.ID(), infoPrefix, string(roachpb.Key(infoPrefix).PrefixEnd()),
)
if err != nil {
return err
}
defer func(it isql.Rows) { retErr = errors.CombineErrors(retErr, it.Close()) }(rows)

var prevKey []byte
var prevKey string
var ok bool
for ok, err = rows.Next(ctx); ok; ok, err = rows.Next(ctx) {
if err != nil {
return err
}
row := rows.Cur()

key, ok := row[0].(*tree.DBytes)
key, ok := row[0].(*tree.DString)
if !ok {
return errors.AssertionFailedf("job info: expected info_key to be DBytes (was %T)", row[0])
return errors.AssertionFailedf("job info: expected info_key to be string (was %T)", row[0])
}
infoKey := []byte(*key)
infoKey := string(*key)

if bytes.Equal(infoKey, prevKey) {
if infoKey == prevKey {
continue
}
prevKey = append(prevKey[:0], infoKey...)
prevKey = infoKey

value, ok := row[1].(*tree.DBytes)
if !ok {
Expand All @@ -223,29 +223,29 @@ func (i InfoStorage) iterate(
}

// Get fetches the latest info record for the given job and infoKey.
func (i InfoStorage) Get(ctx context.Context, infoKey []byte) ([]byte, bool, error) {
func (i InfoStorage) Get(ctx context.Context, infoKey string) ([]byte, bool, error) {
return i.get(ctx, infoKey)
}

// Write writes the provided value to an info record for the provided jobID and
// infoKey after removing any existing info records for that job and infoKey
// using the same transaction, effectively replacing any older row with a row
// with the new value.
func (i InfoStorage) Write(ctx context.Context, infoKey, value []byte) error {
func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) error {
if value == nil {
return errors.AssertionFailedf("missing value (infoKey %q)", string(infoKey))
return errors.AssertionFailedf("missing value (infoKey %q)", infoKey)
}
return i.write(ctx, infoKey, value)
}

// Delete removes the info record for the provided infoKey.
func (i InfoStorage) Delete(ctx context.Context, infoKey []byte) error {
func (i InfoStorage) Delete(ctx context.Context, infoKey string) error {
return i.write(ctx, infoKey, nil /* value */)
}

// DeleteRange removes the info records between the provided
// start key (inclusive) and end key (exclusive).
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey []byte) error {
func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey string) error {
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
_, err := txn.ExecEx(
ctx, "write-job-info-delete", txn.KV(),
Expand All @@ -259,15 +259,15 @@ func (i InfoStorage) DeleteRange(ctx context.Context, startInfoKey, endInfoKey [

// Iterate iterates through the info records for a given job and info key prefix.
func (i InfoStorage) Iterate(
ctx context.Context, infoPrefix []byte, fn func(infoKey, value []byte) error,
ctx context.Context, infoPrefix string, fn func(infoKey string, value []byte) error,
) (retErr error) {
return i.iterate(ctx, iterateAll, infoPrefix, fn)
}

// GetLast calls fn on the last info record whose key matches the
// given prefix.
func (i InfoStorage) GetLast(
ctx context.Context, infoPrefix []byte, fn func(infoKey, value []byte) error,
ctx context.Context, infoPrefix string, fn func(infoKey string, value []byte) error,
) (retErr error) {
return i.iterate(ctx, getLast, infoPrefix, fn)
}
Expand All @@ -286,32 +286,32 @@ const (

// GetLegacyPayloadKey returns the info_key whose value is the jobspb.Payload of
// the job.
func GetLegacyPayloadKey() []byte {
return []byte(legacyPayloadKey)
func GetLegacyPayloadKey() string {
return legacyPayloadKey
}

// GetLegacyProgressKey returns the info_key whose value is the jobspb.Progress
// of the job.
func GetLegacyProgressKey() []byte {
return []byte(legacyProgressKey)
func GetLegacyProgressKey() string {
return legacyProgressKey
}

// GetLegacyPayload returns the job's Payload from the system.job_info table.
func (i InfoStorage) GetLegacyPayload(ctx context.Context) ([]byte, bool, error) {
return i.Get(ctx, []byte(legacyPayloadKey))
return i.Get(ctx, legacyPayloadKey)
}

// WriteLegacyPayload writes the job's Payload to the system.job_info table.
func (i InfoStorage) WriteLegacyPayload(ctx context.Context, payload []byte) error {
return i.Write(ctx, []byte(legacyPayloadKey), payload)
return i.Write(ctx, legacyPayloadKey, payload)
}

// GetLegacyProgress returns the job's Progress from the system.job_info table.
func (i InfoStorage) GetLegacyProgress(ctx context.Context) ([]byte, bool, error) {
return i.Get(ctx, []byte(legacyProgressKey))
return i.Get(ctx, legacyProgressKey)
}

// WriteLegacyProgress writes the job's Progress to the system.job_info table.
func (i InfoStorage) WriteLegacyProgress(ctx context.Context, progress []byte) error {
return i.Write(ctx, []byte(legacyProgressKey), progress)
return i.Write(ctx, legacyProgressKey, progress)
}
80 changes: 69 additions & 11 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ package jobs_test

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" // import ccl to be able to run backups
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -56,11 +63,11 @@ func TestJobInfoAccessors(t *testing.T) {
job1 := createJob(1)
job2 := createJob(2)
job3 := createJob(3)
kPrefix, kA, kB, kC, kD := []byte("🔑"), []byte("🔑A"), []byte("🔑B"), []byte("🔑C"), []byte("🔑D")
kPrefix, kA, kB, kC, kD := "🔑", "🔑A", "🔑B", "🔑C", "🔑D"
v1, v2, v3 := []byte("val1"), []byte("val2"), []byte("val3")

// Key doesn't exist yet.
getJobInfo := func(j *jobs.Job, key []byte) (v []byte, ok bool, err error) {
getJobInfo := func(j *jobs.Job, key string) (v []byte, ok bool, err error) {
err = idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := j.InfoStorage(txn)
v, ok, err = infoStorage.Get(ctx, key)
Expand Down Expand Up @@ -138,7 +145,7 @@ func TestJobInfoAccessors(t *testing.T) {
var i int
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.Iterate(ctx, kPrefix, func(key, value []byte) error {
return infoStorage.Iterate(ctx, kPrefix, func(key string, value []byte) error {
i++
switch i {
case 1:
Expand All @@ -162,7 +169,7 @@ func TestJobInfoAccessors(t *testing.T) {
i = 0
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.GetLast(ctx, kPrefix, func(key, value []byte) error {
return infoStorage.GetLast(ctx, kPrefix, func(key string, value []byte) error {
i++
require.Equal(t, key, kC)
require.Equal(t, v3, value)
Expand All @@ -175,7 +182,7 @@ func TestJobInfoAccessors(t *testing.T) {
found := false
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.Iterate(ctx, kA, func(key, value []byte) error {
return infoStorage.Iterate(ctx, kA, func(key string, value []byte) error {
require.Equal(t, kA, key)
require.Equal(t, v2, value)
found = true
Expand All @@ -193,7 +200,7 @@ func TestJobInfoAccessors(t *testing.T) {
i = 0
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
return infoStorage.Iterate(ctx, kPrefix, func(key, value []byte) error {
return infoStorage.Iterate(ctx, kPrefix, func(key string, value []byte) error {
i++
require.Equal(t, key, kC)
return nil
Expand All @@ -204,7 +211,7 @@ func TestJobInfoAccessors(t *testing.T) {
// Iterate a different job.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job3.InfoStorage(txn)
return infoStorage.Iterate(ctx, kPrefix, func(key, value []byte) error {
return infoStorage.Iterate(ctx, kPrefix, func(key string, value []byte) error {
t.Fatalf("unexpected record for job 3: %v = %v", key, value)
return nil
})
Expand Down Expand Up @@ -252,7 +259,7 @@ func TestAccessorsWithWrongSQLLivenessSession(t *testing.T) {
require.NoError(t, err)
require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
return infoStorage.Write(ctx, []byte("foo"), []byte("baz"))
return infoStorage.Write(ctx, "foo", []byte("baz"))
}))

require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand All @@ -265,14 +272,14 @@ func TestAccessorsWithWrongSQLLivenessSession(t *testing.T) {

err = ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
return infoStorage.Write(ctx, []byte("foo"), []byte("bar"))
return infoStorage.Write(ctx, "foo", []byte("bar"))
})
require.True(t, testutils.IsError(err, "expected session.*but found"))

// A Get should still succeed even with an invalid session id.
require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
val, exists, err := infoStorage.Get(ctx, []byte("foo"))
val, exists, err := infoStorage.Get(ctx, "foo")
if err != nil {
return err
}
Expand All @@ -284,9 +291,60 @@ func TestAccessorsWithWrongSQLLivenessSession(t *testing.T) {
// Iterate should still succeed even with an invalid session id.
require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
return infoStorage.Iterate(ctx, []byte("foo"), func(infoKey, value []byte) error {
return infoStorage.Iterate(ctx, "foo", func(infoKey string, value []byte) error {
require.Equal(t, value, []byte("baz"))
return nil
})
}))
}

// TestJobInfoUpgradeRegressionTests is a regression test for a job that is
// created before V23_1JobInfoTableIsBackfilled and still exists when the
// cluster is upgraded to V23_1JobInfoTableIsBackfilled. Such a job ends up
// with duplicate payload and progress rows in the job_info table. Prior to the
// fix this caused the InfoStorage read path to error out on seeing more than
// one row per (job_id, info_key).
//
// The test walks through that scenario step by step: it creates a backup job
// at V23_1CreateSystemJobInfoTable, holds it at a pausepoint, upgrades to
// V23_1JobInfoTableIsBackfilled, checks that the job's row count in
// system.job_info went from 2 to 4, and finally asserts that InfoStorage.Get
// on the legacy payload key succeeds.
//
// NOTE(review): no log.Scope(t) is installed here even though util/log is
// imported by this file — confirm whether that omission is intentional.
func TestJobInfoUpgradeRegressionTests(t *testing.T) {
defer leaktest.AfterTest(t)()

// Bootstrap the server at the minimum supported binary version and disable
// automatic version upgrades so that the test controls each version step
// itself via SET CLUSTER SETTING version below.
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.ByKey(clusterversion.BinaryMinSupportedVersionKey),
BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

// Step 1: move to the version named for the creation of the job info table,
// but not yet to the backfill version.
_, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`, clusterversion.V23_1CreateSystemJobInfoTable.String())
require.NoError(t, err)

// Step 2: install a pausepoint so the backup job started below pauses and is
// therefore still present when the cluster is upgraded.
_, err = sqlDB.Exec(`SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_lock'`)
require.NoError(t, err)

// Step 3: start a detached backup, capture its job ID, and wait for it to
// reach the paused state.
var jobID jobspb.JobID
require.NoError(t, sqlDB.QueryRow(`BACKUP INTO 'userfile:///foo' WITH detached`).Scan(&jobID))
runner := sqlutils.MakeSQLRunner(sqlDB)
jobutils.WaitForJobToPause(t, runner, jobID)

// Before the backfill upgrade the job has exactly two job_info rows
// (presumably one payload row and one progress row — see the test comment).
runner.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM system.job_info WHERE job_id = %d`, jobID),
[][]string{{"2"}})

// Step 4: upgrade to the backfill version while the job is paused.
_, err = sqlDB.Exec(`SET CLUSTER SETTING version = $1`, clusterversion.V23_1JobInfoTableIsBackfilled.String())
require.NoError(t, err)

// After the upgrade the row count has doubled, i.e. the job now has the
// duplicate rows this regression test is about.
runner.CheckQueryResults(t, fmt.Sprintf(`SELECT count(*) FROM system.job_info WHERE job_id = %d`, jobID),
[][]string{{"4"}})

// Step 5: reading the legacy payload key through InfoStorage must succeed
// despite the duplicate rows; only the returned error is checked here.
err = s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
_, _, err := infoStorage.Get(ctx, jobs.GetLegacyPayloadKey())
return err
})
require.NoError(t, err)
}
Loading

0 comments on commit aec78f3

Please sign in to comment.