Skip to content

Commit

Permalink
621 allow for plain http connections (#622)
Browse files Browse the repository at this point in the history
* add flag to connect to cluster using http even if tls enabled
* use https if non-TLS listeners are disabled
  • Loading branch information
JorgeAndresQuintero authored May 30, 2023
1 parent 7625a11 commit 9bba483
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 47 deletions.
1 change: 1 addition & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ const (
OperatorNamespaceEnvVar = "OPERATOR_NAMESPACE"
EnableWebhooksEnvVar = "ENABLE_WEBHOOKS"
ControllerSyncPeriodEnvVar = "SYNC_PERIOD"
ConnectUsingPlainHTTPEnvVar = "CONNECT_USING_PLAIN_HTTP"
)
3 changes: 2 additions & 1 deletion controllers/topology_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type TopologyReconciler struct {
Recorder record.EventRecorder
RabbitmqClientFactory rabbitmqclient.Factory
KubernetesClusterDomain string
ConnectUsingPlainHTTP bool
}

func (r *TopologyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand All @@ -48,7 +49,7 @@ func (r *TopologyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

credsProvider, tlsEnabled, err := rabbitmqclient.ParseReference(ctx, r.Client, obj.RabbitReference(), obj.GetNamespace(), r.KubernetesClusterDomain)
credsProvider, tlsEnabled, err := rabbitmqclient.ParseReference(ctx, r.Client, obj.RabbitReference(), obj.GetNamespace(), r.KubernetesClusterDomain, r.ConnectUsingPlainHTTP)
if err != nil {
return r.handleRMQReferenceParseError(ctx, obj, err)
}
Expand Down
31 changes: 31 additions & 0 deletions controllers/topology_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,35 @@ var _ = Describe("TopologyReconciler", func() {
Expect(uri).To(BeEquivalentTo("https://example-rabbit.default.svc:15671"))
})
})

When("flag for plain HTTP connection is set", func() {
It("uses http for connection", func() {
Expect((&controllers.TopologyReconciler{
Client: mgr.GetClient(),
Type: &topology.Queue{},
Scheme: mgr.GetScheme(),
Recorder: fakeRecorder,
RabbitmqClientFactory: fakeRabbitMQClientFactory,
ReconcileFunc: &controllers.QueueReconciler{},
ConnectUsingPlainHTTP: true,
}).SetupWithManager(mgr)).To(Succeed())

queue := &topology.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "cb-queue", Namespace: "default"},
Spec: topology.QueueSpec{RabbitmqClusterReference: commonRabbitmqClusterRef},
}
fakeRabbitMQClient.DeclareQueueReturns(commonHttpCreatedResponse, nil)
fakeRabbitMQClient.DeleteQueueReturns(commonHttpDeletedResponse, nil)
Expect(client.Create(ctx, queue)).To(Succeed())

Eventually(func() int {
return len(fakeRabbitMQClientFactoryArgsForCall)
}, 5).Should(BeNumerically(">", 0))

credentials, _, _ := FakeRabbitMQClientFactoryArgsForCall(0)
uri, found := credentials["uri"]
Expect(found).To(BeTrue(), "expected to find key 'uri'")
Expect(uri).To(BeEquivalentTo("http://example-rabbit.default.svc:15672"))
})
})
})
27 changes: 26 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func main() {

clusterDomain := sanitizeClusterDomainInput(os.Getenv(controllers.KubernetesInternalDomainEnvVar))

usePlainHTTP := getBoolEnv(controllers.ConnectUsingPlainHTTPEnvVar)

managerOpts := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Expand Down Expand Up @@ -167,6 +169,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.QueueReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.QueueControllerName)
os.Exit(1)
Expand All @@ -181,6 +184,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.ExchangeReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.ExchangeControllerName)
os.Exit(1)
Expand All @@ -195,6 +199,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.BindingReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.BindingControllerName)
os.Exit(1)
Expand All @@ -210,6 +215,7 @@ func main() {
KubernetesClusterDomain: clusterDomain,
WatchTypes: []client.Object{&corev1.Secret{}},
ReconcileFunc: &controllers.UserReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.UserControllerName)
os.Exit(1)
Expand All @@ -224,6 +230,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.VhostReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.VhostControllerName)
os.Exit(1)
Expand All @@ -238,6 +245,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.PolicyReconciler{},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.PolicyControllerName)
os.Exit(1)
Expand All @@ -252,6 +260,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.PermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.PermissionControllerName)
os.Exit(1)
Expand All @@ -266,6 +275,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.SchemaReplicationReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.SchemaReplicationControllerName)
os.Exit(1)
Expand All @@ -280,6 +290,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.FederationReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.FederationControllerName)
os.Exit(1)
Expand All @@ -294,6 +305,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.ShovelReconciler{Client: mgr.GetClient()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.ShovelControllerName)
os.Exit(1)
Expand All @@ -308,6 +320,7 @@ func main() {
RabbitmqClientFactory: rabbitmqclient.RabbitholeClientFactory,
KubernetesClusterDomain: clusterDomain,
ReconcileFunc: &controllers.TopicPermissionReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()},
ConnectUsingPlainHTTP: usePlainHTTP,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", controllers.TopicPermissionControllerName)
os.Exit(1)
Expand Down Expand Up @@ -386,7 +399,7 @@ func main() {

func getEnvInDuration(envName string) time.Duration {
var durationInt int64
if durationStr := os.Getenv(envName); durationStr != "" {
if durationStr, ok := os.LookupEnv(envName); ok {
var err error
if durationInt, err = strconv.ParseInt(durationStr, 10, 64); err != nil {
log.Error(err, fmt.Sprintf("unable to parse provided '%s'", envName))
Expand All @@ -395,3 +408,15 @@ func getEnvInDuration(envName string) time.Duration {
}
return time.Duration(durationInt) * time.Second
}

func getBoolEnv(envName string) bool {
var boolVar bool
if boolStr, ok := os.LookupEnv(envName); ok {
var err error
if boolVar, err = strconv.ParseBool(boolStr); err != nil {
log.Error(err, fmt.Sprintf("unable to parse provided '%s'", envName))
os.Exit(1)
}
}
return boolVar
}
27 changes: 15 additions & 12 deletions rabbitmqclient/cluster_reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
NoServiceReferenceSetError = errors.New("RabbitmqCluster has no ServiceReference set in status.defaultUser")
)

func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqClusterReference, requestNamespace string, clusterDomain string) (map[string]string, bool, error) {
func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqClusterReference, requestNamespace string, clusterDomain string, connectUsingHTTP bool) (map[string]string, bool, error) {
if rmq.ConnectionSecret != nil {
secret := &corev1.Secret{}
if err := c.Get(ctx, types.NamespacedName{Namespace: requestNamespace, Name: rmq.ConnectionSecret.Name}, secret); err != nil {
Expand Down Expand Up @@ -99,8 +99,8 @@ func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqC
if err != nil {
return nil, false, fmt.Errorf("unable to parse additionconfig setting from the rabbitmqcluster resource: %w", err)
}

endpoint, err := managementURI(svc, cluster.TLSEnabled(), clusterDomain, additionalConfig["management.path_prefix"])
useTLSForConnection := cluster.TLSEnabled() && (!connectUsingHTTP || cluster.DisableNonTLSListeners())
endpoint, err := managementURI(svc, useTLSForConnection, clusterDomain, additionalConfig["management.path_prefix"])
if err != nil {
return nil, false, fmt.Errorf("failed to get endpoint from specified rabbitmqcluster: %w", err)
}
Expand All @@ -109,7 +109,7 @@ func ParseReference(ctx context.Context, c client.Client, rmq topology.RabbitmqC
"username": user,
"password": pass,
"uri": endpoint,
}, cluster.TLSEnabled(), nil
}, useTLSForConnection, nil
}

func AllowedNamespace(rmq topology.RabbitmqClusterReference, requestNamespace string, cluster *rabbitmqv1beta1.RabbitmqCluster) bool {
Expand Down Expand Up @@ -178,25 +178,28 @@ func readUsernamePassword(secret *corev1.Secret) (string, string, error) {
return string(secret.Data["username"]), string(secret.Data["password"]), nil
}

func managementURI(svc *corev1.Service, tlsEnabled bool, clusterDomain string, pathPrefix string) (string, error) {
func managementURI(svc *corev1.Service, useTLSForConnection bool, clusterDomain string, pathPrefix string) (string, error) {
var managementUiPort int
var portName string

if useTLSForConnection {
portName = "management-tls"
} else {
portName = "management"
}
for _, port := range svc.Spec.Ports {
if port.Name == "management-tls" {
if port.Name == portName {
managementUiPort = int(port.Port)
break
}
if port.Name == "management" {
managementUiPort = int(port.Port)
// Do not break here because we may still find 'management-tls' port
}
}

if managementUiPort == 0 {
return "", fmt.Errorf("failed to find 'management' or 'management-tls' from service %s", svc.Name)
return "", fmt.Errorf("failed to find %s from service %s", portName, svc.Name)
}

scheme := "http"
if tlsEnabled {
if useTLSForConnection {
scheme = "https"
}
url := url.URL{
Expand Down
Loading

0 comments on commit 9bba483

Please sign in to comment.