Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve performance store attestation #1596

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app/controlplane/configs/config.devel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ data:
database:
driver: pgx
source: postgresql://postgres:@${DB_HOST:0.0.0.0}/controlplane
max_open_conns: 5
min_open_conns: 1
max_conn_idle_time: 120s
# max_open_conns: 5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an accidental change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really, I uncommented it so we can reproduce locally the issues, and in the end I left it like this. Do you think we should remove it?

# min_open_conns: 1
# max_conn_idle_time: 120s

# Development credentials for the SSO authentication roundtrip
auth:
Expand Down
36 changes: 16 additions & 20 deletions app/controlplane/internal/service/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,30 +255,26 @@ func (s *AttestationService) Store(ctx context.Context, req *cpAPI.AttestationSe
if err != nil {
return nil, handleUseCaseErr(err, s.log)
}
go func(ctx context.Context) {
// Store the exploded attestation referrer information in the DB
if err := s.referrerUseCase.ExtractAndPersist(ctx, envelope, wf.ID.String()); err != nil {
_ = handleUseCaseErr(err, s.log)
return
}
//
// // Store the exploded attestation referrer information in the DB
if err := s.referrerUseCase.ExtractAndPersist(ctx, envelope, wf.ID.String()); err != nil {
return nil, handleUseCaseErr(err, s.log)
}

if !casBackend.Inline {
// Store the mappings in the DB
references, err := s.casMappingUseCase.LookupDigestsInAttestation(envelope)
if err != nil {
_ = handleUseCaseErr(err, s.log)
return
}
if !casBackend.Inline {
// Store the mappings in the DB
references, err := s.casMappingUseCase.LookupDigestsInAttestation(envelope)
if err != nil {
return nil, handleUseCaseErr(err, s.log)
}

for _, ref := range references {
s.log.Infow("msg", "creating CAS mapping", "name", ref.Name, "digest", ref.Digest, "workflowRun", req.WorkflowRunId, "casBackend", casBackend.ID.String())
if _, err := s.casMappingUseCase.Create(ctx, ref.Digest, casBackend.ID.String(), req.WorkflowRunId); err != nil {
_ = handleUseCaseErr(err, s.log)
return
}
for _, ref := range references {
s.log.Infow("msg", "creating CAS mapping", "name", ref.Name, "digest", ref.Digest, "workflowRun", req.WorkflowRunId, "casBackend", casBackend.ID.String())
if _, err := s.casMappingUseCase.Create(ctx, ref.Digest, casBackend.ID.String(), req.WorkflowRunId); err != nil {
return nil, handleUseCaseErr(err, s.log)
}
}
}(context.Background()) // reset context
}

secretName := casBackend.SecretName

Expand Down
55 changes: 23 additions & 32 deletions app/controlplane/pkg/data/referrer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,9 @@ func NewReferrerRepo(data *Data, wfRepo biz.WorkflowRepo, logger log.Logger) biz
}
}

type storedReferrerMap map[string]*ent.Referrer
type storedReferrerMap map[string]uuid.UUID

func (r *ReferrerRepo) Save(ctx context.Context, referrers []*biz.Referrer, workflowID uuid.UUID) (err error) {
// Start transaction
tx, err := r.data.DB.Tx(ctx)
if err != nil {
return fmt.Errorf("failed to create transaction: %w", err)
}

defer func() {
// Unblock the row if there was an error
if err != nil {
_ = tx.Rollback()
}
}()

// find the workflow
wf, err := r.workflowRepo.FindByID(ctx, workflowID)
if err != nil {
Expand All @@ -69,50 +56,54 @@ func (r *ReferrerRepo) Save(ctx context.Context, referrers []*biz.Referrer, work

storedMap := make(storedReferrerMap)

for _, r := range referrers {
for _, ref := range referrers {
// Check if it exists already, if not create it
err := tx.Referrer.Create().
SetDigest(r.Digest).SetKind(r.Kind).SetDownloadable(r.Downloadable).
SetMetadata(r.Metadata).SetAnnotations(r.Annotations).
storedID := r.data.DB.Referrer.Create().
SetDigest(ref.Digest).SetKind(ref.Kind).SetDownloadable(ref.Downloadable).
SetMetadata(ref.Metadata).SetAnnotations(ref.Annotations).
AddWorkflowIDs(workflowID).
OnConflictColumns(
referrer.FieldDigest, referrer.FieldKind,
).UpdateNewValues().Exec(ctx)
).UpdateNewValues().IDX(ctx)
if err != nil {
return fmt.Errorf("failed to create referrer: %w", err)
}

storedRef, err := tx.Referrer.Query().Where(referrer.Digest(r.Digest), referrer.Kind(r.Kind)).Only(ctx)
storedRef, err := r.data.DB.Referrer.Query().Select(referrer.FieldID).Where(referrer.ID(storedID)).First(ctx)
if err != nil {
return fmt.Errorf("failed to load referrer: %w", err)
} else if storedRef == nil {
return fmt.Errorf("failed to load referrer: %w", err)
}

// Store it in the map
storedMap[r.MapID()] = storedRef
storedMap[ref.MapID()] = storedRef.ID
}

// 2 - define the relationship between referrers
for _, r := range referrers {
for _, parentRef := range referrers {
// This is the current item stored in DB
storedReferrer := storedMap[r.MapID()]
storedReferrer := storedMap[parentRef.MapID()]
// Iterate on the items it refer to (references)
for _, ref := range r.References {
var references []uuid.UUID
for _, ref := range parentRef.References {
// amd find it in the DB
storedReference, ok := storedMap[ref.MapID()]
if !ok {
return fmt.Errorf("referrer %v not found", ref)
}

// Create the relationship
_, err := storedReferrer.Update().AddReferenceIDs(storedReference.ID).Save(ctx)
if err != nil {
return fmt.Errorf("failed to create referrer relationship: %w", err)
}
references = append(references, storedReference)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
if len(references) == 0 {
continue
}

// Create the relationship
if err := r.data.DB.Referrer.UpdateOneID(storedReferrer).AddReferenceIDs(references...).Exec(ctx); err != nil {
return fmt.Errorf("failed to create referrer relationship: %w", err)
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions devel/compose.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- ALLOW_EMPTY_PASSWORD=yes
- POSTGRESQL_DATABASE=controlplane
- POSTGRESQL_USERNAME=postgres
- POSTGRESQL_SHARED_PRELOAD_LIBRARIES=pgaudit,pg_stat_statements
healthcheck:
test: ["CMD", "pg_isready", "-U", "postgres"]
interval: 2s
Expand Down
Loading