[STRMCMP-558] Event improvements #44

Merged · 2 commits · Jul 11, 2019
29 changes: 12 additions & 17 deletions pkg/controller/flink/flink.go
@@ -82,7 +82,7 @@ type ControllerInterface interface {
FindExternalizedCheckpoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error)

// Logs an event to the FlinkApplication resource and to the operator log
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string)
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string)

// Compares and updates new cluster status with current cluster status
// Returns true if there is a change in ClusterStatus
@@ -206,21 +206,24 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1alpha1.Fl
newlyCreatedJm, err := f.jobManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Job manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create job managers: %v", err))
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
fmt.Sprintf("Failed to create job managers for deploy %s: %v",
HashForApplication(application), err))
Contributor:
Nit: hash := HashForApplication(app), and use hash.
(glaksh100 marked this conversation as resolved.)

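The nit above amounts to computing the hash once and reusing it across the event calls in this function. A minimal sketch of that call site, assuming the rest of the function body stays as in this diff (not applied in this PR):

// Sketch of the reviewer's suggestion: hoist the hash into a local and reuse it.
hash := HashForApplication(application)
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
	fmt.Sprintf("Failed to create job managers for deploy %s: %v", hash, err))
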
return err
}
newlyCreatedTm, err := f.taskManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Task manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create task managers: %v", err))
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
fmt.Sprintf("Failed to create task managers for deploy %s: %v",
HashForApplication(application), err))
return err
}

if newlyCreatedJm || newlyCreatedTm {
f.LogEvent(ctx, application, corev1.EventTypeNormal, "Flink cluster created")
f.LogEvent(ctx, application, corev1.EventTypeNormal, "CreatingCluster",
Contributor:
Nit, Move reason to constant variable ?

fmt.Sprintf("Creating Flink cluster for deploy %s", HashForApplication(application)))
}
return nil
}
@@ -388,7 +391,8 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1
}

for k := range deletedHashes {
f.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Deleted old cluster with hash %s", k))
f.LogEvent(ctx, app, corev1.EventTypeNormal, "ToreDownCluster",
Contributor:
Same. Nit, Move reason to constant variable ?

Contributor Author:
I don't really see any reason to. It's only used here and is descriptive of this event.

Contributor:
Nothing major. Just a nit, so that they can be reused later.

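A minimal sketch of the suggested alternative, assuming package-level constants for the event reasons (constant names are hypothetical, taken from the literals in this diff; the PR keeps inline strings):

// Hypothetical event-reason constants, as floated in the nit above (not adopted in this PR).
const (
	reasonCreateClusterFailed = "CreateClusterFailed"
	reasonCreatingCluster     = "CreatingCluster"
	reasonToreDownCluster     = "ToreDownCluster"
)

// Call sites would then reuse the constant instead of repeating the literal:
f.LogEvent(ctx, app, corev1.EventTypeNormal, reasonToreDownCluster,
	fmt.Sprintf("Deleted old cluster with hash %s", k))
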
fmt.Sprintf("Deleted old cluster with hash %s", k))
}

return nil
@@ -411,16 +415,7 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application
return checkpoint.ExternalPath, nil
}

func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
reason := "Create"
if app.Status.DeployHash != "" {
// this is not the first deploy
reason = "Update"
}
if app.DeletionTimestamp != nil {
reason = "Delete"
}

func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string) {
f.eventRecorder.Event(app, eventType, reason, message)
logger.Infof(ctx, "Logged %s event: %s: %s", eventType, reason, message)
}
4 changes: 2 additions & 2 deletions pkg/controller/flink/mock/mock_flink.go
@@ -119,15 +119,15 @@ func (m *FlinkController) FindExternalizedCheckpoint(ctx context.Context, applic
return "", nil
}

func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string) {
m.Events = append(m.Events, corev1.Event{
InvolvedObject: corev1.ObjectReference{
Kind: app.Kind,
Name: app.Name,
Namespace: app.Namespace,
},
Type: eventType,
Reason: "Test",
Reason: reason,
Message: message,
})
}
49 changes: 30 additions & 19 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -89,7 +89,8 @@ func (s *FlinkStateMachine) shouldRollback(ctx context.Context, application *v1a
// Check if the error is retryable
if application.Status.LastSeenError != nil && s.retryHandler.IsErrorRetryable(application.Status.LastSeenError) {
if s.retryHandler.IsRetryRemaining(application.Status.LastSeenError, application.Status.RetryCount) {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application in phase %v retrying with error %v", application.Status.Phase, application.Status.LastSeenError))
logger.Warnf(ctx, "Application in phase %v retrying with error %v",
application.Status.Phase, application.Status.LastSeenError)
return false
}
// Retryable error with retries exhausted
@@ -99,16 +100,13 @@

// For non-retryable errors, always fail fast
if application.Status.LastSeenError != nil {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application failed to progress in the %v phase with error %v", application.Status.Phase, application.Status.LastSeenError))
application.Status.LastSeenError = nil
return true
}

// As a default, use a time based wait to determine whether to rollback or not.
if application.Status.LastUpdatedAt != nil {
if elapsedTime, ok := s.retryHandler.WaitOnError(s.clock, application.Status.LastUpdatedAt.Time); !ok {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application failed to progress for %v in the %v phase",
elapsedTime, application.Status.Phase))
if _, ok := s.retryHandler.WaitOnError(s.clock, application.Status.LastUpdatedAt.Time); !ok {
application.Status.LastSeenError = nil
return true
}
@@ -212,8 +210,10 @@ func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application
}

func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1alpha1.FlinkApplication) error {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "Deployment failed, rolled back successfully")
app.Status.FailedDeployHash = flink.HashForApplication(app)
hash := flink.HashForApplication(app)
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RolledBackDeploy",
fmt.Sprintf("Successfull rolled back deploy %s", hash))
app.Status.FailedDeployHash = hash

// Reset error and retry count
app.Status.LastSeenError = nil
@@ -266,7 +266,8 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
return err
}

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob",
fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID))

application.Spec.SavepointInfo.TriggerID = triggerID
return s.k8Cluster.UpdateK8Object(ctx, application)
@@ -283,8 +284,9 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress {
// Savepointing failed
// TODO: we should probably retry this a few times before failing
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Failed to take savepoint: %v",
savepointStatusResponse.Operation.FailureCause))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint for job %s: %v",
application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))

// try to find an externalized checkpoint
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, application, application.Status.DeployHash)
@@ -296,12 +298,15 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
return s.deployFailed(ctx, application)
}

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Restoring from externalized checkpoint %s", path))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "RestoringExternalizedCheckpoint",
fmt.Sprintf("Restoring from externalized checkpoint %s for deploy %s",
path, flink.HashForApplication(application)))

restorePath = path
} else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Canceled job with savepoint %s",
savepointStatusResponse.Operation.Location))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob",
fmt.Sprintf("Canceled job with savepoint %s",
savepointStatusResponse.Operation.Location))
restorePath = savepointStatusResponse.Operation.Location
}

@@ -339,13 +344,15 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1alpha1
jobID, err := s.flinkController.StartFlinkJob(ctx, app, hash,
jarName, parallelism, entryClass, programArgs)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, fmt.Sprintf("Failed to submit job to cluster: %v", err))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed",
fmt.Sprintf("Failed to submit job to cluster for deploy %s: %v", hash, err))

// TODO: we probably want some kind of back-off here
return nil, err
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Flink job submitted to cluster with id %s", jobID))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "JobSubmitted",
fmt.Sprintf("Flink job submitted to cluster with id %s", jobID))
app.Status.JobStatus.JobID = jobID
activeJob = flink.GetActiveFlinkJob(jobs)
} else {
@@ -429,7 +436,8 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1alpha1
return s.deployFailed(ctx, app)
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "Deployment failed, rolling back")
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "DeployFailed",
fmt.Sprintf("Deployment %s failed, rolling back", flink.HashForApplication(app)))

// TODO: handle single mode

@@ -605,7 +613,8 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app *
if err != nil {
return err
}
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Cancelling job with savepoint %v", triggerID))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CancellingJob",
fmt.Sprintf("Cancelling job with savepoint %v", triggerID))
app.Spec.SavepointInfo.TriggerID = triggerID
} else {
// we've already started savepointing; check the status
@@ -616,12 +625,14 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app *

if status.Operation.Location == "" && status.SavepointStatus.Status != client.SavePointInProgress {
// savepointing failed
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause))
// clear the trigger id so that we can try again
app.Spec.SavepointInfo.TriggerID = ""
} else if status.SavepointStatus.Status == client.SavePointCompleted {
// we're done, clean up
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob",
fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location))
app.Spec.SavepointInfo.SavepointLocation = status.Operation.Location
app.Spec.SavepointInfo.TriggerID = ""
}