Skip to content

Commit

Permalink
#1329 update the operator to allow subpaths to be used with the spark…
Browse files Browse the repository at this point in the history
… ui ingress. (#1330)

* Slightly restructure the way the ingressURL is created, and the variable which is passed around, and configure the Ingress manifest to work with subpaths

Signed-off-by: Tom Hellier <me@tomhellier.com>

* fix #1329 unit tests now capture groups are being added in the subpath condition, and protect against empty paths

Signed-off-by: Tom Hellier <me@tomhellier.com>
  • Loading branch information
TomHellier authored Oct 21, 2021
1 parent bfb0446 commit 9a85d4b
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
run: make detect-crds-drift

- name: Create kind cluster
uses: helm/kind-action@v1.0.0
uses: helm/kind-action@v1.2.0

- name: Run chart-testing (install)
run: ct install
4 changes: 2 additions & 2 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
run: ct lint

- name: Create kind cluster
uses: helm/kind-action@v1.0.0
uses: helm/kind-action@v1.2.0
if: steps.list-changed.outputs.changed == 'true'

- name: Run chart-testing (install)
Expand All @@ -53,4 +53,4 @@ jobs:
uses: helm/chart-releaser-action@v1.1.0
env:
CR_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
CR_RELEASE_NAME_TEMPLATE: "spark-operator-chart-{{ .Version }}"
CR_RELEASE_NAME_TEMPLATE: "spark-operator-chart-{{ .Version }}"
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o /usr/bin
FROM ${SPARK_IMAGE}
USER root
COPY --from=builder /usr/bin/spark-operator /usr/bin/
RUN apt-get update \
RUN apt-get update --allow-releaseinfo-change \
&& apt-get update \
&& apt-get install -y openssl curl tini \
&& rm -rf /var/lib/apt/lists/*
COPY hack/gencerts.sh /usr/bin/
Expand Down
55 changes: 35 additions & 20 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,41 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
}
}

if c.enableUIService {
service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.servicePort
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
// We are going to want to use an ingress url.
ingressURL, err := getSparkUIingressURL(c.ingressURLFormat, app.GetName(), app.GetNamespace())
if err != nil {
glog.Errorf("failed to get the spark ingress url %s/%s: %v", app.Namespace, app.Name, err)
} else {
// need to ensure the spark.ui variables are configured correctly if a subPath is used.
if ingressURL.Path != "" {
if app.Spec.SparkConf == nil {
app.Spec.SparkConf = make(map[string]string)
}
app.Spec.SparkConf["spark.ui.proxyBase"] = ingressURL.Path
app.Spec.SparkConf["spark.ui.proxyRedirectUri"] = "/"
}
ingress, err := createSparkUIIngress(app, *service, ingressURL, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL.String()
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
}
}

driverPodName := getDriverPodName(app)
submissionID := uuid.New().String()
submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
Expand Down Expand Up @@ -715,26 +750,6 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
}
c.recordSparkApplicationEvent(app)

if c.enableUIService {
service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.servicePort
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
}
return app
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,55 @@ func TestIsNextRetryDue(t *testing.T) {
assert.True(t, isNextRetryDue(int64ptr(50), 3, metav1.Time{Time: metav1.Now().Add(-151 * time.Second)}))
}

func TestIngressWithSubpathAffectsSparkConfiguration(t *testing.T) {
os.Setenv(kubernetesServiceHostEnvVar, "localhost")
os.Setenv(kubernetesServicePortEnvVar, "443")

appName := "ingressaffectssparkconfig"

app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: appName,
Namespace: "test",
},
Spec: v1beta2.SparkApplicationSpec{
RestartPolicy: v1beta2.RestartPolicy{
Type: v1beta2.Never,
},
TimeToLiveSeconds: int64ptr(1),
},
Status: v1beta2.SparkApplicationStatus{},
}

ctrl, _ := newFakeController(app)
ctrl.ingressURLFormat = "example.com/{{$appNamespace}}/{{$appName}}"
ctrl.enableUIService = true
_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
err = ctrl.syncSparkApplication(fmt.Sprintf("%s/%s", app.Namespace, app.Name))
assert.Nil(t, err)
deployedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
ingresses, err := ctrl.kubeClient.ExtensionsV1beta1().Ingresses(app.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
if ingresses.Items[0].Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Path != "/"+app.Namespace+"/"+app.Name+"(/|$)(.*)" {
t.Fatal("The ingress subpath was not created successfully.")
}
// The controller doesn't sync changes to the sparkConf performed by submitSparkApplication back to the kubernetes API server.
if deployedApp.Spec.SparkConf["spark.ui.proxyBase"] != "/"+app.Namespace+"/"+app.Name {
t.Log("The spark configuration does not reflect the subpath expected by the ingress")
}
if deployedApp.Spec.SparkConf["spark.ui.proxyRedirectUri"] != "/" {
t.Log("The spark configuration does not reflect the proxyRedirectUri expected by the ingress")
}
}

func stringptr(s string) *string {
return &s
}
Expand Down
49 changes: 31 additions & 18 deletions pkg/controller/sparkapplication/sparkui.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,20 @@ const (
var ingressAppNameURLRegex = regexp.MustCompile("{{\\s*[$]appName\\s*}}")
var ingressAppNamespaceURLRegex = regexp.MustCompile("{{\\s*[$]appNamespace\\s*}}")

func getSparkUIingressURL(ingressURLFormat string, appName string, appNamespace string) string {
return ingressAppNamespaceURLRegex.ReplaceAllString(ingressAppNameURLRegex.ReplaceAllString(ingressURLFormat, appName), appNamespace)
func getSparkUIingressURL(ingressURLFormat string, appName string, appNamespace string) (*url.URL, error) {
ingressURL := ingressAppNamespaceURLRegex.ReplaceAllString(ingressAppNameURLRegex.ReplaceAllString(ingressURLFormat, appName), appNamespace)
parsedURL, err := url.Parse(ingressURL)
if err != nil {
return nil, err
}
if parsedURL.Scheme == "" {
//url does not contain any scheme, adding http:// so url.Parse can function correctly
parsedURL, err = url.Parse("http://" + ingressURL)
if err != nil {
return nil, err
}
}
return parsedURL, nil
}

// SparkService encapsulates information about the driver UI service.
Expand All @@ -62,26 +74,19 @@ type SparkService struct {
// SparkIngress encapsulates information about the driver UI ingress.
type SparkIngress struct {
ingressName string
ingressURL string
ingressURL *url.URL
annotations map[string]string
ingressTLS []extensions.IngressTLS
}

func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURLFormat string, kubeClient clientset.Interface) (*SparkIngress, error) {
ingressURL := getSparkUIingressURL(ingressURLFormat, app.GetName(), app.GetNamespace())
func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, ingressURL *url.URL, kubeClient clientset.Interface) (*SparkIngress, error) {
ingressResourceAnnotations := getIngressResourceAnnotations(app)
ingressTlsHosts := getIngressTlsHosts(app)

parsedURL, err := url.Parse(ingressURL)
if err != nil {
return nil, err
}
if parsedURL.Scheme == "" {
//url does not contain any scheme, adding http:// so url.Parse can function correctly
parsedURL, err = url.Parse("http://" + ingressURL)
if err != nil {
return nil, err
}
ingressURLPath := ingressURL.Path
// If we're serving on a subpath, we need to ensure we create capture groups
if ingressURLPath != "" && ingressURLPath != "/" {
ingressURLPath = ingressURLPath + "(/|$)(.*)"
}

ingress := extensions.Ingress{
Expand All @@ -93,7 +98,7 @@ func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, i
},
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{{
Host: parsedURL.Host,
Host: ingressURL.Host,
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{{
Expand All @@ -104,7 +109,7 @@ func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, i
IntVal: service.servicePort,
},
},
Path: parsedURL.Path,
Path: ingressURLPath,
}},
},
},
Expand All @@ -115,11 +120,19 @@ func createSparkUIIngress(app *v1beta2.SparkApplication, service SparkService, i
if len(ingressResourceAnnotations) != 0 {
ingress.ObjectMeta.Annotations = ingressResourceAnnotations
}

// If we're serving on a subpath, we need to ensure we use the capture groups
if ingressURL.Path != "" && ingressURL.Path != "/" {
if ingress.ObjectMeta.Annotations == nil {
ingress.ObjectMeta.Annotations = make(map[string]string)
}
ingress.ObjectMeta.Annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/$2"
}
if len(ingressTlsHosts) != 0 {
ingress.Spec.TLS = ingressTlsHosts
}
glog.Infof("Creating an Ingress %s for the Spark UI for application %s", ingress.Name, app.Name)
_, err = kubeClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(context.TODO(), &ingress, metav1.CreateOptions{})
_, err := kubeClient.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(context.TODO(), &ingress, metav1.CreateOptions{})

if err != nil {
return nil, err
Expand Down
53 changes: 43 additions & 10 deletions pkg/controller/sparkapplication/sparkui_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sparkapplication
import (
"context"
"fmt"
"net/url"
"reflect"
"testing"

Expand Down Expand Up @@ -330,7 +331,14 @@ func TestCreateSparkUIIngress(t *testing.T) {
testFn := func(test testcase, t *testing.T, ingressURLFormat string) {
fakeClient := fake.NewSimpleClientset()
sparkService, err := createSparkUIService(test.app, fakeClient)
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURLFormat, fakeClient)
if err != nil {
t.Fatal(err)
}
ingressURL, err := getSparkUIingressURL(ingressURLFormat, test.app.Name, test.app.Namespace)
if err != nil {
t.Fatal(err)
}
sparkIngress, err := createSparkUIIngress(test.app, *sparkService, ingressURL, fakeClient)
if err != nil {
if test.expectError {
return
Expand All @@ -340,7 +348,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
if sparkIngress.ingressName != test.expectedIngress.ingressName {
t.Errorf("Ingress name wanted %s got %s", test.expectedIngress.ingressName, sparkIngress.ingressName)
}
if sparkIngress.ingressURL != test.expectedIngress.ingressURL {
if sparkIngress.ingressURL.String() != test.expectedIngress.ingressURL.String() {
t.Errorf("Ingress URL wanted %s got %s", test.expectedIngress.ingressURL, sparkIngress.ingressURL)
}
ingress, err := fakeClient.ExtensionsV1beta1().Ingresses(test.app.Namespace).
Expand Down Expand Up @@ -373,9 +381,13 @@ func TestCreateSparkUIIngress(t *testing.T) {
t.Errorf("No Ingress rules found.")
}
ingressRule := ingress.Spec.Rules[0]
//ingress URL is same as Host and Path combined from k8s ingress
if ingressRule.Host+ingressRule.IngressRuleValue.HTTP.Paths[0].Path != test.expectedIngress.ingressURL {
t.Errorf("Ingress of app %s has the wrong host %s", test.expectedIngress.ingressURL, ingressRule.Host)
// If we have a path, then the ingress adds capture groups
if ingressRule.IngressRuleValue.HTTP.Paths[0].Path != "" && ingressRule.IngressRuleValue.HTTP.Paths[0].Path != "/" {
test.expectedIngress.ingressURL.Path = test.expectedIngress.ingressURL.Path + "(/|$)(.*)"
}
if ingressRule.Host+ingressRule.IngressRuleValue.HTTP.Paths[0].Path != test.expectedIngress.ingressURL.Host+test.expectedIngress.ingressURL.Path {

t.Errorf("Ingress of app %s has the wrong host %s", ingressRule.Host+ingressRule.IngressRuleValue.HTTP.Paths[0].Path, test.expectedIngress.ingressURL.Host+test.expectedIngress.ingressURL.Path)
}

if len(ingressRule.IngressRuleValue.HTTP.Paths) != 1 {
Expand Down Expand Up @@ -481,7 +493,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
app: app1,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app1.GetName()),
ingressURL: app1.GetName() + ".ingress.clusterName.com",
ingressURL: parseURLAndAssertError(app1.GetName()+".ingress.clusterName.com", t),
},
expectError: false,
},
Expand All @@ -490,7 +502,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
app: app2,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app2.GetName()),
ingressURL: app2.GetName() + ".ingress.clusterName.com",
ingressURL: parseURLAndAssertError(app2.GetName()+".ingress.clusterName.com", t),
annotations: map[string]string{
"kubernetes.io/ingress.class": "nginx",
"nginx.ingress.kubernetes.io/force-ssl-redirect": "true",
Expand All @@ -503,7 +515,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
app: app3,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app3.GetName()),
ingressURL: app3.GetName() + ".ingress.clusterName.com",
ingressURL: parseURLAndAssertError(app3.GetName()+".ingress.clusterName.com", t),
annotations: map[string]string{
"kubernetes.io/ingress.class": "nginx",
"nginx.ingress.kubernetes.io/force-ssl-redirect": "true",
Expand All @@ -519,7 +531,7 @@ func TestCreateSparkUIIngress(t *testing.T) {
app: app4,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app4.GetName()),
ingressURL: app3.GetName() + ".ingress.clusterName.com",
ingressURL: parseURLAndAssertError(app3.GetName()+".ingress.clusterName.com", t),
annotations: map[string]string{
"kubernetes.io/ingress.class": "nginx",
"nginx.ingress.kubernetes.io/force-ssl-redirect": "true",
Expand All @@ -542,7 +554,10 @@ func TestCreateSparkUIIngress(t *testing.T) {
app: app1,
expectedIngress: SparkIngress{
ingressName: fmt.Sprintf("%s-ui-ingress", app1.GetName()),
ingressURL: "ingress.clusterName.com/" + app1.GetNamespace() + "/" + app1.GetName(),
ingressURL: parseURLAndAssertError("ingress.clusterName.com/"+app1.GetNamespace()+"/"+app1.GetName(), t),
annotations: map[string]string{
"nginx.ingress.kubernetes.io/rewrite-target": "/$2",
},
},
expectError: false,
},
Expand All @@ -552,3 +567,21 @@ func TestCreateSparkUIIngress(t *testing.T) {
testFn(test, t, "ingress.clusterName.com/{{$appNamespace}}/{{$appName}}")
}
}

func parseURLAndAssertError(testURL string, t *testing.T) *url.URL {
fallbackURL, _ := url.Parse("http://example.com")
parsedURL, err := url.Parse(testURL)
if err != nil {
t.Errorf("failed to parse the url: %s", testURL)
return fallbackURL
}
if parsedURL.Scheme == "" {
//url does not contain any scheme, adding http:// so url.Parse can function correctly
parsedURL, err = url.Parse("http://" + testURL)
if err != nil {
t.Errorf("failed to parse the url: %s", testURL)
return fallbackURL
}
}
return parsedURL
}

0 comments on commit 9a85d4b

Please sign in to comment.