Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: pass op-name to info gets #138101

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (r *Registry) loadJobForResume(
progress := &jobspb.Progress{}
if err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx)
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "loadForResume")
if err != nil {
return err
}
Expand All @@ -323,7 +323,7 @@ func (r *Registry) loadJobForResume(
return err
}

progressBytes, exists, err := infoStorage.GetLegacyProgress(ctx)
progressBytes, exists, err := infoStorage.GetLegacyProgress(ctx, "loadForResume")
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,12 @@ func (i *InfoStorage) checkClaimSession(ctx context.Context) error {
return nil
}

func (i InfoStorage) get(ctx context.Context, infoKey string) ([]byte, bool, error) {
func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, bool, error) {
if i.txn == nil {
return nil, false, errors.New("cannot access the job info table without an associated txn")
}

ctx, sp := tracing.ChildSpan(ctx, "get-job-info")
ctx, sp := tracing.ChildSpan(ctx, opName)
defer sp.Finish()

j := i.j
Expand Down Expand Up @@ -536,8 +536,8 @@ func (i InfoStorage) iterate(
}

// Get fetches the latest info record for the given job and infoKey.
func (i InfoStorage) Get(ctx context.Context, infoKey string) ([]byte, bool, error) {
return i.get(ctx, infoKey)
func (i InfoStorage) Get(ctx context.Context, opName, infoKey string) ([]byte, bool, error) {
return i.get(ctx, opName, infoKey)
}

// Write writes the provided value to an info record for the provided jobID and
Expand Down Expand Up @@ -651,8 +651,8 @@ func GetLegacyProgressKey() string {
}

// GetLegacyPayload returns the job's Payload from the system.job_info table.
func (i InfoStorage) GetLegacyPayload(ctx context.Context) ([]byte, bool, error) {
return i.Get(ctx, LegacyPayloadKey)
func (i InfoStorage) GetLegacyPayload(ctx context.Context, opName string) ([]byte, bool, error) {
return i.Get(ctx, opName, LegacyPayloadKey)
}

// WriteLegacyPayload writes the job's Payload to the system.job_info table.
Expand All @@ -661,8 +661,8 @@ func (i InfoStorage) WriteLegacyPayload(ctx context.Context, payload []byte) err
}

// GetLegacyProgress returns the job's Progress from the system.job_info table.
func (i InfoStorage) GetLegacyProgress(ctx context.Context) ([]byte, bool, error) {
return i.Get(ctx, LegacyProgressKey)
func (i InfoStorage) GetLegacyProgress(ctx context.Context, opName string) ([]byte, bool, error) {
return i.Get(ctx, opName, LegacyProgressKey)
}

// WriteLegacyProgress writes the job's Progress to the system.job_info table.
Expand Down
12 changes: 7 additions & 5 deletions pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestJobInfoAccessors(t *testing.T) {
getJobInfo := func(j *jobs.Job, key string) (v []byte, ok bool, err error) {
err = idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := j.InfoStorage(txn)
v, ok, err = infoStorage.Get(ctx, key)
v, ok, err = infoStorage.Get(ctx, "getJobInfo", key)
return err
})
return v, ok, err
Expand Down Expand Up @@ -226,6 +226,8 @@ func TestJobInfoAccessors(t *testing.T) {
return nil
}))

const opTest = "test"

// Verify we see 4 rows (c, e, f, g) in the prefix.
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job2.InfoStorage(txn)
Expand All @@ -234,7 +236,7 @@ func TestJobInfoAccessors(t *testing.T) {
return err
}
require.Equal(t, 4, count)
_, ok, err := infoStorage.Get(ctx, kC)
_, ok, err := infoStorage.Get(ctx, opTest, kC)
if err != nil {
return err
}
Expand All @@ -256,12 +258,12 @@ func TestJobInfoAccessors(t *testing.T) {
return err
}
require.Equal(t, 2, count)
_, ok, err := infoStorage.Get(ctx, kC)
_, ok, err := infoStorage.Get(ctx, opTest, kC)
if err != nil {
return err
}
require.False(t, ok)
_, ok, err = infoStorage.Get(ctx, kF)
_, ok, err = infoStorage.Get(ctx, opTest, kF)
if err != nil {
return err
}
Expand Down Expand Up @@ -339,7 +341,7 @@ func TestAccessorsWithWrongSQLLivenessSession(t *testing.T) {
// A Get should still succeed even with an invalid session id.
require.NoError(t, ief.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := job.InfoStorage(txn)
val, exists, err := infoStorage.Get(ctx, "foo")
val, exists, err := infoStorage.Get(ctx, "test", "foo")
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (j *Job) loadJobPayloadAndProgress(
progress := &jobspb.Progress{}
infoStorage := j.InfoStorage(txn)

payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx)
payloadBytes, exists, err := infoStorage.GetLegacyPayload(ctx, "loadJobPayloadAndProgress")
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get payload for job %d", j.ID())
}
Expand All @@ -640,7 +640,7 @@ func (j *Job) loadJobPayloadAndProgress(
return nil, nil, err
}

progressBytes, exists, err := infoStorage.GetLegacyProgress(ctx)
progressBytes, exists, err := infoStorage.GetLegacyProgress(ctx, "loadJobPayloadAndProgress")
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get progress for job %d", j.ID())
}
Expand Down Expand Up @@ -977,7 +977,7 @@ func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracing
var traceID tracingpb.TraceID
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
jobInfo := InfoStorageForJob(txn, jobID)
progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx)
progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx, "GetJobTraceID")
if err != nil {
return err
}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func LoadJobProgress(
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := InfoStorageForJob(txn, jobID)
var err error
progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx)
progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx, "LoadJobProgress")
return err
}); err != nil || !exists {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestUpdaterUpdatesJobInfo(t *testing.T) {
expectedProgress jobspb.Progress) {
infoStorage := createdJob.InfoStorage(txn)

payload, exists, err := infoStorage.GetLegacyPayload(ctx)
payload, exists, err := infoStorage.GetLegacyPayload(ctx, "verifyPayloadAndProgress")
require.NoError(t, err)
require.True(t, exists)
data, err := protoutil.Marshal(&expectedPayload)
Expand All @@ -85,7 +85,7 @@ func TestUpdaterUpdatesJobInfo(t *testing.T) {
}
require.Equal(t, data, payload)

progress, exists, err := infoStorage.GetLegacyProgress(ctx)
progress, exists, err := infoStorage.GetLegacyProgress(ctx, "verifyPayloadAndProgress")
require.NoError(t, err)
require.True(t, exists)
data, err = protoutil.Marshal(&expectedProgress)
Expand Down
Loading