Skip to content

Commit

Permalink
Update RDS postgres app implementation to execute postgres Query from…
Browse files Browse the repository at this point in the history
… pod (#1946)

* Update RDS postgres app implementation, execute postgres command from Deployment

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Fix Lint Remove unnecessary trailing new line

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Initialize testWorkloadName field

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Move BastionWorkload function to be used as utility

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Remove export of BastionWorkload() utility function

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Add app name in error messages

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Minor refactor

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Correct error message

* Correct error message

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

* Use tail -f for test pod

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>

---------

Signed-off-by: Akanksha Kumari <akankshakumari393@gmail.com>
  • Loading branch information
akankshakumari393 committed Mar 16, 2023
1 parent 3c4b039 commit cac55c9
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 48 deletions.
132 changes: 84 additions & 48 deletions pkg/app/rds_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package app

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
Expand All @@ -27,6 +27,7 @@ import (
"github.com/ghodss/yaml"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

Expand All @@ -37,33 +38,31 @@ import (
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"

// Initialize pq driver
_ "github.com/lib/pq"
)

type RDSPostgresDB struct {
name string
cli kubernetes.Interface
namespace string
id string
host string
databases []string
username string
password string
accessID string
secretKey string
region string
sessionToken string
securityGroupID string
securityGroupName string
sqlDB *sql.DB
configMapName string
secretName string
name string
cli kubernetes.Interface
namespace string
id string
host string
databases []string
username string
password string
accessID string
secretKey string
region string
sessionToken string
securityGroupID string
securityGroupName string
configMapName string
secretName string
bastionDebugWorkloadName string
}

const (
dbInstanceType = "db.t3.micro"
dbInstanceType = "db.t3.micro"
postgresConnectionString = "PGPASSWORD=%s psql -h %s -p 5432 -U %s -d %s -t -c"
)

func NewRDSPostgresDB(name string, customRegion string) App {
Expand Down Expand Up @@ -131,6 +130,17 @@ func (pdb *RDSPostgresDB) Install(ctx context.Context, ns string) error {
return err
}

pdb.bastionDebugWorkloadName = fmt.Sprintf("%s-workload", pdb.name)

deploymentSpec := bastionDebugWorkloadSpec(ctx, pdb.bastionDebugWorkloadName, "postgres", pdb.namespace)
_, err = pdb.cli.AppsV1().Deployments(pdb.namespace).Create(ctx, deploymentSpec, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "Failed to create deployment %s, app: %s", pdb.bastionDebugWorkloadName, pdb.name)
}

if err := kube.WaitOnDeploymentReady(ctx, pdb.cli, pdb.namespace, pdb.bastionDebugWorkloadName); err != nil {
return errors.Wrapf(err, "Failed while waiting for deployment %s to be ready, app: %s", pdb.bastionDebugWorkloadName, pdb.name)
}
// Create security group
log.Info().Print("Creating security group.", field.M{"app": pdb.name, "name": pdb.securityGroupName})
sg, err := ec2Cli.CreateSecurityGroup(ctx, pdb.securityGroupName, "kanister-test-security-group")
Expand Down Expand Up @@ -234,6 +244,7 @@ func (pdb *RDSPostgresDB) Object() crv1alpha1.ObjectReference {

// Ping makes and tests DB connection
func (pdb *RDSPostgresDB) Ping(ctx context.Context) error {
log.Print("Pinging rds postgres database", field.M{"app": pdb.name})
// Get connection info from configmap
dbconfig, err := pdb.cli.CoreV1().ConfigMaps(pdb.namespace).Get(ctx, pdb.configMapName, metav1.GetOptions{})
if err != nil {
Expand All @@ -255,63 +266,74 @@ func (pdb *RDSPostgresDB) Ping(ctx context.Context) error {
return errors.New("Databases are missing from configmap")
}

var connectionString string = fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", dbconfig.Data["postgres.host"], dbconfig.Data["postgres.user"], dbsecret.Data["password"], databases[0])
isReadyQuery := fmt.Sprintf(postgresConnectionString+"'SELECT version();'", dbsecret.Data["password"], dbconfig.Data["postgres.host"], dbconfig.Data["postgres.user"], databases[0])

// Initialize connection object.
db, err := sql.Open("postgres", connectionString)
if err != nil {
return err
}
pingCommand := []string{"sh", "-c", isReadyQuery}

err = db.Ping()
_, stderr, err := pdb.execCommand(ctx, pingCommand)
if err != nil {
return err
return errors.Wrapf(err, "Error while Pinging the database: %s, app: %s", stderr, pdb.name)
}

pdb.sqlDB = db
log.Info().Print("Connected to database.", field.M{"app": pdb.name})
log.Print("Ping to the application was successful.", field.M{"app": pdb.name})
return nil
}

func (pdb RDSPostgresDB) Insert(ctx context.Context) error {
log.Print("Adding entry to database", field.M{"app": pdb.name})
now := time.Now().Format(time.RFC3339Nano)
stmt := "INSERT INTO inventory (name) VALUES ($1);"
_, err := pdb.sqlDB.Exec(stmt, now)
insertQuery := fmt.Sprintf(postgresConnectionString+
"\"INSERT INTO inventory (name) VALUES ('%s');\"", pdb.password, pdb.host, pdb.username, pdb.databases[0], now)

insertCommand := []string{"sh", "-c", insertQuery}
_, stderr, err := pdb.execCommand(ctx, insertCommand)
if err != nil {
return err
return errors.Wrapf(err, "Error while inserting data into table: %s, app: %s", stderr, pdb.name)
}
log.Info().Print("Inserted a row in test db.", field.M{"app": pdb.name})
return nil
}

func (pdb RDSPostgresDB) Count(ctx context.Context) (int, error) {
stmt := "SELECT COUNT(*) FROM inventory;"
row := pdb.sqlDB.QueryRow(stmt)
var count int
err := row.Scan(&count)
log.Print("Counting entries from database", field.M{"app": pdb.name})
countQuery := fmt.Sprintf(postgresConnectionString+
"\"SELECT COUNT(*) FROM inventory;\"", pdb.password, pdb.host, pdb.username, pdb.databases[0])

countCommand := []string{"sh", "-c", countQuery}
stdout, stderr, err := pdb.execCommand(ctx, countCommand)
if err != nil {
return 0, errors.Wrapf(err, "Error while counting data of table: %s, app: %s", stderr, pdb.name)
}

rowsReturned, err := strconv.Atoi(stdout)
if err != nil {
return 0, err
return 0, errors.Wrapf(err, "Error while converting response of count query: %s, app: %s", stderr, pdb.name)
}
log.Info().Print("Counting rows in test db.", field.M{"app": pdb.name, "count": count})
return count, nil

log.Info().Print("Counting rows in test db.", field.M{"app": pdb.name, "count": rowsReturned})
return rowsReturned, nil
}

func (pdb RDSPostgresDB) Reset(ctx context.Context) error {
_, err := pdb.sqlDB.Exec("DROP TABLE IF EXISTS inventory;")
log.Print("Resetting database", field.M{"app": pdb.name})
deleteQuery := fmt.Sprintf(postgresConnectionString+"\"DROP TABLE IF EXISTS inventory;\"", pdb.password, pdb.host, pdb.username, pdb.databases[0])
deleteCommand := []string{"sh", "-c", deleteQuery}
_, stderr, err := pdb.execCommand(ctx, deleteCommand)
if err != nil {
return err
return errors.Wrapf(err, "Error while deleting data from table: %s, app: %s", stderr, pdb.name)
}

log.Info().Print("Database reset successful!", field.M{"app": pdb.name})
return nil
}

// Initialize is used to initialize the database or create schema
func (pdb RDSPostgresDB) Initialize(ctx context.Context) error {
// Create table.
_, err := pdb.sqlDB.Exec("CREATE TABLE inventory (id serial PRIMARY KEY, name VARCHAR(50));")
log.Print("Initializing database", field.M{"app": pdb.name})
createQuery := fmt.Sprintf(postgresConnectionString+"\"CREATE TABLE inventory (id serial PRIMARY KEY, name VARCHAR(50));\"", pdb.password, pdb.host, pdb.username, pdb.databases[0])
createCommand := []string{"sh", "-c", createQuery}
_, stderr, err := pdb.execCommand(ctx, createCommand)
if err != nil {
return err
return errors.Wrapf(err, "Error while initializing the database: %s, app: %s", stderr, pdb.name)
}
return nil
}
Expand Down Expand Up @@ -390,6 +412,12 @@ func (pdb RDSPostgresDB) Uninstall(ctx context.Context) error {
}
}
}
// Remove workload object created for executing commands
err = pdb.cli.AppsV1().Deployments(pdb.namespace).Delete(ctx, pdb.bastionDebugWorkloadName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "Error deleting Workload name=%s app=%s", pdb.bastionDebugWorkloadName, pdb.name)
}

return nil
}

Expand All @@ -413,3 +441,11 @@ func makeYamlList(dbs []string) string {
}
return dbsYaml
}

func (pdb RDSPostgresDB) execCommand(ctx context.Context, command []string) (string, string, error) {
podName, containerName, err := kube.GetPodContainerFromDeployment(ctx, pdb.cli, pdb.namespace, pdb.bastionDebugWorkloadName)
if err != nil || podName == "" {
return "", "", err
}
return kube.Exec(pdb.cli, pdb.namespace, podName, containerName, command, nil)
}
33 changes: 33 additions & 0 deletions pkg/app/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package app

import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
)

Expand Down Expand Up @@ -61,3 +65,32 @@ func getOpenShiftDBTemplate(appName string, templateVersion DBTemplate, storageT
func getLabelOfApp(appName string, storageType storage) string {
return fmt.Sprintf("app=%s-%s", appName, storageType)
}

// bastionDebugWorkloadSpec creates Deployment Resource Manifest from which RDS database queries can be executed
func bastionDebugWorkloadSpec(ctx context.Context, name string, image string, namespace string) *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": name}},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": name,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: name,
Image: image,
Command: []string{"sh", "-c", "tail -f /dev/null"},
},
},
},
},
},
}
}

0 comments on commit cac55c9

Please sign in to comment.