Skip to content

Commit

Permalink
feat(storage-node): unregister storage unit when storage node be deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
Xu-Wentao committed May 19, 2023
1 parent 45394e7 commit cd25639
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -645,5 +645,81 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
Expect(reconciler.registerStorageUnit(ctx, sn)).To(BeNil())
Expect(sn.Status.Registered).To(BeTrue())
})

Context("Test unregisterStorageUnit", func() {
BeforeEach(func() {
mockCtrl = gomock.NewController(GinkgoT())
mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
return mockSS, nil
})
})
AfterEach(func() {
mockCtrl.Finish()
monkey.UnpatchAll()
})
It("should be successful when unregister storage unit", func() {
testName := "test-unregister-storage-unit"

cn := &v1alpha1.ComputeNode{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
Namespace: defaultTestNamespace,
},
Spec: v1alpha1.ComputeNodeSpec{
Bootstrap: v1alpha1.BootstrapConfig{
ServerConfig: v1alpha1.ServerConfig{
Authority: v1alpha1.ComputeNodeAuthority{
Users: []v1alpha1.ComputeNodeUser{
{
User: "root",
Password: "root",
},
},
},
},
},
},
}
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
Namespace: defaultTestNamespace,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http",
Protocol: "TCP",
Port: 3307,
},
},
},
}
Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
Expect(fakeClient.Create(ctx, svc)).Should(Succeed())

sn := &v1alpha1.StorageNode{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
Namespace: defaultTestNamespace,
Annotations: map[string]string{
AnnotationKeyLogicDatabaseName: testName,
dbmeshv1alpha1.AnnotationsInstanceDBName: testName,
AnnotationKeyComputeNodeName: testName,
AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
},
},
Status: v1alpha1.StorageNodeStatus{
Registered: true,
},
}
Expect(fakeClient.Create(ctx, sn)).Should(Succeed())

mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
mockSS.EXPECT().Close().Return(nil)
Expect(reconciler.unregisterStorageUnit(ctx, sn)).To(BeNil())
})
})
})
})
32 changes: 32 additions & 0 deletions shardingsphere-operator/pkg/controllers/storage_node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
return ctrl.Result{}, nil
}

// Try to unregister storage unit in shardingsphere.
if err = r.unregisterStorageUnit(ctx, node); err != nil {
r.Log.Error(err, "failed to delete storage unit")
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
}

if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
r.Log.Error(err, "failed to delete database cluster")
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
Expand Down Expand Up @@ -506,6 +512,32 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
return nil
}

func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, node *v1alpha1.StorageNode) error {
if !node.Status.Registered {
return nil
}
if err := r.validateComputeNodeAnnotations(node); err != nil {
return err
}

ssServer, err := r.getShardingsphereServer(ctx, node)
if err != nil {
return fmt.Errorf("getShardingsphereServer failed: %w", err)
}

defer ssServer.Close()

// TODO how to set ds name?
if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
return fmt.Errorf("unregister storage unit failed: %w", err)
}

r.Recorder.Eventf(node, corev1.EventTypeNormal, "StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", node.GetNamespace(), node.GetName())

node.Status.Registered = false
return nil
}

func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
requiredAnnos := []string{
AnnotationKeyLogicDatabaseName,
Expand Down
26 changes: 2 additions & 24 deletions shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ const (
DistSQLDropTable = `DROP TABLE %s;`
)

var (
ruleTypeMap = map[string]string{}
)
var ruleTypeMap = map[string]string{}

type Rule struct {
Type string
Expand Down Expand Up @@ -142,22 +140,11 @@ func (s *server) UnRegisterStorageUnit(dsName string) error {
return fmt.Errorf("get rules used error: %w", err)
}

// TODO DISCUSS: should we drop all tables used by storage unit?
// clean all rules and tables used by storage unit
tables := map[string]struct{}{}
// clean all rules used by storage unit
for _, rule := range rules {
if err := s.dropRule(rule.Type, rule.Name); err != nil {
return fmt.Errorf("drop rule error: %w", err)
}
if _, ok := tables[rule.Name]; !ok {
tables[rule.Name] = struct{}{}
}
}

for table := range tables {
if err := s.dropTable(table); err != nil {
return fmt.Errorf("drop table error: %w", err)
}
}

distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
Expand All @@ -181,15 +168,6 @@ func (s *server) dropRule(ruleType, ruleName string) error {
return nil
}

func (s *server) dropTable(tableName string) error {
distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
_, err := s.db.Exec(distSQL)
if err != nil {
return fmt.Errorf("drop table fail, err: %s", err)
}
return nil
}

func init() {
// init rule type map
// implement more rule type if needed
Expand Down
42 changes: 35 additions & 7 deletions shardingsphere-operator/test/e2e/storage_node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"database/sql"
"github.com/DATA-DOG/go-sqlmock"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"reflect"
"regexp"
"time"
Expand Down Expand Up @@ -145,15 +146,13 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})

It("should register storage unit success", func() {
It("should register and unregister storage unit success", func() {
// mock mysql
db, dbmock, err := sqlmock.New()
Expect(err).Should(Succeed())
Expect(dbmock).ShouldNot(BeNil())
defer db.Close()

// mock rds DescribeDBInstances func returns success
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
g := monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
return &dbmesh_rds.DescInstance{
DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
Endpoint: dbmesh_rds.Endpoint{
Expand All @@ -162,9 +161,15 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
},
}, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ *dbmeshv1alpha1.DatabaseClass) error {
return nil
})
monkey.Patch(sql.Open, func(_ string, _ string) (*sql.DB, error) {
return db, nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close", func(_ *sql.DB) error {
return nil
})

cn := &v1alpha1.ComputeNode{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -238,20 +243,43 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
Expect(k8sClient.Create(ctx, node)).Should(Succeed())

dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))

Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, 10, 1).Should(Equal(v1alpha1.StorageNodePhaseReady))
}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseReady))

Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Registered
}, 10, 1).Should(BeTrue())
}, 20, 2).Should(BeTrue())

// delete storage node
Expect(k8sClient.Delete(ctx, node)).Should(Succeed())

dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}).AddRow("sharding", "t_order"))
dbmock.ExpectExec("DROP SHARDING TABLE RULE").WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)).Should(Succeed())
return newSN.Status.Phase
}, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))

g.Unpatch()
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) (*dbmesh_rds.DescInstance, error) {
return nil, nil
})

Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
return apierrors.IsNotFound(err)
}, 20, 2).Should(BeTrue())
})
})
})

0 comments on commit cd25639

Please sign in to comment.