Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: addres…
Browse files Browse the repository at this point in the history
…s review comments
  • Loading branch information
jianrongzhang89 committed Jun 23, 2024
1 parent 80cd80e commit dbb453a
Show file tree
Hide file tree
Showing 21 changed files with 412 additions and 64 deletions.
13 changes: 13 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,19 @@ type SonataFlowStatus struct {
// Platform displays which platform is being used by this workflow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="platform"
Platform *SonataFlowPlatformRef `json:"platform,omitempty"`
// Triggers list of triggers created for the SonataFlow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
}

// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
type SonataFlowTriggerRef struct {
// Name of the Trigger
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Trigger_Name"
Name string `json:"name"`
// Namespace of the Trigger
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Trigger_NS"
Namespace string `json:"namespace"`
}

func (s *SonataFlowStatus) GetTopLevelConditionType() api.ConditionType {
Expand Down
13 changes: 13 additions & 0 deletions api/v1alpha08/sonataflowplatform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ type SonataFlowPlatformStatus struct {
// ClusterPlatformRef information related to the (optional) active SonataFlowClusterPlatform
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="clusterPlatformRef"
ClusterPlatformRef *SonataFlowClusterPlatformRefStatus `json:"clusterPlatformRef,omitempty"`
// Triggers list of triggers created for the SonataFlowPlatform
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowPlatformTriggerRef `json:"triggers,omitempty"`
}

// SonataFlowPlatformTriggerRef defines a trigger created for the SonataFlowPlatform.
type SonataFlowPlatformTriggerRef struct {
// Name of the Trigger
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Trigger_Name"
Name string `json:"name"`
// Namespace of the Trigger
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Trigger_NS"
Namespace string `json:"namespace"`
}

// SonataFlowClusterPlatformRefStatus information related to the (optional) active SonataFlowClusterPlatform
Expand Down
40 changes: 40 additions & 0 deletions api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ spec:
- description: Info generic information related to the build
displayName: info
path: info
- description: Triggers list of triggers created for the SonataFlowPlatform
displayName: triggers
path: triggers
- description: Version the operator version controlling this Platform
displayName: version
path: version
Expand Down Expand Up @@ -348,6 +351,9 @@ spec:
workflow
displayName: services
path: services
- description: Triggers list of triggers created for the SonataFlow
displayName: triggers
path: triggers
version: v1alpha08
description: |-
SonataFlow Kubernetes Operator for deploying workflow applications
Expand Down
17 changes: 17 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16597,6 +16597,23 @@ spec:
description: The generation observed by the deployment controller.
format: int64
type: integer
triggers:
description: Triggers list of triggers created for the SonataFlowPlatform
items:
description: SonataFlowPlatformTriggerRef defines a trigger created
for the SonataFlowPlatform.
properties:
name:
description: Name of the Trigger
type: string
namespace:
description: Namespace of the Trigger
type: string
required:
- name
- namespace
type: object
type: array
version:
description: Version the operator version controlling this Platform
type: string
Expand Down
17 changes: 17 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9557,6 +9557,23 @@ spec:
type: string
type: object
type: object
triggers:
description: Triggers list of triggers created for the SonataFlow
items:
description: SonataFlowTriggerRef defines a trigger created for
the SonataFlow.
properties:
name:
description: Name of the Trigger
type: string
namespace:
description: Namespace of the Trigger
type: string
required:
- name
- namespace
type: object
type: array
type: object
type: object
served: true
Expand Down
17 changes: 17 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16598,6 +16598,23 @@ spec:
description: The generation observed by the deployment controller.
format: int64
type: integer
triggers:
description: Triggers list of triggers created for the SonataFlowPlatform
items:
description: SonataFlowPlatformTriggerRef defines a trigger created
for the SonataFlowPlatform.
properties:
name:
description: Name of the Trigger
type: string
namespace:
description: Namespace of the Trigger
type: string
required:
- name
- namespace
type: object
type: array
version:
description: Version the operator version controlling this Platform
type: string
Expand Down
17 changes: 17 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9558,6 +9558,23 @@ spec:
type: string
type: object
type: object
triggers:
description: Triggers list of triggers created for the SonataFlow
items:
description: SonataFlowTriggerRef defines a trigger created for
the SonataFlow.
properties:
name:
description: Name of the Trigger
type: string
namespace:
description: Namespace of the Trigger
type: string
required:
- name
- namespace
type: object
type: array
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ spec:
- description: Info generic information related to the build
displayName: info
path: info
- description: Triggers list of triggers created for the SonataFlowPlatform
displayName: triggers
path: triggers
- description: Version the operator version controlling this Platform
displayName: version
path: version
Expand Down Expand Up @@ -232,6 +235,9 @@ spec:
workflow
displayName: services
path: services
- description: Triggers list of triggers created for the SonataFlow
displayName: triggers
path: triggers
version: v1alpha08
description: |-
SonataFlow Kubernetes Operator for deploying workflow applications
Expand Down
30 changes: 26 additions & 4 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
appsv1 "k8s.io/api/apps/v1"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var servingClient clientservingv1.ServingV1Interface
Expand Down Expand Up @@ -208,9 +211,9 @@ func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {

// Knative Sinkbinding injects K_SINK env, a volume and volume mount. The volume and volume mount
// must be in the end of the array to avoid repeadly restarting of the workflow pod
func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
visitContainers(&deployment.Spec.Template.Spec, func(container *corev1.Container) {
func RestoreKnativeVolumeAndVolumeMount(podSpec *corev1.PodSpec) {
moveKnativeVolumeToEnd(podSpec.Volumes)
visitContainers(podSpec, func(container *corev1.Container) {
moveKnativeVolumeMountToEnd(container.VolumeMounts)
})
}
Expand All @@ -230,3 +233,22 @@ func visitContainers(podSpec *corev1.PodSpec, visitor containerVisitor) {
visitor((*corev1.Container)(&podSpec.EphemeralContainers[i].EphemeralContainerCommon))
}
}

// if a trigger is changed and it has namespace different from the platform is changed, reconcile the parent SonataFlowPlatform in the cluster.
func MapTriggerToPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request {
if trigger, ok := object.(*eventingv1.Trigger); ok {
nameFound := ""
namespaceFound := ""
for k, v := range trigger.GetLabels() {
if k == workflowproj.LabelApp {
nameFound = v
} else if k == workflowproj.LabelAppNamespace {
namespaceFound = v
}
}
if len(nameFound) > 0 && len(namespaceFound) > 0 && namespaceFound != trigger.Namespace {
return []reconcile.Request{reconcile.Request{NamespacedName: types.NamespacedName{Name: nameFound, Namespace: namespaceFound}}}
}
}
return nil
}
70 changes: 57 additions & 13 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand Down Expand Up @@ -241,8 +243,9 @@ func createOrUpdateService(ctx context.Context, client client.Client, platform *

func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (map[string]string, map[string]string) {
lbl := map[string]string{
workflowproj.LabelApp: platform.Name,
workflowproj.LabelService: psh.GetServiceName(),
workflowproj.LabelApp: platform.Name,
workflowproj.LabelAppNamespace: platform.Namespace,
workflowproj.LabelService: psh.GetServiceName(),
}
selectorLbl := map[string]string{
workflowproj.LabelService: psh.GetServiceName(),
Expand Down Expand Up @@ -284,25 +287,66 @@ func createOrUpdateConfigMap(ctx context.Context, client client.Client, platform
return nil
}

func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func setSonataFlowPlatformFinalizer(ctx context.Context, c client.Client, platform *operatorapi.SonataFlowPlatform) error {
if !controllerutil.ContainsFinalizer(platform, constants.TriggerFinalizer) {
controllerutil.AddFinalizer(platform, constants.TriggerFinalizer)
return c.Update(ctx, platform)
}
return nil
}

func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
lbl, _ := getLabels(platform, psh)
if objs, err := psh.GenerateKnativeResources(platform, lbl); err != nil {
objs, err := psh.GenerateKnativeResources(platform, lbl)
if err != nil {
return err
} else if len(objs) > 0 {
for _, obj := range objs {
if op, err := controllerutil.CreateOrUpdate(ctx, client, obj, func() error {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
}
for _, obj := range objs {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
}
}
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
trigger := &eventingv1.Trigger{
ObjectMeta: triggerDef.ObjectMeta,
}
op, err := controllerutil.CreateOrUpdate(ctx, client, trigger, func() error {
trigger.Spec = triggerDef.Spec
return nil
})
if err != nil {
return err
}
if op == controllerutil.OperationResultCreated { // trigger has been successfully created
addToSonataFlowPlatformTriggerList(platform, trigger)
if platform.Namespace != obj.GetNamespace() {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
return setSonataFlowPlatformFinalizer(ctx, client, platform)
}
}
} else if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
}
_, err := controllerutil.CreateOrUpdate(ctx, client, sinkBinding, func() error {
sinkBinding.Spec = sbDef.Spec
return nil
}); err != nil {
})
if err != nil {
return err
} else {
klog.V(log.I).InfoS("Knative Eventing resources successfully created", "operation", op)
}
}
return nil
}
return nil
}

func addToSonataFlowPlatformTriggerList(platform *operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
for _, t := range platform.Status.Triggers {
if t.Name == trigger.Name && t.Namespace == trigger.Namespace {
return // trigger already exists
}
}
platform.Status.Triggers = append(platform.Status.Triggers, operatorapi.SonataFlowPlatformTriggerRef{Name: trigger.Name, Namespace: trigger.Namespace})
}
Loading

0 comments on commit dbb453a

Please sign in to comment.