Skip to content

Commit

Permalink
Update RDS aurora app implementation to execute mysql Query from pod (#…
Browse files Browse the repository at this point in the history
…1949)

* Update RDS postgres app implementation, execute postgres command from Deployment

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Fix Lint Remove unnecessary trailing new line

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Initialize testWorkloadName field

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Move BastionWorkload function to be used as utility

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Remove export of BastionWorkload() utility function

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Add app name in error messages

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Update RDS Aurora app implementation, execute mysql query from Deployment

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Remove unused methods from rds aurora app

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* minor refactor

* Minor refactor

* refactor error messages

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Correct error message

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* go mod tidy

---------

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>
  • Loading branch information
akankshakumari393 committed Mar 17, 2023
1 parent cac55c9 commit 24d4559
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 147 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/aws/aws-sdk-go v1.44.209
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-openapi/strfmt v0.21.3
github.com/go-sql-driver/mysql v1.7.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng=
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
Expand Down
224 changes: 80 additions & 144 deletions pkg/app/rds_aurora_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ package app

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
rdserr "github.com/aws/aws-sdk-go/service/rds"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand All @@ -36,32 +37,31 @@ import (
"github.com/kanisterio/kanister/pkg/function"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/poll"

_ "github.com/go-sql-driver/mysql"
)

const (
AuroraDBInstanceClass = "db.r5.large"
AuroraDBStorage = 20
DetailsCMName = "dbconfig"
mysqlConnectionString = "mysql -h %s -u %s -p%s %s -N -e"
)

type RDSAuroraMySQLDB struct {
name string
cli kubernetes.Interface
namespace string
id string
host string
dbName string
username string
password string
accessID string
secretKey string
region string
sessionToken string
securityGroupID string
securityGroupName string
name string
cli kubernetes.Interface
namespace string
id string
host string
dbName string
username string
password string
accessID string
secretKey string
region string
sessionToken string
securityGroupID string
securityGroupName string
bastionDebugWorkloadName string
}

func NewRDSAuroraMySQLDB(name, region string) App {
Expand Down Expand Up @@ -127,6 +127,17 @@ func (a *RDSAuroraMySQLDB) Install(ctx context.Context, namespace string) error
return err
}

a.bastionDebugWorkloadName = fmt.Sprintf("%s-workload", a.name)

deploymentSpec := bastionDebugWorkloadSpec(ctx, a.bastionDebugWorkloadName, "mysql", a.namespace)
_, err = a.cli.AppsV1().Deployments(a.namespace).Create(ctx, deploymentSpec, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "Failed to create deployment %s, app=%s", a.bastionDebugWorkloadName, a.name)
}

if err := kube.WaitOnDeploymentReady(ctx, a.cli, a.namespace, a.bastionDebugWorkloadName); err != nil {
return errors.Wrapf(err, "Failed while waiting for deployment %s to be ready, app=%s", a.bastionDebugWorkloadName, a.name)
}
// Create security group
log.Info().Print("Creating security group.", field.M{"app": a.name, "name": a.securityGroupName})
sg, err := ec2Cli.CreateSecurityGroup(ctx, a.securityGroupName, "To allow ingress to Aurora DB cluster")
Expand Down Expand Up @@ -201,156 +212,76 @@ func (a *RDSAuroraMySQLDB) IsReady(context.Context) (bool, error) {
return true, nil
}

func (a *RDSAuroraMySQLDB) Ping(context.Context) error {
db, err := a.openDBConnection()
if err != nil {
return errors.Wrap(err, "Error opening database connection")
}
defer func() {
if err = a.closeDBConnection(db); err != nil {
log.Print("Error closing DB connection", field.M{"app": a.name})
}
}()
func (a *RDSAuroraMySQLDB) Ping(ctx context.Context) error {
log.Print("Pinging rds aurora database", field.M{"app": a.name})
pingQuery := fmt.Sprintf(mysqlConnectionString+"'SELECT 1;'", a.host, a.username, a.password, a.dbName)

pingQuery := "select 1"
_, err = db.Query(pingQuery)
return err
}
pingCommand := []string{"sh", "-c", pingQuery}

func (a *RDSAuroraMySQLDB) Insert(ctx context.Context) error {
db, err := a.openDBConnection()
_, stderr, err := a.execCommand(ctx, pingCommand)
if err != nil {
return err
return errors.Wrapf(err, "Error while Pinging the database: %s, app: %s", stderr, a.name)
}
defer func() {
if err = a.closeDBConnection(db); err != nil {
log.Print("Error closing DB connection", field.M{"app": a.name})
}
}()

query, err := db.Prepare("INSERT INTO pets VALUES (?,?,?,?,?,?);")
if err != nil {
return errors.Wrap(err, "Error preparing query")
}
log.Print("Ping to the application was success.", field.M{"app": a.name})
return nil
}

// start a transaction
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "Error beginning transaction")
}
func (a *RDSAuroraMySQLDB) Insert(ctx context.Context) error {
log.Print("Adding entry to database", field.M{"app": a.name})
insertQuery := fmt.Sprintf(mysqlConnectionString+
"\"INSERT INTO pets VALUES ('Puffball', 'Diane', 'hamster', 'f', '1999-03-30', 'NULL');\"", a.host, a.username, a.password, a.dbName)

_, err = tx.Stmt(query).Exec("Puffball", "Diane", "hamster", "f", "1999-03-30", "NULL")
insertCommand := []string{"sh", "-c", insertQuery}
_, stderr, err := a.execCommand(ctx, insertCommand)
if err != nil {
return errors.Wrap(err, "Error inserting data into Aurora DB cluster")
}

if err = tx.Commit(); err != nil {
return errors.Wrap(err, "Error committing data into Aurora DB database")
return errors.Wrapf(err, "Error while inserting data into table: %s, app: %s", stderr, a.name)
}

log.Info().Print("Inserted a row in test db.", field.M{"app": a.name})
return nil
}

func (a *RDSAuroraMySQLDB) Count(context.Context) (int, error) {
db, err := a.openDBConnection()
func (a *RDSAuroraMySQLDB) Count(ctx context.Context) (int, error) {
log.Print("Counting entries from database", field.M{"app": a.name})
countQuery := fmt.Sprintf(mysqlConnectionString+
"\"SELECT COUNT(*) FROM pets;\"", a.host, a.username, a.password, a.dbName)

countCommand := []string{"sh", "-c", countQuery}
stdout, stderr, err := a.execCommand(ctx, countCommand)
if err != nil {
return 0, err
return 0, errors.Wrapf(err, "Error while counting data of table: %s, app: %s", stderr, a.name)
}
defer func() {
if err = a.closeDBConnection(db); err != nil {
log.Print("Error closing DB connection", field.M{"app": a.name})
}
}()

rows, err := db.Query("select * from pets;")
rowsReturned, err := strconv.Atoi(stdout)
if err != nil {
return 0, errors.Wrap(err, "Error preparing count query")
}
count := 0
for rows.Next() {
count++
return 0, errors.Wrapf(err, "Error while converting response of count query to int: %s, app: %s", stderr, a.name)
}

return count, nil
log.Info().Print("Number of rows in test DB.", field.M{"app": a.name, "count": rowsReturned})
return rowsReturned, nil
}

func (a *RDSAuroraMySQLDB) Reset(ctx context.Context) error {
timeoutCtx, waitCancel := context.WithTimeout(ctx, mysqlWaitTimeout)
defer waitCancel()
err := poll.Wait(timeoutCtx, func(ctx context.Context) (bool, error) {
err := a.Ping(ctx)
return err == nil, nil
})

if err != nil {
return errors.Wrapf(err, "Error waiting for application %s to be ready to reset it", a.name)
}

log.Print("Resetting the mysql instance.", field.M{"app": a.name})

db, err := a.openDBConnection()
deleteQuery := fmt.Sprintf(mysqlConnectionString+"\"DROP TABLE IF EXISTS pets;\"", a.host, a.username, a.password, a.dbName)
deleteCommand := []string{"sh", "-c", deleteQuery}
_, stderr, err := a.execCommand(ctx, deleteCommand)
if err != nil {
return err
}
defer func() {
if err = a.closeDBConnection(db); err != nil {
log.Print("Error closing DB connection", field.M{"app": a.name})
}
}()

query, err := db.Prepare("DROP TABLE IF EXISTS pets;")
if err != nil {
return errors.Wrap(err, "Error preparing reset query")
}

// start a transaction
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "Error beginning transaction")
}

_, err = tx.Stmt(query).Exec()
if err != nil {
return errors.Wrap(err, "Error resetting Aurora database")
}

if err = tx.Commit(); err != nil {
return errors.Wrap(err, "Error committing DB reset transaction")
return errors.Wrapf(err, "Error while deleting data from table: %s, app: %s", stderr, a.name)
}

log.Info().Print("Database reset was successful!", field.M{"app": a.name})
return nil
}

func (a *RDSAuroraMySQLDB) Initialize(context.Context) error {
db, err := a.openDBConnection()
func (a *RDSAuroraMySQLDB) Initialize(ctx context.Context) error {
log.Print("Initializing database", field.M{"app": a.name})
createQuery := fmt.Sprintf(mysqlConnectionString+"\"CREATE TABLE pets (name VARCHAR(20), owner VARCHAR(20), species VARCHAR(20), sex CHAR(1), birth DATE, death DATE);\"", a.host, a.username, a.password, a.dbName)
createCommand := []string{"sh", "-c", createQuery}
_, stderr, err := a.execCommand(ctx, createCommand)
if err != nil {
return err
}

defer func() {
if err = a.closeDBConnection(db); err != nil {
log.Print("Error closing DB connection", field.M{"app": a.name})
}
}()

query, err := db.Prepare("CREATE TABLE pets (name VARCHAR(20), owner VARCHAR(20), species VARCHAR(20), sex CHAR(1), birth DATE, death DATE);")
if err != nil {
return errors.Wrap(err, "Error preparing query")
}

// start a transaction
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "Error begining transaction")
}

_, err = tx.Stmt(query).Exec()
if err != nil {
return errors.Wrap(err, "Error creating table into Aurora database")
}

if err = tx.Commit(); err != nil {
return errors.Wrap(err, "Error committing table creation")
return errors.Wrapf(err, "Error while creating the database: %s, app: %s", stderr, a.name)
}
return nil
}
Expand Down Expand Up @@ -410,6 +341,11 @@ func (a *RDSAuroraMySQLDB) Uninstall(ctx context.Context) error {
}
}

// Remove workload object created for executing commands
err = a.cli.AppsV1().Deployments(a.namespace).Delete(ctx, a.bastionDebugWorkloadName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "Error deleting Workload %s, app=%s", a.bastionDebugWorkloadName, a.name)
}
return nil
}

Expand All @@ -426,10 +362,10 @@ func (a *RDSAuroraMySQLDB) getAWSConfig(ctx context.Context) (*awssdk.Config, st
return aws.GetConfig(ctx, config)
}

func (a *RDSAuroraMySQLDB) openDBConnection() (*sql.DB, error) {
return sql.Open("mysql", fmt.Sprintf("%s:%s@(%s)/%s", a.username, a.password, a.host, a.dbName))
}

func (a RDSAuroraMySQLDB) closeDBConnection(db *sql.DB) error {
return db.Close()
func (a RDSAuroraMySQLDB) execCommand(ctx context.Context, command []string) (string, string, error) {
podName, containerName, err := kube.GetPodContainerFromDeployment(ctx, a.cli, a.namespace, a.bastionDebugWorkloadName)
if err != nil || podName == "" {
return "", "", err
}
return kube.Exec(a.cli, a.namespace, podName, containerName, command, nil)
}

0 comments on commit 24d4559

Please sign in to comment.