diff --git a/controllers/postgres_controller.go b/controllers/postgres_controller.go index 612f1ecc..6c9cb51b 100644 --- a/controllers/postgres_controller.go +++ b/controllers/postgres_controller.go @@ -1062,14 +1062,27 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log return requeueAfterReconcile, nil } if instance.Spec.PostgresConnection.SynchronousReplication { - if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID { - log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp) - return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP) + // fetch the sync standby to determine the correct application_name of the instance + var synchronousStandbyApplicationName *string + s := &pg.Postgres{} + ns := types.NamespacedName{ + Name: instance.Spec.PostgresConnection.ConnectedPostgresID, + Namespace: instance.Namespace, + } + if err := r.CtrlClient.Get(ctx, ns, s); err != nil { + r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err) + synchronousStandbyApplicationName = nil + } else { + synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName()) + } + if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != *synchronousStandbyApplicationName { + log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, synchronousStandbyApplicationName) } } else { if resp.SynchronousNodesAdditional != nil { - log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp) - return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP) + log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) } } @@ -1078,25 +1091,25 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp) return requeueAfterReconcile, nil } - if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name { - log.V(debugLogLevel).Info("application_name mismatch, updating", "response", resp) - return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP) - } - if resp.SynchronousNodesAdditional != nil { - log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating", "response", resp) - return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP) - } if resp.StandbyCluster.CreateReplicaMethods == nil { - log.V(debugLogLevel).Info("create_replica_methods mismatch, updating", "response", resp) - return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP) + log.V(debugLogLevel).Info("create_replica_methods mismatch, updating and requeing", "response", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) } if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP { - log.V(debugLogLevel).Info("host mismatch, requeing", "updating", resp) - return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP) + log.V(debugLogLevel).Info("host mismatch, updating and requeing", "updating", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) } if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) { - log.V(debugLogLevel).Info("port mismatch, requeing", "updating", resp) - return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP) + log.V(debugLogLevel).Info("port mismatch, updating and requeing", "updating", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) + } + if resp.StandbyCluster.ApplicationName != instance.ToPeripheralResourceName() { + log.V(debugLogLevel).Info("application_name mismatch, updating and requeing", "response", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) + } + if resp.SynchronousNodesAdditional != nil { + log.V(debugLogLevel).Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil) } } @@ -1142,7 +1155,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo for _, pod := range pods.Items { pod := pod // pin! podIP := pod.Status.PodIP - if err := r.httpPatchPatroni(log, ctx, instance, podIP); err != nil { + if err := r.httpPatchPatroni(log, ctx, instance, podIP, nil); err != nil { lastErr = err log.Info("failed to update pod") } @@ -1155,7 +1168,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo return nil } -func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error { +func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string, synchronousStandbyApplicationName *string) error { if podIP == "" { return errors.New("podIP must not be empty") } @@ -1170,8 +1183,23 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte StandbyCluster: nil, } if instance.Spec.PostgresConnection.SynchronousReplication { + if synchronousStandbyApplicationName == nil { + // fetch the sync standby to determine the correct application_name of the instance + log.V(debugLogLevel).Info("unexpectetly having to fetch the referenced sync standby") + s := &pg.Postgres{} + ns := types.NamespacedName{ + Name: instance.Spec.PostgresConnection.ConnectedPostgresID, + Namespace: instance.Namespace, + } + if err := r.CtrlClient.Get(ctx, ns, s); err != nil { + r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err) + synchronousStandbyApplicationName = nil + } else { + synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName()) + } + } // enable sync replication - request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID) + request.SynchronousNodesAdditional = synchronousStandbyApplicationName } else { // disable sync replication request.SynchronousNodesAdditional = nil @@ -1183,7 +1211,7 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte CreateReplicaMethods: []string{"basebackup_fast_xlog"}, Host: instance.Spec.PostgresConnection.ConnectionIP, Port: int(instance.Spec.PostgresConnection.ConnectionPort), - ApplicationName: instance.ObjectMeta.Name, + ApplicationName: instance.ToPeripheralResourceName(), }, SynchronousNodesAdditional: nil, } @@ -1212,6 +1240,11 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte } defer resp.Body.Close() + // fake error when standbyApplicationName is required but not provided + if instance.IsReplicationPrimary() && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil { + return fmt.Errorf("missing application_name of synchronous standby, disable synchronous replication") + } + return nil }