Skip to content

Commit

Permalink
fix: decouple cloud storage tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hperl committed Sep 20, 2022
1 parent 1daecb6 commit c1ed811
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 96 deletions.
21 changes: 15 additions & 6 deletions credentials/fetcher_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,31 @@ type FetcherDefault struct {
mux *blob.URLMux
}

type FetcherOption func(f *FetcherDefault)

func WithURLMux(mux *blob.URLMux) FetcherOption {
return func(f *FetcherDefault) { f.mux = mux }
}

// NewFetcherDefault returns a new JWKS Fetcher with:
//
// - cancelAfter: If reached, the fetcher will stop waiting for responses and return an error.
// - waitForResponse: While the fetcher might stop waiting for responses, we will give the server more time to respond
// and add the keys to the registry unless waitForResponse is reached in which case we'll terminate the request.
func NewFetcherDefault(l *logrusx.Logger, cancelAfter time.Duration, ttl time.Duration) *FetcherDefault {
return &FetcherDefault{
func NewFetcherDefault(l *logrusx.Logger, cancelAfter time.Duration, ttl time.Duration, opts ...FetcherOption) *FetcherDefault {
f := &FetcherDefault{
cancelAfter: cancelAfter,
l: l,
ttl: ttl,
keys: make(map[string]jose.JSONWebKeySet),
fetchedAt: make(map[string]time.Time),
client: httpx.NewResilientClient(
httpx.ResilientClientWithConnectionTimeout(15 * time.Second),
).StandardClient(),
mux: cloudstorage.NewURLMux(),
client: httpx.NewResilientClient(httpx.ResilientClientWithConnectionTimeout(15 * time.Second)).StandardClient(),
mux: cloudstorage.NewURLMux(),
}
for _, o := range opts {
o(f)
}
return f
}

func (s *FetcherDefault) ResolveSets(ctx context.Context, locations []url.URL) ([]jose.JSONWebKeySet, error) {
Expand Down Expand Up @@ -200,6 +208,7 @@ func (s *FetcherDefault) resolveAll(done chan struct{}, errs chan error, locatio
var wg sync.WaitGroup

for _, l := range locations {
l := l
wg.Add(1)
go s.resolve(&wg, errs, l)
}
Expand Down
35 changes: 16 additions & 19 deletions credentials/fetcher_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ var sets = [...]json.RawMessage{
}

func TestFetcherDefault(t *testing.T) {
t.Parallel()

const maxWait = time.Millisecond * 100
const JWKsTTL = maxWait * 7
const timeoutServerDelay = maxWait * 2

t.Cleanup(func() {
cloudstorage.SetCurrentTest(nil)
})

l := logrusx.New("", "", logrusx.ForceLevel(logrus.DebugLevel))
w := herodot.NewJSONWriter(l)
s := NewFetcherDefault(l, maxWait, JWKsTTL)
Expand Down Expand Up @@ -109,21 +107,23 @@ func TestFetcherDefault(t *testing.T) {
})

t.Run("name=should find the key even if the upstream server is no longer active", func(t *testing.T) {
t.Parallel()
fastServer.Close()
key, err := s.ResolveKey(context.Background(), uris, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", "sig")
require.NoError(t, err)
assert.Equal(t, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", key.KeyID)
})

time.Sleep(maxWait)
t.Run("name=should no longer find the key if the remote does not find it", func(t *testing.T) {
time.Sleep(maxWait)

t.Run("name=should no longer find the key if the remote does not find it", func(t *testing.T) {
key, err := s.ResolveKey(context.Background(), uris, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", "sig")
require.NoError(t, err)
assert.Equal(t, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", key.KeyID)
key, err := s.ResolveKey(context.Background(), uris, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", "sig")
require.NoError(t, err)
assert.Equal(t, "392e1a6b-6ae1-48b8-bea3-2fe09447805c", key.KeyID)
})
})

t.Run("name=should fetch keys from the file system", func(t *testing.T) {
t.Parallel()
key, err := s.ResolveKey(context.Background(), uris, "81be3441-5303-4c52-b00d-bbdfadc75633", "sig")
require.NoError(t, err)
assert.Equal(t, "81be3441-5303-4c52-b00d-bbdfadc75633", key.KeyID)
Expand Down Expand Up @@ -172,10 +172,9 @@ func TestFetcherDefault(t *testing.T) {
})

t.Run("name=should fetch from s3 object storage", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
cloudstorage.SetCurrentTest(t)

s := NewFetcherDefault(l, maxWait, JWKsTTL)
s := NewFetcherDefault(l, maxWait, JWKsTTL, WithURLMux(cloudstorage.NewTestURLMux(t)))

key, err := s.ResolveKey(ctx, []url.URL{
*urlx.ParseOrPanic("s3://oathkeeper-test-bucket/path/prefix/jwks.json"),
Expand All @@ -185,10 +184,9 @@ func TestFetcherDefault(t *testing.T) {
})

t.Run("name=should fetch from gs object storage", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
cloudstorage.SetCurrentTest(t)

s := NewFetcherDefault(l, maxWait, JWKsTTL)
s := NewFetcherDefault(l, maxWait, JWKsTTL, WithURLMux(cloudstorage.NewTestURLMux(t)))

key, err := s.ResolveKey(ctx, []url.URL{
*urlx.ParseOrPanic("gs://oathkeeper-test-bucket/path/prefix/jwks.json"),
Expand All @@ -198,10 +196,9 @@ func TestFetcherDefault(t *testing.T) {
})

t.Run("name=should fetch from azure object storage", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
cloudstorage.SetCurrentTest(t)

s := NewFetcherDefault(l, maxWait, JWKsTTL)
s := NewFetcherDefault(l, maxWait, JWKsTTL, WithURLMux(cloudstorage.NewTestURLMux(t)))

jwkKey, err := s.ResolveKey(ctx, []url.URL{
*urlx.ParseOrPanic("azblob://path/prefix/jwks.json"),
Expand Down
124 changes: 59 additions & 65 deletions internal/cloudstorage/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,88 +29,82 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
)

var currentTest *testing.T = nil
func NewTestURLMux(t *testing.T) *blob.URLMux {
ctx := context.Background()
mux := new(blob.URLMux)

func SetCurrentTest(t *testing.T) {
currentTest = t
}
// Prepare S3 client
awsErr, awsSession, awsDone := NewAWSSession(t, "us-west-1")

func NewURLMux() *blob.URLMux {
if nil == currentTest {
return blob.DefaultURLMux()
} else {
ctx := context.Background()
mux := new(blob.URLMux)
if nil == awsErr {
s3UrlOpener := new(s3blob.URLOpener)
s3UrlOpener.ConfigProvider = awsSession

// Prepare S3 client
awsErr, awsSession, awsDone := NewAWSSession(currentTest, "us-west-1")
mux.RegisterBucket(s3blob.Scheme, s3UrlOpener)
}

if nil == awsErr {
s3UrlOpener := new(s3blob.URLOpener)
s3UrlOpener.ConfigProvider = awsSession
// Prepare GCS client
gcpError, gcpClient, gcpDone := NewGCPClient(ctx, t)

mux.RegisterBucket(s3blob.Scheme, s3UrlOpener)
}
if nil == gcpError {
gcsUrlOpener := new(gcsblob.URLOpener)
gcsUrlOpener.Client = gcpClient

// Prepare GCS client
gcpError, gcpClient, gcpDone := NewGCPClient(ctx, currentTest)
mux.RegisterBucket(gcsblob.Scheme, gcsUrlOpener)
}

if nil == gcpError {
gcsUrlOpener := new(gcsblob.URLOpener)
gcsUrlOpener.Client = gcpClient
// Prepare Azure client
accountName := azureblob.AccountName("oathkeepertestbucket")

mux.RegisterBucket(gcsblob.Scheme, gcsUrlOpener)
var key azureblob.AccountKey
if *Record {
name, err := azureblob.DefaultAccountName()
if err != nil {
t.Fatal(err)
}

// Prepare Azure client
accountName := azureblob.AccountName("oathkeepertestbucket")

var key azureblob.AccountKey
if *Record {
name, err := azureblob.DefaultAccountName()
if err != nil {
currentTest.Fatal(err)
}
if name != accountName {
currentTest.Fatalf("Please update the accountName constant to match your settings file so future records work (%q vs %q)", name, accountName)
}
key, err = azureblob.DefaultAccountKey()
if err != nil {
currentTest.Fatal(err)
}
} else {
// In replay mode, we use fake credentials.
key = azureblob.AccountKey(base64.StdEncoding.EncodeToString([]byte("FAKECREDS")))
if name != accountName {
t.Fatalf("Please update the accountName constant to match your settings file so future records work (%q vs %q)", name, accountName)
}

credential, err := azureblob.NewCredential(accountName, key)
key, err = azureblob.DefaultAccountKey()
if err != nil {
require.NoError(currentTest, err)
t.Fatal(err)
}
} else {
// In replay mode, we use fake credentials.
key = azureblob.AccountKey(base64.StdEncoding.EncodeToString([]byte("FAKECREDS")))
}

credential, err := azureblob.NewCredential(accountName, key)
if err != nil {
require.NoError(t, err)
}

azureError, azureClient, azureDone := NewAzureTestPipeline(t, "blob", credential)
if nil == azureError {
azureUrlOpener := new(azureblob.URLOpener)
azureUrlOpener.Pipeline = azureClient
azureUrlOpener.AccountName = accountName

azureError, azureClient, azureDone := NewAzureTestPipeline(currentTest, "blob", credential)
if nil == azureError {
azureUrlOpener := new(azureblob.URLOpener)
azureUrlOpener.Pipeline = azureClient
azureUrlOpener.AccountName = accountName
mux.RegisterBucket(azureblob.Scheme, azureUrlOpener)
}

mux.RegisterBucket(azureblob.Scheme, azureUrlOpener)
t.Cleanup(func() {
if nil != awsDone {
awsDone()
}
if nil != gcpDone {
gcpDone()
}
if nil != azureDone {
azureDone()
}
})

currentTest.Cleanup(func() {
if nil != awsDone {
awsDone()
}
if nil != gcpDone {
gcpDone()
}
if nil != azureDone {
azureDone()
}
})
return mux
}

return mux
}
func NewURLMux() *blob.URLMux {
return blob.DefaultURLMux()
}

// Record is true iff the tests are being run in "record" mode.
Expand Down
10 changes: 9 additions & 1 deletion rule/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@

package rule

import "context"
import (
"context"

"gocloud.dev/blob"
)

type Fetcher interface {
Watch(ctx context.Context) error
}

type URLMuxSetter interface {
SetURLMux(mux *blob.URLMux)
}
4 changes: 4 additions & 0 deletions rule/fetcher_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func NewFetcherDefault(
}
}

func (f *FetcherDefault) SetURLMux(mux *blob.URLMux) {
f.mux = mux
}

func (f *FetcherDefault) configUpdate(ctx context.Context, watcher *fsnotify.Watcher, replace []url.URL, events chan event) error {
var directoriesToWatch []string
var filesBeingWatched []string
Expand Down
7 changes: 2 additions & 5 deletions rule/fetcher_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,9 @@ func TestFetcherWatchRepositoryFromKubernetesConfigMap(t *testing.T) {
}

func TestFetchRulesFromObjectStorage(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Cleanup(func() {
cloudstorage.SetCurrentTest(nil)
})

cloudstorage.SetCurrentTest(t)

configFile, _ := os.CreateTemp(t.TempDir(), ".oathkeeper-*.yml")
configFile.WriteString(`
Expand All @@ -428,6 +424,7 @@ access_rules:
configx.WithConfigFiles(configFile.Name()),
)
r := internal.NewRegistry(conf)
r.RuleFetcher().(rule.URLMuxSetter).SetURLMux(cloudstorage.NewTestURLMux(t))

go func() {
require.NoError(t, r.RuleFetcher().Watch(ctx))
Expand Down

0 comments on commit c1ed811

Please sign in to comment.