Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS VPC Peering and Security Group Firewall rule cluster resources enhancement #566

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,14 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) startFirewallRuleStatusJob(fire
func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule *v1beta1.AWSSecurityGroupFirewallRule) scheduler.Job {
l := log.Log.WithValues("component", "FirewallRuleStatusJob")
return func() error {
ctx := context.Background()
instaFirewallRuleStatus, err := r.API.GetFirewallRuleStatus(firewallRule.Status.ID, instaclustr.AWSSecurityGroupFirewallRuleEndpoint)
if err != nil {
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, firewallRule)
}

l.Error(err, "Cannot get AWS security group firewall rule status from Inst API", "firewall rule ID", firewallRule.Status.ID)
return err
}
Expand All @@ -299,13 +305,13 @@ func (r *AWSSecurityGroupFirewallRuleReconciler) newWatchStatusJob(firewallRule
"firewall rule status", firewallRule.Status)
patch := firewallRule.NewPatch()
firewallRule.Status.FirewallRuleStatus = *instaFirewallRuleStatus
err := r.Status().Patch(context.Background(), firewallRule, patch)
err := r.Status().Patch(ctx, firewallRule, patch)
if err != nil {
return err
}

if instaFirewallRuleStatus.Status == statusDELETED {
r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
go r.Scheduler.RemoveJob(firewallRule.GetJobID(scheduler.StatusChecker))
}
}

Expand Down
123 changes: 110 additions & 13 deletions controllers/clusterresources/awsvpcpeering_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -194,7 +195,59 @@ func (r *AWSVPCPeeringReconciler) handleUpdatePeering(
aws *v1beta1.AWSVPCPeering,
l logr.Logger,
) reconcile.Result {
err := r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec)
instaAWSPeering, err := r.API.GetAWSVPCPeering(aws.Status.ID)
if err != nil {
l.Error(err, "Cannot get AWS VPC Peering from Instaclutr",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed,
"Cannot get AWS VPC Peering from Instaclutr. Reason: %v",
)

return models.ReconcileRequeue
}

if aws.Annotations[models.ExternalChangesAnnotation] == models.True {
if !slices.Equal(instaAWSPeering.PeerSubnets, aws.Spec.PeerSubnets) {
l.Info("The resource specification still differs from the Instaclustr resource specification, please reconcile it manually",
"AWS VPC ID", aws.Status.ID,
"k8s peerSubnets", aws.Spec.PeerSubnets,
"instaclustr peerSubnets", instaAWSPeering.PeerSubnets,
)
r.EventRecorder.Eventf(aws, models.Warning, models.UpdateFailed,
"The resource specification still differs from the Instaclustr resource specification, please reconcile it manually.",
)

return models.ExitReconcile
}

patch := aws.NewPatch()
delete(aws.Annotations, models.ExternalChangesAnnotation)
err = r.Patch(ctx, aws, patch)
if err != nil {
l.Error(err, "Cannot delete external changes annotation from the resource",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(
aws, models.Warning, models.PatchFailed,
"Deleting external changes annotation is failed. Reason: %v",
err,
)

return models.ReconcileRequeue
}

l.Info("External changes of the k8s resource specification was fixed",
"AWS VPC Peering ID", aws.Status.ID,
)
r.EventRecorder.Eventf(aws, models.Normal, models.ExternalChanges,
"External changes of the k8s resource specification was fixed",
)

return models.ExitReconcile
}

err = r.API.UpdatePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint, &aws.Spec)
if err != nil {
l.Error(err, "cannot update AWS VPC Peering",
"AWS Account ID", aws.Spec.PeerAWSAccountID,
Expand Down Expand Up @@ -264,7 +317,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
}

if status != nil {
r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker))
err = r.API.DeletePeering(aws.Status.ID, instaclustr.AWSPeeringEndpoint)
if err != nil {
l.Error(err, "cannot update AWS VPC Peering resource status",
Expand All @@ -288,6 +340,8 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
return models.ReconcileRequeue
}

r.Scheduler.RemoveJob(aws.GetJobID(scheduler.StatusChecker))

patch := aws.NewPatch()
controllerutil.RemoveFinalizer(aws, models.DeletionFinalizer)
aws.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent
Expand Down Expand Up @@ -316,11 +370,6 @@ func (r *AWSVPCPeeringReconciler) handleDeletePeering(
"AWS VPC Peering Status", aws.Status.PeeringStatus,
)

r.EventRecorder.Eventf(
aws, models.Normal, models.Deleted,
"Resource is deleted",
)

return models.ExitReconcile
}

Expand All @@ -338,30 +387,78 @@ func (r *AWSVPCPeeringReconciler) startAWSVPCPeeringStatusJob(awsPeering *v1beta
func (r *AWSVPCPeeringReconciler) newWatchStatusJob(awsPeering *v1beta1.AWSVPCPeering) scheduler.Job {
l := log.Log.WithValues("component", "AWSVPCPeeringStatusJob")
return func() error {
instaPeeringStatus, err := r.API.GetPeeringStatus(awsPeering.Status.ID, instaclustr.AWSPeeringEndpoint)
ctx := context.Background()

namespacedName := client.ObjectKeyFromObject(awsPeering)
err := r.Get(ctx, namespacedName, awsPeering)
if k8serrors.IsNotFound(err) {
l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.",
"namespaced name", namespacedName,
)

go r.Scheduler.RemoveJob(awsPeering.GetJobID(scheduler.StatusChecker))

return nil
}

instaAWSPeering, err := r.API.GetAWSVPCPeering(awsPeering.Status.ID)
if err != nil {
l.Error(err, "cannot get AWS VPC Peering Status from Inst API", "AWS VPC Peering ID", awsPeering.Status.ID)
if errors.Is(err, instaclustr.NotFound) {
l.Info("The resource has been deleted on Instaclustr, deleting resource in k8s...")
return r.Delete(ctx, awsPeering)
}

l.Error(err, "cannot get AWS VPC Peering Status from Inst API",
"AWS VPC Peering ID", awsPeering.Status.ID,
)
return err
}

if !arePeeringStatusesEqual(instaPeeringStatus, &awsPeering.Status.PeeringStatus) {
instaPeeringStatus := v1beta1.PeeringStatus{
ID: instaAWSPeering.ID,
StatusCode: instaAWSPeering.StatusCode,
}

if !arePeeringStatusesEqual(&instaPeeringStatus, &awsPeering.Status.PeeringStatus) {
l.Info("AWS VPC Peering status of k8s is different from Instaclustr. Reconcile statuses..",
"AWS VPC Peering Status from Inst API", instaPeeringStatus,
"AWS VPC Peering Status", awsPeering.Status)

patch := awsPeering.NewPatch()
awsPeering.Status.PeeringStatus = *instaPeeringStatus
err := r.Status().Patch(context.Background(), awsPeering, patch)
awsPeering.Status.PeeringStatus = instaPeeringStatus
err := r.Status().Patch(ctx, awsPeering, patch)
if err != nil {
return err
}
}

if awsPeering.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent &&
awsPeering.Annotations[models.ExternalChangesAnnotation] != models.True &&
!slices.Equal(instaAWSPeering.PeerSubnets, awsPeering.Spec.PeerSubnets) {
l.Info("The k8s resource specification doesn't match the specification of Instaclustr, please change it manually",
"k8s peerSubnets", instaAWSPeering.PeerSubnets,
"instaclutr peerSubnets", awsPeering.Spec.PeerSubnets,
)

patch := awsPeering.NewPatch()
awsPeering.Annotations[models.ExternalChangesAnnotation] = models.True

err = r.Patch(ctx, awsPeering, patch)
if err != nil {
l.Error(err, "Cannot patch the resource with external changes annotation")

return err
}

r.EventRecorder.Event(awsPeering, models.Warning, models.ExternalChanges,
"k8s spec doesn't match spec of Instaclutr, please change it manually",
)
}

return nil
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *AWSVPCPeeringReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.AWSVPCPeering{}, builder.WithPredicates(predicate.Funcs{
Expand Down
30 changes: 30 additions & 0 deletions pkg/instaclustr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2265,3 +2265,33 @@ func (c *Client) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*v1be

return resize.Operations, nil
}

func (c *Client) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) {
url := c.serverHostname + AWSPeeringEndpoint + peerID
resp, err := c.DoRequest(url, http.MethodGet, nil)
if err != nil {
return nil, err
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode == http.StatusNotFound {
return nil, NotFound
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b)
}

var vpcPeering models.AWSVPCPeering
err = json.Unmarshal(b, &vpcPeering)
if err != nil {
return nil, err
}

return &vpcPeering, nil
}
1 change: 1 addition & 0 deletions pkg/instaclustr/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type API interface {
UpdateCluster(id, clusterEndpoint string, instaDCs any) error
DeleteCluster(id, clusterEndpoint string) error
GetPeeringStatus(peerID, peeringEndpoint string) (*clusterresourcesv1beta1.PeeringStatus, error)
GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error)
UpdatePeering(peerID, peeringEndpoint string, peerSpec any) error
DeletePeering(peerID, peeringEndpoint string) error
CreatePeering(url string, peeringSpec any) (*clusterresourcesv1beta1.PeeringStatus, error)
Expand Down
4 changes: 4 additions & 0 deletions pkg/instaclustr/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,7 @@ func (c *mockClient) GetRedisUser(id string) (*models.RedisUser, error) {
func (c *mockClient) GetResizeOperationsByClusterDataCentreID(cdcID string) ([]*clustersv1beta1.ResizeOperation, error) {
panic("GetResizeOperationsByClusterDataCentreID: is not implemented")
}

func (c *mockClient) GetAWSVPCPeering(peerID string) (*models.AWSVPCPeering, error) {
panic("GetAWSVPCPeering: is not implemented")
}
11 changes: 11 additions & 0 deletions pkg/models/apiv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,14 @@ func (cb *ClusterBackup) GetBackupEvents(clusterKind string) map[int]*BackupEven

return instBackupEvents
}

type AWSVPCPeering struct {
ID string `json:"id"`
CDCID string `json:"cdcId"`
DataCentreVPCID string `json:"dataCentreVpcId"`
PeerAWSAccountID string `json:"peerAwsAccountId"`
PeerRegion string `json:"peerRegion"`
PeerSubnets []string `json:"peerSubnets"`
PeerVpcID string `json:"peerVpcId"`
StatusCode string `json:"statusCode"`
}