Skip to content

Commit

Permalink
Merge branch 'main' into ca-rotation-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pierDipi committed Jun 27, 2024
2 parents ec5b4b1 + a6ac811 commit e1c9599
Show file tree
Hide file tree
Showing 965 changed files with 8,457 additions and 231,133 deletions.
8 changes: 8 additions & 0 deletions .github/.dependabot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Set update schedule for GitHub Actions
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
# Check for updates to GitHub Actions every week
interval: "weekly"
4 changes: 1 addition & 3 deletions .github/workflows/kind-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ jobs:

steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.21.x
uses: knative/actions/setup-go@main

# Install the latest release of ko
- name: Install ko
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/knative-downstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ jobs:
GOPATH: ${{ github.workspace }}
steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.21.x
uses: knative/actions/setup-go@main
- name: Install Dependencies
run: |
go install github.com/google/go-licenses@latest
Expand Down
68 changes: 39 additions & 29 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,19 @@ func main() {

logger.Info("Starting the JobSink Ingress")

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
logger.Info("Updated", zap.String("name", name), zap.Any("value", value))
})
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
ctxFunc := func(ctx context.Context) context.Context {
return featureStore.ToContext(ctx)
return logging.WithLogger(featureStore.ToContext(ctx), sl)
}

h := &Handler{
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
logger: logger,
withContext: ctxFunc,
oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx),
}
Expand All @@ -135,6 +136,12 @@ func main() {
log.Fatal(err)
}

// configMapWatcher does not block, so start it first.
logger.Info("Starting ConfigMap watcher")
if err = configMapWatcher.Start(ctx.Done()); err != nil {
logger.Fatal("Failed to start ConfigMap watcher", zap.Error(err))
}

// Start informers and wait for them to sync.
logger.Info("Starting informers.")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
Expand All @@ -153,26 +160,28 @@ func main() {
type Handler struct {
k8s kubernetes.Interface
lister sinkslister.JobSinkLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
oidcTokenVerifier *auth.OIDCTokenVerifier
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := h.withContext(r.Context())
logger := logging.FromContext(ctx).Desugar()

if r.Method == http.MethodGet {
h.handleGet(w, r)
h.handleGet(ctx, w, r)
return
}

if r.Method != http.MethodPost {
h.logger.Info("Unexpected HTTP method", zap.String("method", r.Method))
logger.Info("Unexpected HTTP method", zap.String("method", r.Method))
w.WriteHeader(http.StatusBadRequest)
return
}

parts := strings.Split(strings.TrimSuffix(r.RequestURI, "/"), "/")
if len(parts) != 3 {
h.logger.Info("Malformed uri", zap.String("URI", r.RequestURI), zap.Any("parts", parts))
logger.Info("Malformed uri", zap.String("URI", r.RequestURI), zap.Any("parts", parts))
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -182,55 +191,56 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Name: parts[2],
}

h.logger.Debug("Handling POST request", zap.String("URI", r.RequestURI))
logger.Debug("Handling POST request", zap.String("URI", r.RequestURI))

ctx := h.withContext(r.Context())
features := feature.FromContext(ctx)
logger.Debug("features", zap.Any("features", features))

if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")
logger.Debug("OIDC authentication is enabled")

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
}
h.logger.Debug("Request contained a valid JWT. Continuing...")
logger.Debug("Request contained a valid JWT. Continuing...")
}

message := cehttp.NewMessageFromHttpRequest(r)
defer message.Finish(nil)

event, err := binding.ToEvent(r.Context(), message)
if err != nil {
h.logger.Warn("failed to extract event from request", zap.Error(err))
logger.Warn("failed to extract event from request", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

if err := event.Validate(); err != nil {
h.logger.Info("failed to validate event from request", zap.Error(err))
logger.Info("failed to validate event from request", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

js, err := h.lister.JobSinks(ref.Namespace).Get(ref.Name)
if err != nil {
h.logger.Warn("Failed to retrieve jobsink", zap.String("ref", ref.String()), zap.Error(err))
logger.Warn("Failed to retrieve jobsink", zap.String("ref", ref.String()), zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

id := toIdHashLabelValue(event.Source(), event.ID())
h.logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("id", id))
logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("id", id))

jobs, err := h.k8s.BatchV1().Jobs(js.GetNamespace()).List(r.Context(), metav1.ListOptions{
LabelSelector: jobLabelSelector(ref, id),
Limit: 1,
})
if err != nil {
h.logger.Warn("Failed to retrieve job", zap.Error(err))
logger.Warn("Failed to retrieve job", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -242,14 +252,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

eventBytes, err := event.MarshalJSON()
if err != nil {
h.logger.Info("Failed to marshal event", zap.Error(err))
logger.Info("Failed to marshal event", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}

jobName := kmeta.ChildName(ref.Name, id)

h.logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))

jobSinkUID := js.GetUID()

Expand Down Expand Up @@ -280,14 +290,14 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

_, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
h.logger.Warn("Failed to create secret", zap.Error(err))
logger.Warn("Failed to create secret", zap.Error(err))

w.Header().Add("Reason", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}

h.logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))
logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName))

job := js.Spec.Job.DeepCopy()
job.Name = jobName
Expand Down Expand Up @@ -339,7 +349,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

_, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{})
if err != nil {
h.logger.Warn("Failed to create job", zap.Error(err))
logger.Warn("Failed to create job", zap.Error(err))

w.Header().Add("Reason", err.Error())
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -350,10 +360,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}

func (h *Handler) handleGet(w http.ResponseWriter, r *http.Request) {
func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http.Request) {
logger := logging.FromContext(ctx)
parts := strings.Split(strings.TrimSuffix(r.RequestURI, "/"), "/")
if len(parts) != 9 {
h.logger.Info("Malformed uri", zap.String("URI", r.RequestURI))
logger.Info("Malformed uri", zap.String("URI", r.RequestURI))
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -363,21 +374,20 @@ func (h *Handler) handleGet(w http.ResponseWriter, r *http.Request) {
Name: parts[4],
}

h.logger.Debug("Handling GET request", zap.String("URI", r.RequestURI))
logger.Debug("Handling GET request", zap.String("URI", r.RequestURI))

ctx := h.withContext(r.Context())
features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
h.logger.Debug("OIDC authentication is enabled")
logger.Debug("OIDC authentication is enabled")

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
}
h.logger.Debug("Request contained a valid JWT. Continuing...")
logger.Debug("Request contained a valid JWT. Continuing...")
}

eventSource := parts[6]
Expand Down
2 changes: 2 additions & 0 deletions cmd/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"knative.dev/hack/schema/registry"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
Expand All @@ -47,6 +48,7 @@ func main() {
// Flows
registry.Register(&flowsv1.Sequence{})
registry.Register(&flowsv1.Parallel{})
registry.Register(&eventingv1alpha1.EventPolicy{})

if err := commands.New("knative.dev/eventing").Execute(); err != nil {
log.Fatal("Error during command execution: ", err)
Expand Down
12 changes: 11 additions & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"

eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/apis/sinks"
sinksv1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
Expand Down Expand Up @@ -156,9 +158,17 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

k8s := kubeclient.Get(ctx)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return featureStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))))
return sinks.WithConfig(
featureStore.ToContext(
channelStore.ToContext(
pingstore.ToContext(store.ToContext(ctx)))),
&sinks.Config{
KubeClient: k8s,
})
}

return validation.NewAdmissionController(ctx,
Expand Down
1 change: 1 addition & 0 deletions config/200-job-sink-clusterrole.yaml
1 change: 1 addition & 0 deletions config/200-job-sink-serviceaccount.yaml
1 change: 1 addition & 0 deletions config/300-jobsink.yaml
1 change: 1 addition & 0 deletions config/500-job-sink.yaml
12 changes: 12 additions & 0 deletions config/channels/in-memory-channel/resources/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ spec:
description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards.
type: object
x-kubernetes-preserve-unknown-fields: true
policies:
description: List of applied EventPolicies
type: array
items:
type: object
properties:
apiVersion:
description: The API version of the applied EventPolicy. This indicates, which version of EventPolicy is supported by the resource.
type: string
name:
description: The name of the applied EventPolicy
type: string
conditions:
description: Conditions the latest available observations of a resource's current state.
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ rules:
- inmemorychannels
verbs:
- patch
- apiGroups:
- eventing.knative.dev
resources:
- eventpolicies
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ data:
# For more details: https://github.com/knative/eventing/issues/7174
authentication-oidc: "disabled"

# ALPHA feature: The default-authorization-mode flag allows you to change the default
# authorization mode for resources that have no EventPolicy associated with them.
#
# This feature flag is only used when "authentication-oidc" is enabled.
default-authorization-mode: "allow-same-namespace"

# ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing.
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"
Expand Down
12 changes: 12 additions & 0 deletions config/core/resources/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ spec:
description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards.
type: object
x-kubernetes-preserve-unknown-fields: true
policies:
description: List of applied EventPolicies
type: array
items:
type: object
properties:
apiVersion:
description: The API version of the applied EventPolicy. This indicates, which version of EventPolicy is supported by the resource.
type: string
name:
description: The name of the applied EventPolicy
type: string
conditions:
description: Conditions the latest available observations of a resource's current state.
type: array
Expand Down
12 changes: 12 additions & 0 deletions config/core/resources/channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,18 @@ spec:
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.'
type: string
policies:
description: List of applied EventPolicies
type: array
items:
type: object
properties:
apiVersion:
description: The API version of the applied EventPolicy. This indicates, which version of EventPolicy is supported by the resource.
type: string
name:
description: The name of the applied EventPolicy
type: string
conditions:
description: Conditions the latest available observations of a resource's current state.
type: array
Expand Down
Loading

0 comments on commit e1c9599

Please sign in to comment.