Skip to content

Commit

Permalink
Create and manage Tigris bucket for PG/Barman backups (#3693)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben Iofel <bi@fly.io>
Co-authored-by: Shaun Davis <davissp14@gmail.com>
  • Loading branch information
3 people committed Jul 15, 2024
1 parent f4193cc commit 53c70ff
Show file tree
Hide file tree
Showing 14 changed files with 786 additions and 58 deletions.
102 changes: 68 additions & 34 deletions flypg/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
)

var (
volumeName = "pg_data"
volumePath = "/data"
Duration10s, _ = time.ParseDuration("10s")
Duration15s, _ = time.ParseDuration("15s")
CheckPathPg = "/flycheck/pg"
CheckPathRole = "/flycheck/role"
CheckPathVm = "/flycheck/vm"
volumeName = "pg_data"
volumePath = "/data"
Duration10s, _ = time.ParseDuration("10s")
Duration15s, _ = time.ParseDuration("15s")
CheckPathPg = "/flycheck/pg"
CheckPathRole = "/flycheck/role"
CheckPathVm = "/flycheck/vm"
BarmanSecretName = "S3_ARCHIVE_CONFIG"
)

const (
Expand All @@ -48,12 +49,20 @@ type CreateClusterInput struct {
Password string
Region string
VolumeSize *int
VMSize *fly.VMSize
SnapshotID *string
Manager string
Autostart bool
ScaleToZero bool
ForkFrom string
// VMSize is deprecated, specify Guest instead.
VMSize *fly.VMSize
Guest *fly.MachineGuest
SnapshotID *string
Manager string
Autostart bool
ScaleToZero bool
ForkFrom string
BackupsEnabled bool
BarmanSecret string
BarmanRemoteRestoreConfig string
RestoreTargetName string
RestoreTargetTime string
RestoreTargetInclusive bool
}

func NewLauncher(client flyutil.Client) *Launcher {
Expand Down Expand Up @@ -82,13 +91,35 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu
// In case the user hasn't specified a name, use the app name generated by the API
config.AppName = app.Name

if config.ImageRef == "" {
imageRepo := "flyio/postgres"

if config.Manager == ReplicationManager {
imageRepo = "flyio/postgres-flex"
}

imageRef, err := client.GetLatestImageTag(ctx, imageRepo, config.SnapshotID)
if err != nil {
return err
}
config.ImageRef = imageRef
}

var addr *fly.IPAddress

if config.Manager == ReplicationManager {
addr, err = l.client.AllocateIPAddress(ctx, config.AppName, "private_v6", config.Region, config.Organization, "")
if err != nil {
return err
}

// TODO - We need to verify target image before we do this.
// Create the Tigris bucket for backup storage
if config.BackupsEnabled {
if err := CreateTigrisBucket(ctx, config); err != nil {
return err
}
}
}

secrets, err := l.setSecrets(ctx, config)
Expand All @@ -108,22 +139,14 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu
nodes := make([]*fly.Machine, 0)

for i := 0; i < config.InitialClusterSize; i++ {
machineConf := l.getPostgresConfig(config)
var (
machineConf *fly.MachineConfig
snapshot *string
)

machineConf.Image = config.ImageRef
if machineConf.Image == "" {
imageRepo := "flyio/postgres"

if config.Manager == ReplicationManager {
imageRepo = "flyio/postgres-flex"
}
machineConf = l.getPostgresConfig(config)

imageRef, err := client.GetLatestImageTag(ctx, imageRepo, config.SnapshotID)
if err != nil {
return err
}
machineConf.Image = imageRef
}
machineConf.Image = config.ImageRef

concurrency := &fly.MachineServiceConcurrency{
Type: "connections",
Expand All @@ -132,8 +155,9 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu
}

if config.Manager == ReplicationManager {
var bouncerPort int = 5432
var pgPort int = 5433
var bouncerPort = 5432
var pgPort = 5433

machineConf.Services = []fly.MachineService{
{
Protocol: "tcp",
Expand Down Expand Up @@ -169,7 +193,7 @@ func (l *Launcher) LaunchMachinesPostgres(ctx context.Context, config *CreateClu
}
}

snapshot := config.SnapshotID
snapshot = config.SnapshotID
verb := "Provisioning"

if snapshot != nil {
Expand Down Expand Up @@ -296,10 +320,14 @@ func (l *Launcher) getPostgresConfig(config *CreateClusterInput) *fly.MachineCon
}

// Set VM resources
machineConfig.Guest = &fly.MachineGuest{
CPUKind: config.VMSize.CPUClass,
CPUs: int(config.VMSize.CPUCores),
MemoryMB: config.VMSize.MemoryMB,
if config.Guest != nil {
machineConfig.Guest = config.Guest
} else {
machineConfig.Guest = &fly.MachineGuest{
CPUKind: config.VMSize.CPUClass,
CPUs: int(config.VMSize.CPUCores),
MemoryMB: config.VMSize.MemoryMB,
}
}

// Metrics
Expand Down Expand Up @@ -417,6 +445,12 @@ func (l *Launcher) setSecrets(ctx context.Context, config *CreateClusterInput) (
"OPERATOR_PASSWORD": opPassword,
}

if config.BarmanSecret != "" {
secrets[BarmanSecretName] = config.BarmanSecret
} else if config.BarmanRemoteRestoreConfig != "" {
secrets["S3_ARCHIVE_REMOTE_RESTORE_CONFIG"] = config.BarmanRemoteRestoreConfig
}

if config.Manager == ReplicationManager {
pub, priv, err := ed25519.GenerateKey(nil)
if err != nil {
Expand Down
100 changes: 100 additions & 0 deletions flypg/tigris.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package flypg

import (
"context"
"fmt"
"net/url"
"reflect"
"strings"

"github.com/superfly/flyctl/iostreams"

extensions_core "github.com/superfly/flyctl/internal/command/extensions/core"
)

func CreateTigrisBucket(ctx context.Context, config *CreateClusterInput) error {
if !config.BackupsEnabled {
return nil
}

var (
io = iostreams.FromContext(ctx)
)
fmt.Fprintln(io.Out, "Creating Tigris bucket for backup storage")

options := map[string]interface{}{
"Public": false,
"Accelerate": false,
}
options["website"] = map[string]interface{}{
"domain_name": "",
}
name := config.AppName + "-postgres"
params := extensions_core.ExtensionParams{
AppName: config.AppName,
Organization: config.Organization,
Provider: "tigris",
OverrideName: &name,
}
params.Options = options

var extension extensions_core.Extension
provisionExtension := true
index := 1

for provisionExtension {
var err error
extension, err = extensions_core.ProvisionExtension(ctx, params)
if err != nil {
if strings.Contains(err.Error(), "unavailable") || strings.Contains(err.Error(), "Name has already been taken") {
name := fmt.Sprintf("%s-postgres-%d", config.AppName, index)
params.OverrideName = &name
index++
} else {
return err
}
} else {
provisionExtension = false
}
}

environment := extension.Data.Environment
if environment == nil || reflect.ValueOf(environment).IsNil() {
return nil
}

env := extension.Data.Environment.(map[string]interface{})

accessKeyId, ok := env["AWS_ACCESS_KEY_ID"].(string)
if !ok || accessKeyId == "" {
return fmt.Errorf("AWS_ACCESS_KEY_ID is unset")
}

accessSecret, ok := env["AWS_SECRET_ACCESS_KEY"].(string)
if !ok || accessSecret == "" {
return fmt.Errorf("AWS_SECRET_ACCESS_KEY is unset")
}

endpoint, ok := env["AWS_ENDPOINT_URL_S3"].(string)
if !ok || endpoint == "" {
return fmt.Errorf("AWS_ENDPOINT_URL_S3 is unset")
}

bucketName, ok := env["BUCKET_NAME"].(string)
if !ok || bucketName == "" {
return fmt.Errorf("BUCKET_NAME is unset")
}

bucketDirectory := config.AppName

endpointURL, err := url.Parse(endpoint)
if err != nil {
return err
}

endpointURL.User = url.UserPassword(accessKeyId, accessSecret)
endpointURL.Path = "/" + bucketName + "/" + bucketDirectory
config.BarmanSecret = endpointURL.String()

return nil
}
2 changes: 1 addition & 1 deletion internal/command/postgres/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func machineAttachCluster(ctx context.Context, params AttachParams, flycast *str
return fmt.Errorf("no active machines found")
}

if err := hasRequiredVersionOnMachines(machines, MinPostgresHaVersion, MinPostgresFlexVersion, MinPostgresStandaloneVersion); err != nil {
if err := hasRequiredVersionOnMachines(params.AppName, machines, MinPostgresHaVersion, MinPostgresFlexVersion, MinPostgresStandaloneVersion); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 53c70ff

Please sign in to comment.