Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report failed status #48

Merged
merged 1 commit into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ spec:
singular: subscription
shortNames:
- sub
- subs
preserveUnknownFields: false
scope: Namespaced
versions:
Expand Down Expand Up @@ -53,6 +54,9 @@ spec:
ready:
description: Whether the subscription is ready to be consumed.
type: boolean
failed:
description: Indicates that the operator was unable to deploy a pipeline for this subscription.
type: boolean
message:
description: Error or success message, for information only.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
private Boolean failed;

public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
private String message;
Expand All @@ -49,6 +53,29 @@ public class V1alpha1SubscriptionStatus {
private String sql;


public V1alpha1SubscriptionStatus failed(Boolean failed) {

this.failed = failed;
return this;
}

/**
* Indicates that the operator was unable to deploy a pipeline for this subscription.
* @return failed
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Indicates that the operator was unable to deploy a pipeline for this subscription.")

public Boolean getFailed() {
return failed;
}


public void setFailed(Boolean failed) {
this.failed = failed;
}


public V1alpha1SubscriptionStatus message(String message) {

this.message = message;
Expand Down Expand Up @@ -158,22 +185,24 @@ public boolean equals(Object o) {
return false;
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
}

@Override
public int hashCode() {
return Objects.hash(message, ready, resources, sql);
return Objects.hash(failed, message, ready, resources, sql);
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,39 +86,49 @@ public Result reconcile(Request request) {
// Phase 1
log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());

Pipeline pipeline = pipeline(object);
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
.orElse(environment);
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);

// For sink resources, also expose hints.
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));

// Render resources related to all source tables.
List<String> upstreamResources = pipeline.upstreamResources().stream()
.map(x -> x.render(templateFactory))
.collect(Collectors.toList());

// Render the SQL job
String sqlJob = pipeline.sqlJob().render(templateFactory);

// Render resources related to the sink table. For these resources, we pass along any
// "hints" as part of the environment.
List<String> downstreamResources = pipeline.downstreamResources().stream()
.map(x -> x.render(sinkTemplateFactory))
.collect(Collectors.toList());

List<String> combined = new ArrayList<>();
combined.addAll(upstreamResources);
combined.add(sqlJob);
combined.addAll(downstreamResources);

status.setResources(combined);

status.setSql(object.getSpec().getSql());
status.setReady(null); // null indicates that pipeline needs to be deployed
status.setMessage("Planned.");
try {
Pipeline pipeline = pipeline(object);
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
.orElse(environment);
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);

// For sink resources, also expose hints.
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));

// Render resources related to all source tables.
List<String> upstreamResources = pipeline.upstreamResources().stream()
.map(x -> x.render(templateFactory))
.collect(Collectors.toList());

// Render the SQL job
String sqlJob = pipeline.sqlJob().render(templateFactory);

// Render resources related to the sink table. For these resources, we pass along any
// "hints" as part of the environment.
List<String> downstreamResources = pipeline.downstreamResources().stream()
.map(x -> x.render(sinkTemplateFactory))
.collect(Collectors.toList());

List<String> combined = new ArrayList<>();
combined.addAll(upstreamResources);
combined.add(sqlJob);
combined.addAll(downstreamResources);

status.setResources(combined);

status.setSql(object.getSpec().getSql());
status.setReady(null); // null indicates that pipeline needs to be deployed
status.setFailed(null);
status.setMessage("Planned.");
} catch (Exception e) {
log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name,
object.getSpec().getSql(), e);

// Mark the Subscription as failed.
status.setFailed(true);
status.setMessage("Error: " + e.getMessage());
}
} else if (status.getReady() == null && status.getResources() != null) {
// Phase 2
log.info("Deploying pipeline for {}/{}...", kind, name);
Expand All @@ -128,6 +138,7 @@ public Result reconcile(Request request) {

if (deployed) {
status.setReady(false);
status.setFailed(false);
status.setMessage("Deployed.");
} else {
return new Result(true, operator.failureRetryDuration());
Expand All @@ -140,11 +151,13 @@ public Result reconcile(Request request) {

if (ready) {
status.setReady(true);
status.setFailed(false);
status.setMessage("Ready.");
log.info("{}/{} is ready.", kind, name);
result = new Result(false);
} else {
status.setReady(false);
status.setFailed(false);
status.setMessage("Deployed.");
log.info("Pipeline for {}/{} is NOT ready.", kind, name);
}
Expand Down Expand Up @@ -277,10 +290,6 @@ public static Controller controller(Operator operator, HoptimatorPlanner.Factory
.withReconciler(reconciler)
.withName("subscription-controller")
.withWorkerCount(1)
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
// cache has synced up
//.withWorkQueue(resourceWorkQueue)
//.watch()
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build())
.build();
}
Expand Down