Skip to content

Commit

Permalink
Add concurrency to backfill-redis (#1504)
Browse files Browse the repository at this point in the history
* Add concurrency to backfill-redis

Signed-off-by: Cody Soyland <codysoyland@github.com>

* Add result summary

Signed-off-by: Cody Soyland <codysoyland@github.com>

---------

Signed-off-by: Cody Soyland <codysoyland@github.com>
  • Loading branch information
codysoyland committed May 25, 2023
1 parent 795a236 commit 3adca0d
Showing 1 changed file with 99 additions and 33 deletions.
132 changes: 99 additions & 33 deletions cmd/backfill-redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
To run:
go run cmd/backfill-redis/main.go --rekor-address <address> \
--hostname <redis-hostname> --port <redis-port>
--start <first index to backfill> --end <last index to backfill>
--hostname <redis-hostname> --port <redis-port> --concurrency <num-of-workers> \
--start <first index to backfill> --end <last index to backfill> [--dry-run]
*/

package main
Expand All @@ -30,13 +30,17 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/go-openapi/runtime"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/release-utils/version"

"github.com/sigstore/rekor/pkg/client"
Expand Down Expand Up @@ -66,6 +70,8 @@ var (
endIndex = flag.Int("end", -1, "Last index to backfill")
rekorAddress = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
versionFlag = flag.Bool("version", false, "Print the current version of Backfill Redis")
concurrency = flag.Int("concurrency", 1, "Number of workers to use for backfill")
dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into Redis")
)

func main() {
Expand Down Expand Up @@ -106,45 +112,99 @@ func main() {
log.Fatalf("creating rekor client: %v", err)
}

for i := *startIndex; i <= *endIndex; i++ {
params := entries.NewGetLogEntryByIndexParamsWithContext(context.Background())
params.SetLogIndex(int64(i))
resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
if err != nil {
log.Fatalf("retrieving log uuid by index: %v", err)
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(*concurrency)

type result struct {
index int
parseErrs []error
insertErrs []error
}
var resultChan = make(chan result)
parseErrs := make([]int, 0)
insertErrs := make([]int, 0)

go func() {
for r := range resultChan {
if len(r.parseErrs) > 0 {
parseErrs = append(parseErrs, r.index)
}
if len(r.insertErrs) > 0 {
insertErrs = append(insertErrs, r.index)
}
}
var insertErrs []error
for uuid, entry := range resp.Payload {
// uuid is the global UUID - tree ID and entry UUID
e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
}()

for i := *startIndex; i <= *endIndex; i++ {
index := i // capture loop variable for closure
group.Go(func() error {
params := entries.NewGetLogEntryByIndexParamsWithContext(ctx)
params.SetLogIndex(int64(index))
resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
if err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error unmarshalling entry for %s: %v", uuid, err))
continue
// in case of sigterm, just return to exit gracefully
if errors.Is(err, context.Canceled) {
return nil
}
log.Fatalf("retrieving log uuid by index: %v", err)
}
keys, err := e.IndexKeys()
if err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error building index keys for %s: %v", uuid, err))
continue
var parseErrs []error
var insertErrs []error
for uuid, entry := range resp.Payload {
// uuid is the global UUID - tree ID and entry UUID
e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("error unmarshalling entry for %s: %v", uuid, err))
continue
}
keys, err := e.IndexKeys()
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("error building index keys for %s: %v", uuid, err))
continue
}
for _, key := range keys {
// remove the key-value pair from the index in case it already exists
if err := removeFromIndex(ctx, redisClient, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %v", uuid, key, err))
}
if err := addToIndex(ctx, redisClient, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %v", uuid, key, err))
}
fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, index, key)
}
}
for _, key := range keys {
// remove the key-value pair from the index in case it already exists
if err := removeFromIndex(context.Background(), redisClient, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %v", uuid, key, err))
if len(insertErrs) != 0 || len(parseErrs) != 0 {
fmt.Printf("Errors with log index %d:\n", index)
for _, e := range insertErrs {
fmt.Println(e)
}
if err := addToIndex(context.Background(), redisClient, key, uuid); err != nil {
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %v", uuid, key, err))
for _, e := range parseErrs {
fmt.Println(e)
}
fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, i, key)
} else {
fmt.Printf("Completed log index %d\n", index)
}
}
if len(insertErrs) != 0 {
fmt.Printf("Errors with log index %d:\n", i)
for _, e := range insertErrs {
fmt.Println(e)
resultChan <- result{
index: index,
parseErrs: parseErrs,
insertErrs: insertErrs,
}
} else {
fmt.Printf("Completed log index %d\n", i)
}

return nil
})
}
err = group.Wait()
if err != nil {
log.Fatalf("error running backfill: %v", err)
}
close(resultChan)
fmt.Println("Backfill complete")
if len(parseErrs) > 0 {
fmt.Printf("Failed to parse %d entries: %v\n", len(parseErrs), parseErrs)
}
if len(insertErrs) > 0 {
fmt.Printf("Failed to insert/remove %d entries: %v\n", len(insertErrs), insertErrs)
}
}

Expand All @@ -171,12 +231,18 @@ func unmarshalEntryImpl(e string) (types.EntryImpl, string, string, error) {
// removeFromIndex removes all occurrences of a value from a given key. This guards against
// multiple invocations of backfilling creating duplicates.
func removeFromIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
if *dryRun {
return nil
}
_, err := redisClient.LRem(ctx, key, 0, value).Result()
return err
}

// addToIndex pushes a value onto a key of type list.
func addToIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
if *dryRun {
return nil
}
_, err := redisClient.LPush(ctx, key, value).Result()
return err
}

0 comments on commit 3adca0d

Please sign in to comment.