Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService
Browse files Browse the repository at this point in the history
  • Loading branch information
jianrongzhang89 committed May 20, 2024
1 parent 21d1aee commit 55dcb24
Show file tree
Hide file tree
Showing 36 changed files with 453 additions and 288 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions api/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion container-builder/api/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 3 additions & 49 deletions controllers/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,14 @@ import (
"fmt"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
clienteventingv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
clientservingv1 "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var servingClient clientservingv1.ServingV1Interface
Expand Down Expand Up @@ -103,7 +97,7 @@ func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
}
}

func GetWorkflowSink(ctx context.Context, c client.Client, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
if workflow == nil {
return nil, nil
}
Expand All @@ -117,7 +111,7 @@ func GetWorkflowSink(ctx context.Context, c client.Client, workflow *operatorapi
} else if pl.Status.ClusterPlatformRef != nil {
// Find the platform referred by the cluster platform
platform := &operatorapi.SonataFlowPlatform{}
if err := c.Get(ctx, types.NamespacedName{Namespace: pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
if err := utils.GetClient().Get(context.TODO(), types.NamespacedName{Namespace: pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
return nil, fmt.Errorf("error reading the platform referred by the cluster platform")
}
if platform.Spec.Eventing != nil {
Expand All @@ -128,50 +122,10 @@ func GetWorkflowSink(ctx context.Context, c client.Client, workflow *operatorapi
return nil, nil
}

const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"

func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef *duckv1.KReference) (*unstructured.Unstructured, error) {
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, err
}
gv, err := schema.ParseGroupVersion(kRef.APIVersion)
if err != nil {
return nil, err
}
resourceId := schema.GroupVersionResource{
Group: gv.Group,
Version: gv.Version,
Resource: kRef.Kind,
}
if len(kRef.Namespace) == 0 {
return nil, fmt.Errorf("namespace for knative resource %s is missing", kRef.Name)
}
list, err := dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx, metav1.ListOptions{})
fmt.Printf("list:%v", list)
return dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx, kRef.Name, metav1.GetOptions{})
}

func IsKnativeBroker(kRef *duckv1.KReference) bool {
return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == "Broker"
}

func IsKnativeEnvInjected(ctx context.Context, c client.Client, deploymentName, namespace string) (bool, error) {
deployment := &appsv1.Deployment{}
if err := c.Get(ctx, types.NamespacedName{Name: deploymentName, Namespace: namespace}, deployment); err != nil {
if errors.IsNotFound(err) {
return false, nil //deployment not found
}
return false, err
}
for _, env := range deployment.Spec.Template.Spec.Containers[0].Env {
if env.Name == KSink {
return true, nil
}
}
return false, nil
}

func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
if plf.Spec.Services != nil {
if plf.Spec.Services.DataIndex != nil {
Expand Down
4 changes: 2 additions & 2 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
return nil, err
}

psDI := services.NewDataIndexHandler(ctx, action.client, platform)
psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
}
}

psJS := services.NewJobServiceHandler(ctx, action.client, platform)
psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion controllers/platform/services/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,3 @@ func GetKnativeAvailability(c client.Client) *knative.Availability {
}
return result
}

14 changes: 6 additions & 8 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
package services

import (
"context"
"fmt"
"net/url"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
Expand Down Expand Up @@ -160,15 +158,15 @@ func generateReactiveURL(postgresSpec *operatorapi.PersistencePostgreSQL, schema
// with the Data Index. For the calculation this function considers if the Data Index is present in the
// SonataFlowPlatform, if not present, no properties.
// Never nil.
func GenerateDataIndexWorkflowProperties(ctx context.Context, client client.Client, workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "false")
sink, err := knative.GetWorkflowSink(ctx, client, workflow, platform)
sink, err := knative.GetWorkflowSink(workflow, platform)
if err != nil {
return nil, err
}
di := NewDataIndexHandler(ctx, client, platform)
di := NewDataIndexHandler(platform)
if !profiles.IsDevProfile(workflow) && workflow != nil && workflow.Status.Services != nil && workflow.Status.Services.DataIndexRef != nil {
serviceBaseUrl := workflow.Status.Services.DataIndexRef.Url
if di.IsServiceEnabled() && len(serviceBaseUrl) > 0 {
Expand Down Expand Up @@ -199,15 +197,15 @@ func GenerateDataIndexWorkflowProperties(ctx context.Context, client client.Clie
// with the Job Service. For the calculation this function considers if the Job Service is present in the
// SonataFlowPlatform, if not present, no properties.
// Never nil.
func GenerateJobServiceWorkflowProperties(ctx context.Context, client client.Client, workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP)
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
sink, err := knative.GetWorkflowSink(ctx, client, workflow, platform)
sink, err := knative.GetWorkflowSink(workflow, platform)
if err != nil {
return nil, err
}
js := NewJobServiceHandler(ctx, client, platform)
js := NewJobServiceHandler(platform)
if !profiles.IsDevProfile(workflow) && workflow != nil && workflow.Status.Services != nil && workflow.Status.Services.JobServiceRef != nil {
serviceBaseUrl := workflow.Status.Services.JobServiceRef.Url
if js.IsServiceEnabled() && len(serviceBaseUrl) > 0 {
Expand Down
14 changes: 7 additions & 7 deletions controllers/platform/services/properties_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("PlatformServiceHandler properties", func() {

DescribeTable("Job Service",
func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
js := NewJobServiceHandler(plf)
js := NewJobServiceHandler(plfm)
handler, err := NewServiceAppPropertyHandler(js)
Expect(err).NotTo(HaveOccurred())
p, err := properties.LoadString(handler.Build())
Expand Down Expand Up @@ -177,7 +177,7 @@ func setJobServiceEnabledValue(v *bool) plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.JobService == nil {
p.Spec.Services.JobService = &operatorapi.ServiceSpec{}
p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{}
}
p.Spec.Services.JobService.Enabled = v
}
Expand All @@ -189,7 +189,7 @@ func setDataIndexEnabledValue(v *bool) plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.DataIndex == nil {
p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{}
p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{}
}
p.Spec.Services.DataIndex.Enabled = v
}
Expand All @@ -201,7 +201,7 @@ func emptyDataIndexServiceSpec() plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.DataIndex == nil {
p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{}
p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{}
}
}
}
Expand All @@ -212,7 +212,7 @@ func emptyJobServiceSpec() plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.JobService == nil {
p.Spec.Services.JobService = &operatorapi.ServiceSpec{}
p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{}
}
}
}
Expand All @@ -235,7 +235,7 @@ func setJobServiceJDBC(jdbc string) plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.JobService == nil {
p.Spec.Services.JobService = &operatorapi.ServiceSpec{}
p.Spec.Services.JobService = &operatorapi.JobServiceServiceSpec{}
}
if p.Spec.Services.JobService.Persistence == nil {
p.Spec.Services.JobService.Persistence = &operatorapi.PersistenceOptionsSpec{}
Expand All @@ -253,7 +253,7 @@ func setDataIndexJDBC(jdbc string) plfmOptionFn {
p.Spec.Services = &operatorapi.ServicesPlatformSpec{}
}
if p.Spec.Services.DataIndex == nil {
p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{}
p.Spec.Services.DataIndex = &operatorapi.DataIndexServiceSpec{}
}
if p.Spec.Services.DataIndex.Persistence == nil {
p.Spec.Services.DataIndex.Persistence = &operatorapi.PersistenceOptionsSpec{}
Expand Down
44 changes: 10 additions & 34 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package services

import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/cfg"
Expand Down Expand Up @@ -104,19 +103,14 @@ type PlatformServiceHandler interface {
SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)

GetServiceSource() *duckv1.Destination

GetContext() context.Context
GetClient() client.Client
}

type DataIndexHandler struct {
platform *operatorapi.SonataFlowPlatform
ctx context.Context
client client.Client
}

func NewDataIndexHandler(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
return &DataIndexHandler{platform: platform, ctx: ctx, client: client}
func NewDataIndexHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
return &DataIndexHandler{platform: platform}
}

func (d *DataIndexHandler) GetContainerName() string {
Expand Down Expand Up @@ -146,7 +140,7 @@ func (d *DataIndexHandler) GetServiceName() string {
}

func (d DataIndexHandler) SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) {
psDI := NewDataIndexHandler(d.ctx, d.client, clusterRefPlatform)
psDI := NewDataIndexHandler(clusterRefPlatform)
if !isServicesSet(d.platform) && psDI.IsServiceEnabledInSpec() {
if d.platform.Status.ClusterPlatformRef != nil {
if d.platform.Status.ClusterPlatformRef.Services == nil {
Expand Down Expand Up @@ -289,12 +283,10 @@ func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties,

type JobServiceHandler struct {
platform *operatorapi.SonataFlowPlatform
ctx context.Context
client client.Client
}

func NewJobServiceHandler(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
return &JobServiceHandler{ctx: ctx, client: client, platform: platform}
func NewJobServiceHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
return &JobServiceHandler{platform: platform}
}

func (j *JobServiceHandler) GetContainerName() string {
Expand Down Expand Up @@ -328,7 +320,7 @@ func (j *JobServiceHandler) GetServiceCmName() string {
}

func (j JobServiceHandler) SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) {
psJS := NewJobServiceHandler(j.ctx, j.client, clusterRefPlatform)
psJS := NewJobServiceHandler(clusterRefPlatform)
if !isServicesSet(j.platform) && psJS.IsServiceEnabledInSpec() {
if j.platform.Status.ClusterPlatformRef != nil {
if j.platform.Status.ClusterPlatformRef.Services == nil {
Expand Down Expand Up @@ -469,7 +461,7 @@ func (j *JobServiceHandler) GenerateServiceProperties() (*properties.Properties,
if isDataIndexEnabled(j.platform) {
props.Set(constants.JobServiceStatusChangeEvents, "true")
if /*!isKnativeEnvInjected ||*/ j.GetServiceSource() == nil {
di := NewDataIndexHandler(j.ctx, j.client, j.platform)
di := NewDataIndexHandler(j.platform)
props.Set(constants.JobServiceStatusChangeEventsURL, di.GetLocalServiceBaseUrl()+"/jobs")
} else {
props.Set(constants.JobServiceStatusChangeEventsURL, constants.KnativeInjectedEnvVar)
Expand All @@ -481,9 +473,9 @@ func (j *JobServiceHandler) GenerateServiceProperties() (*properties.Properties,
return props, nil
}

func SetServiceUrlsInWorkflowStatus(ctx context.Context, client client.Client, pl *operatorapi.SonataFlowPlatform, workflow *operatorapi.SonataFlow) {
tpsDI := NewDataIndexHandler(ctx, client, pl)
tpsJS := NewJobServiceHandler(ctx, client, pl)
func SetServiceUrlsInWorkflowStatus(pl *operatorapi.SonataFlowPlatform, workflow *operatorapi.SonataFlow) {
tpsDI := NewDataIndexHandler(pl)
tpsJS := NewJobServiceHandler(pl)

workflow.Status.Services = nil
tpsDI.SetServiceUrlInWorkflowStatus(workflow)
Expand All @@ -504,22 +496,6 @@ func (j *JobServiceHandler) GetServiceSink() *duckv1.Destination {
return GetPlatformBroker(j.platform)
}

func (j *JobServiceHandler) GetContext() context.Context {
return j.ctx
}

func (j *JobServiceHandler) GetClient() client.Client {
return j.client
}

func (d *DataIndexHandler) GetContext() context.Context {
return d.ctx
}

func (d *DataIndexHandler) GetClient() client.Client {
return d.client
}

func isDataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool {
return isDataIndexSet(platform) && platform.Spec.Services.DataIndex.Enabled != nil &&
*platform.Spec.Services.DataIndex.Enabled
Expand Down
Loading

0 comments on commit 55dcb24

Please sign in to comment.