Skip to content

Commit

Permalink
UPSTREAM: <carry>: extend termination events
Browse files Browse the repository at this point in the history
  • Loading branch information
soltysh committed Jul 1, 2024
1 parent 309017f commit 28550a5
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 44 deletions.
66 changes: 66 additions & 0 deletions cmd/kube-apiserver/app/patch_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kubernetes/pkg/apis/core"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
)

// eventRegistrySink wraps an event registry in order to be used as direct event sync, without going through the API.
type eventRegistrySink struct {
*eventstorage.REST
}

var _ genericapiserver.EventSink = eventRegistrySink{}

func (s eventRegistrySink) Create(v1event *corev1.Event) (*corev1.Event, error) {
ctx := request.WithNamespace(request.WithRequestInfo(request.NewContext(), &request.RequestInfo{APIVersion: "v1"}), v1event.Namespace)
// since we are bypassing the API set a hard timeout for the storage layer
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

var event core.Event
if err := v1.Convert_v1_Event_To_core_Event(v1event, &event, nil); err != nil {
return nil, err
}

obj, err := s.REST.Create(ctx, &event, nil, &metav1.CreateOptions{})
if err != nil {
return nil, err
}
ret, ok := obj.(*core.Event)
if !ok {
return nil, fmt.Errorf("expected corev1.Event, got %T", obj)
}

var v1ret corev1.Event
if err := v1.Convert_core_Event_To_v1_Event(ret, &v1ret, nil); err != nil {
return nil, err
}

return &v1ret, nil
}
48 changes: 4 additions & 44 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/sha256"
"encoding/base32"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -37,7 +36,6 @@ import (
"golang.org/x/crypto/cryptobyte"
jsonpatch "gopkg.in/evanphx/json-patch.v4"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -771,8 +769,8 @@ func (c *RecommendedConfig) Complete() CompletedConfig {
if len(ns) == 0 {
ns = "default"
}
c.EventSink = &v1.EventSinkImpl{
Interface: kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(ns),
c.EventSink = clientEventSink{
&v1.EventSinkImpl{Interface: kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(ns)},
}
}
}
Expand All @@ -786,39 +784,6 @@ var allowedMediaTypes = []string{
runtime.ContentTypeProtobuf,
}

func eventReference() (*corev1.ObjectReference, error) {
ns := os.Getenv("POD_NAMESPACE")
pod := os.Getenv("POD_NAME")
if len(ns) == 0 && len(pod) > 0 {
serviceAccountNamespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
if _, err := os.Stat(serviceAccountNamespaceFile); err == nil {
bs, err := ioutil.ReadFile(serviceAccountNamespaceFile)
if err != nil {
return nil, err
}
ns = string(bs)
}
}
if len(ns) == 0 {
pod = ""
ns = "openshift-kube-apiserver"
}
if len(pod) == 0 {
return &corev1.ObjectReference{
Kind: "Namespace",
Name: ns,
APIVersion: "v1",
}, nil
}

return &corev1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: pod,
APIVersion: "v1",
}, nil
}

// New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delegating.
// delegationTarget may not be nil.
Expand Down Expand Up @@ -916,8 +881,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
ref, err := eventReference()
if err != nil {
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
c.EventSink = nullEventSink{}
s.OpenShiftGenericAPIServerPatch.eventSink = nullEventSink{}
}
s.RegisterDestroyFunc(c.EventSink.Destroy)
s.eventRef = ref

if c.FeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
Expand Down Expand Up @@ -1276,9 +1242,3 @@ func SetHostnameFuncForTests(name string) {
return
}
}

type nullEventSink struct{}

func (nullEventSink) Create(event *corev1.Event) (*corev1.Event, error) {
return nil, nil
}
52 changes: 52 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/patch_genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -31,13 +32,15 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
)

// EventSink allows to create events.
type EventSink interface {
Create(event *corev1.Event) (*corev1.Event, error)
Destroy()
}

type OpenShiftGenericAPIServerPatch struct {
Expand Down Expand Up @@ -110,6 +113,39 @@ func (s *GenericAPIServer) Eventf(eventType, reason, messageFmt string, args ...
}
}

func eventReference() (*corev1.ObjectReference, error) {
ns := os.Getenv("POD_NAMESPACE")
pod := os.Getenv("POD_NAME")
if len(ns) == 0 && len(pod) > 0 {
serviceAccountNamespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
if _, err := os.Stat(serviceAccountNamespaceFile); err == nil {
bs, err := ioutil.ReadFile(serviceAccountNamespaceFile)
if err != nil {
return nil, err
}
ns = string(bs)
}
}
if len(ns) == 0 {
pod = ""
ns = "openshift-kube-apiserver"
}
if len(pod) == 0 {
return &corev1.ObjectReference{
Kind: "Namespace",
Name: ns,
APIVersion: "v1",
}, nil
}

return &corev1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: pod,
APIVersion: "v1",
}, nil
}

// terminationLoggingListener wraps the given listener to mark late connections
// as such, identified by the remote address. In parallel, we have a filter that
// logs bad requests through these connections. We need this filter to get
Expand Down Expand Up @@ -230,3 +266,19 @@ func isLocal(req *http.Request) bool {
func isKubeApiserverLoopBack(req *http.Request) bool {
return strings.HasPrefix(req.UserAgent(), "kube-apiserver/")
}

type nullEventSink struct{}

func (nullEventSink) Create(event *corev1.Event) (*corev1.Event, error) {
return nil, nil
}

func (nullEventSink) Destroy() {
}

type clientEventSink struct {
*v1.EventSinkImpl
}

func (clientEventSink) Destroy() {
}

0 comments on commit 28550a5

Please sign in to comment.