Skip to content

Commit

Permalink
move index storage into interface
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Callaway <bcallaway@google.com>
  • Loading branch information
bobcallaway committed Oct 7, 2023
1 parent 75774ff commit 2beb41f
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cmd/rekor-server/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ Memory and file-based signers should only be used for testing.`)

rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")
_ = rootCmd.PersistentFlags().MarkDeprecated("enable_retrieve_api", "this flag is deprecated in favor of enabled_api_endpoints (searchIndex)")
rootCmd.PersistentFlags().String("search_index.storage_provider", "redis",
`Index Storage provider to use. Valid options are: [redis].`)
rootCmd.PersistentFlags().String("redis_server.address", "127.0.0.1", "Redis server address")
rootCmd.PersistentFlags().Uint16("redis_server.port", 6379, "Redis server port")

Expand Down
24 changes: 15 additions & 9 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/sigstore/rekor/pkg/indexstorage"
"github.com/sigstore/rekor/pkg/log"
"github.com/sigstore/rekor/pkg/pubsub"
"github.com/sigstore/rekor/pkg/sharding"
Expand Down Expand Up @@ -145,9 +146,10 @@ func NewAPI(treeID uint) (*API, error) {
}

var (
api *API
storageClient storage.AttestationStorage
redisClient *redis.Client
api *API
attestationStorageClient storage.AttestationStorage
indexStorageClient indexstorage.IndexStorage
redisClient *redis.Client
)

func ConfigureAPI(treeID uint) {
Expand All @@ -159,21 +161,25 @@ func ConfigureAPI(treeID uint) {
}
if viper.GetBool("enable_retrieve_api") || viper.GetBool("enable_stable_checkpoint") ||
slices.Contains(viper.GetStringSlice("enabled_api_endpoints"), "searchIndex") {
redisClient = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")),
Network: "tcp",
DB: 0, // default DB
})
indexStorageClient, err = indexstorage.NewIndexStorage(viper.GetString("search_index.storage_provider"))
if err != nil {
log.Logger.Panic(err)
}
}

if viper.GetBool("enable_attestation_storage") {
storageClient, err = storage.NewAttestationStorage()
attestationStorageClient, err = storage.NewAttestationStorage()
if err != nil {
log.Logger.Panic(err)
}
}

if viper.GetBool("enable_stable_checkpoint") {
redisClient = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")),
Network: "tcp",
DB: 0, // default DB
})
checkpointPublisher := witness.NewCheckpointPublisher(context.Background(), api.logClient, api.logRanges.ActiveTreeID(),
viper.GetString("rekor_server.hostname"), api.signer, redisClient, viper.GetUint("publish_frequency"), CheckpointPublishCount)

Expand Down
6 changes: 3 additions & 3 deletions pkg/api/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func logEntryFromLeaf(ctx context.Context, signer signature.Signer, _ trilliancl
attKey := entryWithAtt.AttestationKey()
// if we're given a key by the type logic, let's try that first
if attKey != "" {
att, fetchErr = storageClient.FetchAttestation(ctx, attKey)
att, fetchErr = attestationStorageClient.FetchAttestation(ctx, attKey)
if fetchErr != nil {
log.ContextLogger(ctx).Debugf("error fetching attestation by key, trying by UUID: %s %v", attKey, fetchErr)
}
}
// if looking up by key failed or we weren't able to generate a key, try looking up by uuid
if attKey == "" || fetchErr != nil {
att, fetchErr = storageClient.FetchAttestation(ctx, entryIDstruct.UUID)
att, fetchErr = attestationStorageClient.FetchAttestation(ctx, entryIDstruct.UUID)
if fetchErr != nil {
log.ContextLogger(ctx).Debugf("error fetching attestation by uuid: %s %v", entryIDstruct.UUID, fetchErr)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
IntegratedTime: swag.Int64(queuedLeaf.IntegrateTimestamp.AsTime().Unix()),
}

if redisClient != nil {
if indexStorageClient != nil {
go func() {
keys, err := entry.IndexKeys()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
if params.Query.Hash != "" {
// This must be a valid hash
sha := util.PrefixSHA(params.Query.Hash)
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(sha), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(sha))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
}
Expand All @@ -72,14 +72,14 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder {
}

keyHash := sha256.Sum256(canonicalKey)
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
}
result.Add(resultUUIDs)
}
if params.Query.Email != "" {
resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(params.Query.Email.String()), 0, -1).Result()
resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(params.Query.Email.String()))
if err != nil {
return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult)
}
Expand All @@ -100,15 +100,15 @@ func SearchIndexNotImplementedHandler(_ index.SearchIndexParams) middleware.Resp
}

func addToIndex(ctx context.Context, key, value string) error {
_, err := redisClient.LPush(ctx, key, value).Result()
err := indexStorageClient.WriteIndex(ctx, key, value)
if err != nil {
return fmt.Errorf("redis client: %w", err)
}
return nil
}

func storeAttestation(ctx context.Context, uuid string, attestation []byte) error {
return storageClient.StoreAttestation(ctx, uuid, attestation)
return attestationStorageClient.StoreAttestation(ctx, uuid, attestation)
}

// Uniq is a collection of unique elements.
Expand Down
38 changes: 38 additions & 0 deletions pkg/indexstorage/indexstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package indexstorage

import (
"context"
"fmt"

"github.com/sigstore/rekor/pkg/indexstorage/redis"
"github.com/spf13/viper"
)

type IndexStorage interface {
LookupIndices(context.Context, string) ([]string, error) // Returns indices for specified key
WriteIndex(context.Context, string, string) error // Writes index for specified key
}

// NewIndexStorage instantiates a new IndexStorage provider based on the requested type
func NewIndexStorage(providerType string) (IndexStorage, error) {
switch providerType {
case redis.ProviderType:
return redis.NewProvider(viper.GetString("redis_server.address"), viper.GetString("redis_server.port"))
default:
return nil, fmt.Errorf("invalid index storage provider type: %v", providerType)
}
}
58 changes: 58 additions & 0 deletions pkg/indexstorage/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 The Sigstore Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
"context"
"errors"
"fmt"
"strings"

redis "github.com/redis/go-redis/v9"
)

const ProviderType = "redis"

type IndexStorageProvider struct {
client *redis.Client
}

func NewProvider(address, port string) (*IndexStorageProvider, error) {
provider := &IndexStorageProvider{}
provider.client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%v:%v", address, port),
Network: "tcp",
DB: 0, // default DB
})
return provider, nil
}

func (isp *IndexStorageProvider) LookupIndices(ctx context.Context, key string) ([]string, error) {
if isp.client == nil {
return []string{}, errors.New("redis client has not been initialized")
}
return isp.client.LRange(ctx, strings.ToLower(key), 0, -1).Result()
}

func (isp *IndexStorageProvider) WriteIndex(ctx context.Context, key, value string) error {
if isp.client == nil {
return errors.New("redis client has not been initialized")
}
_, err := isp.client.LPush(ctx, strings.ToLower(key), value).Result()
if err != nil {
return fmt.Errorf("redis client: %w", err)
}
return nil
}

0 comments on commit 2beb41f

Please sign in to comment.