Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve locally stored fbc content via an http server #148

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -35,6 +37,7 @@ import (
"github.com/spf13/pflag"

"github.com/operator-framework/catalogd/internal/source"
"github.com/operator-framework/catalogd/internal/third_party/server"
"github.com/operator-framework/catalogd/internal/version"
corecontrollers "github.com/operator-framework/catalogd/pkg/controllers/core"
"github.com/operator-framework/catalogd/pkg/features"
Expand Down Expand Up @@ -67,6 +70,7 @@ func main() {
catalogdVersion bool
systemNamespace string
storageDir string
catalogServerAddr string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
joelanford marked this conversation as resolved.
Show resolved Hide resolved
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -77,6 +81,7 @@ func main() {
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&catalogServerAddr, "catalogs-server-addr", ":8083", "The address where the unpacked catalogs' content will be accessible")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand Down Expand Up @@ -122,10 +127,27 @@ func main() {
if err := os.MkdirAll(storageDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
}
localStorage := storage.LocalDir{RootDir: storageDir}
shutdownTimeout := 30 * time.Second
catalogServer := server.Server{
Kind: "catalogs",
Server: &http.Server{
Addr: catalogServerAddr,
Handler: localStorage.StorageServerHandler(),
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
},
ShutdownTimeout: &shutdownTimeout,
}
if err := mgr.Add(&catalogServer); err != nil {
setupLog.Error(err, "unable to start catalog server")
os.Exit(1)
}

if err = (&corecontrollers.CatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Storage: storage.LocalDir{RootDir: storageDir},
Storage: localStorage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Catalog")
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
anik120 marked this conversation as resolved.
Show resolved Hide resolved
- "--leader-elect"
- "--catalogs-storage-dir=/var/cache/catalogs"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true"

13 changes: 13 additions & 0 deletions config/rbac/catalogserver_service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: catalogserver
namespace: system
spec:
selector:
control-plane: controller-manager
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8083
1 change: 1 addition & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resources:
- role_binding.yaml
- leader_election_role.yaml
- leader_election_role_binding.yaml
- catalogserver_service.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/blang/semver/v4 v4.0.0
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/nlepage/go-tarfs v1.1.0
github.com/onsi/ginkgo/v2 v2.9.7
Expand Down Expand Up @@ -31,7 +32,6 @@ require (
github.com/go-git/gcfg v1.5.0 // indirect
github.com/go-git/go-billy/v5 v5.4.1 // indirect
github.com/go-git/go-git/v5 v5.4.2 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
Expand Down
122 changes: 122 additions & 0 deletions internal/third_party/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// this is copied from https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/77b08a845e451b695cfa25b79ebe277d85064345/pkg/manager/server.go
// we will remove this once we update to a version of controller-runitme that has this included
// https://github.com/kubernetes-sigs/controller-runtime/pull/2473

package server

import (
"context"
"errors"
"net"
"net/http"
"time"

"github.com/go-logr/logr"

crlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var (
_ manager.Runnable = (*Server)(nil)
_ manager.LeaderElectionRunnable = (*Server)(nil)
)

// Server is a general purpose HTTP server Runnable for a manager.
// It is used to serve some internal handlers for health probes and profiling,
// but it can also be used to run custom servers.
type Server struct {
// Kind is an optional string that describes the purpose of the server. It is used in logs to distinguish
// among multiple servers.
Kind string

// Log is the logger used by the server. If not set, a logger will be derived from the context passed to Start.
Log logr.Logger

// Server is the HTTP server to run. It is required.
Server *http.Server

// Listener is an optional listener to use. If not set, the server start a listener using the server.Addr.
// Using a listener is useful when the port reservation needs to happen in advance of this runnable starting.
Listener net.Listener

// OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader.
OnlyServeWhenLeader bool

// ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set,
// the server will wait indefinitely for all connections to close.
ShutdownTimeout *time.Duration
}

// Start starts the server. It will block until the server is stopped or an error occurs.
func (s *Server) Start(ctx context.Context) error {
log := s.Log
if log.GetSink() == nil {
log = crlog.FromContext(ctx)
}
if s.Kind != "" {
log = log.WithValues("kind", s.Kind)
}
log = log.WithValues("addr", s.addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")

shutdownCtx := context.Background()
if s.ShutdownTimeout != nil {
var shutdownCancel context.CancelFunc
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout)
defer shutdownCancel()
}

if err := s.Server.Shutdown(shutdownCtx); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
return nil
}

// NeedLeaderElection returns true if the server should only be started when the manager is the leader.
func (s *Server) NeedLeaderElection() bool {
return s.OnlyServeWhenLeader
}

func (s *Server) addr() string {
if s.Listener != nil {
return s.Listener.Addr().String()
}
return s.Server.Addr
}

func (s *Server) serve() error {
if s.Listener != nil {
return s.Server.Serve(s.Listener)
}
return s.Server.ListenAndServe()
}
5 changes: 5 additions & 0 deletions pkg/controllers/core/catalog_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"testing/fstest"

Expand Down Expand Up @@ -67,6 +68,10 @@ func (m MockStore) Delete(_ string) error {
return nil
}

func (m MockStore) StorageServerHandler() http.Handler {
panic("not needed")
anik120 marked this conversation as resolved.
Show resolved Hide resolved
}

var _ = Describe("Catalogd Controller Test", func() {
format.MaxLength = 0
var (
Expand Down
34 changes: 34 additions & 0 deletions pkg/storage/localdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"io/fs"
"net/http"
"os"
"path/filepath"

Expand Down Expand Up @@ -45,3 +46,36 @@
func (s LocalDir) Delete(catalog string) error {
return os.RemoveAll(filepath.Join(s.RootDir, catalog))
}

func (s LocalDir) StorageServerHandler() http.Handler {
mux := http.NewServeMux()
mux.Handle("/catalogs/", http.StripPrefix("/catalogs/", http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)}))))
return mux
}

// filesOnlyFilesystem is a file system that can open only regular
// files from the underlying filesystem. All other file types result
// in os.ErrNotExists
type filesOnlyFilesystem struct {
FS fs.FS
}

// Open opens a named file from the underlying filesystem. If the file
// is not a regular file, it return os.ErrNotExists. Callers are resposible
// for closing the file returned.
func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
file, err := f.FS.Open(name)
joelanford marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
stat, err := file.Stat()
if err != nil {
_ = file.Close()
return nil, err

Check warning on line 74 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}
if !stat.Mode().IsRegular() {
_ = file.Close()
return nil, os.ErrNotExist
}
return file, nil
}
67 changes: 66 additions & 1 deletion pkg/storage/localdir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package storage

import (
"fmt"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing/fstest"
Expand Down Expand Up @@ -34,8 +37,8 @@ var _ = Describe("LocalDir Storage Test", func() {
)
BeforeEach(func() {
d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache")
rootDir = d
Expect(err).ToNot(HaveOccurred())
rootDir = d

store = LocalDir{RootDir: rootDir}
unpackResultFS = &fstest.MapFS{
Expand Down Expand Up @@ -76,6 +79,68 @@ var _ = Describe("LocalDir Storage Test", func() {
})
})

var _ = Describe("LocalDir Server Handler tests", func() {
var (
testServer *httptest.Server
store LocalDir
)
BeforeEach(func() {
d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache")
Expect(err).ToNot(HaveOccurred())
Expect(os.Mkdir(filepath.Join(d, "test-catalog"), 0700)).To(Succeed())
store = LocalDir{RootDir: d}
testServer = httptest.NewServer(store.StorageServerHandler())

})
It("gets 404 for the path /", func() {
expectNotFound(testServer.URL)
})
It("gets 404 for the path /catalogs/", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/"))
})
It("gets 404 for the path /catalogs/test-catalog/", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/"))
})
It("gets 404 for the path /test-catalog/foo.txt", func() {
// This ensures that even if the file exists, the URL must contain the /catalogs/ prefix
Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed())
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt"))
})
It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() {
expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt"))
})
It("gets 200 for the path /catalogs/foo.txt", func() {
expectedContent := []byte("bar")
Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed())
expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent)
})
It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() {
expectedContent := []byte("bar")
Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed())
expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent)
})
AfterEach(func() {
testServer.Close()
})
})

func expectNotFound(url string) {
resp, err := http.Get(url) //nolint:gosec
Expect(err).To(Not(HaveOccurred()))
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
Expect(resp.Body.Close()).To(Succeed())
}

func expectFound(url string, expectedContent []byte) {
resp, err := http.Get(url) //nolint:gosec
Expect(err).To(Not(HaveOccurred()))
Expect(resp.StatusCode).To(Equal(http.StatusOK))
actualContent, err := io.ReadAll(resp.Body)
Expect(err).To(Not(HaveOccurred()))
Expect(actualContent).To(Equal(expectedContent))
Expect(resp.Body.Close()).To(Succeed())
}

const testBundleTemplate = `---
image: %s
name: %s
Expand Down
Loading
Loading