Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE #576

Closed
wants to merge 13 commits into from
79 changes: 56 additions & 23 deletions controllers/postgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,14 +1062,27 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
return requeueAfterReconcile, nil
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
// fetch the sync standby to determine the correct application_name of the instance
var synchronousStandbyApplicationName *string
s := &pg.Postgres{}
ns := types.NamespacedName{
Name: instance.Spec.PostgresConnection.ConnectedPostgresID,
Namespace: instance.Namespace,
}
if err := r.CtrlClient.Get(ctx, ns, s); err != nil {
r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err)
synchronousStandbyApplicationName = nil
} else {
synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName())
}
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != *synchronousStandbyApplicationName {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, synchronousStandbyApplicationName)
}
} else {
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
}

Expand All @@ -1078,25 +1091,25 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp)
return requeueAfterReconcile, nil
}
if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
log.V(debugLogLevel).Info("application_name mismatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp)
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.StandbyCluster.CreateReplicaMethods == nil {
log.V(debugLogLevel).Info("create_replica_methods mismatch, updating", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
log.V(debugLogLevel).Info("create_replica_methods mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
log.V(debugLogLevel).Info("host mismatch, requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
log.V(debugLogLevel).Info("host mismatch, updating and requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
log.V(debugLogLevel).Info("port mismatch, requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
log.V(debugLogLevel).Info("port mismatch, updating and requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.ApplicationName != instance.ToPeripheralResourceName() {
log.V(debugLogLevel).Info("application_name mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
}

Expand Down Expand Up @@ -1142,7 +1155,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
for _, pod := range pods.Items {
pod := pod // pin!
podIP := pod.Status.PodIP
if err := r.httpPatchPatroni(log, ctx, instance, podIP); err != nil {
if err := r.httpPatchPatroni(log, ctx, instance, podIP, nil); err != nil {
lastErr = err
log.Info("failed to update pod")
}
Expand All @@ -1155,7 +1168,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
return nil
}

func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error {
func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string, synchronousStandbyApplicationName *string) error {
if podIP == "" {
return errors.New("podIP must not be empty")
}
Expand All @@ -1170,8 +1183,23 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
StandbyCluster: nil,
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if synchronousStandbyApplicationName == nil {
// fetch the sync standby to determine the correct application_name of the instance
log.V(debugLogLevel).Info("unexpectetly having to fetch the referenced sync standby")
s := &pg.Postgres{}
ns := types.NamespacedName{
Name: instance.Spec.PostgresConnection.ConnectedPostgresID,
Namespace: instance.Namespace,
}
if err := r.CtrlClient.Get(ctx, ns, s); err != nil {
r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err)
synchronousStandbyApplicationName = nil
} else {
synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName())
}
}
// enable sync replication
request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID)
request.SynchronousNodesAdditional = synchronousStandbyApplicationName
} else {
// disable sync replication
request.SynchronousNodesAdditional = nil
Expand All @@ -1183,7 +1211,7 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
CreateReplicaMethods: []string{"basebackup_fast_xlog"},
Host: instance.Spec.PostgresConnection.ConnectionIP,
Port: int(instance.Spec.PostgresConnection.ConnectionPort),
ApplicationName: instance.ObjectMeta.Name,
ApplicationName: instance.ToPeripheralResourceName(),
},
SynchronousNodesAdditional: nil,
}
Expand Down Expand Up @@ -1212,6 +1240,11 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
}
defer resp.Body.Close()

// fake error when standbyApplicationName is required but not provided
if instance.IsReplicationPrimary() && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil {
return fmt.Errorf("missing application_name of synchronous standby, disable synchronous replication")
}

return nil
}

Expand Down
Loading