Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retention Policies with Partitioning #2194

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions pkg/partition/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package partition

import (
"context"
"fmt"
"github.com/frain-dev/convoy/pkg/log"
"github.com/jmoiron/sqlx"
"time"
)

// Manager Core partition manager
type Manager struct {
db *sqlx.DB
logger *log.Logger
config Config
}

func (m *Manager) Initialize(ctx context.Context, config Config) error {
// create the library's management table

// upsert: add the new tables that are to be managed by the library

return nil
}

func (m *Manager) CreateFuturePartitions(ctx context.Context, tableConfig TableConfig, ahead uint) error {
// fetch active managed tables

// for each parent table: create a future partition table

// create the indexes and attach it to the parent table

return nil
}

func (m *Manager) DropOldPartitions(ctx context.Context) error {
// fetch active managed tables

// for each parent table: fetch the tables to be dropped

// run any hooks (gzip and export to object storage)

// drop the table

return nil
}

// createPartition creates a partition for a table
func (m *Manager) createPartition(ctx context.Context, tableConfig TableConfig, bounds Bounds) error {
// Generate partition name based on bounds
partitionName := m.generatePartitionName(tableConfig, bounds)

// Create SQL for partition
sql, err := m.generatePartitionSQL(partitionName, tableConfig, bounds)

// Execute partition creation
_, err = m.db.ExecContext(ctx, sql)
if err != nil {
return err
}

return nil
}

// Maintain defines a regularly run maintenance routine
func (m *Manager) Maintain(ctx context.Context) error {
// loop all tables and run maintenance

for i := 0; i < len(m.config.Tables); i++ {
table := m.config.Tables[i]

// Check for necessary future partitions
if err := m.CreateFuturePartitions(ctx, table, 1); err != nil {
return fmt.Errorf("failed to create future partitions: %w", err)
}

// Drop old partitions if needed
if err := m.DropOldPartitions(ctx); err != nil {
return fmt.Errorf("failed to drop old partitions: %w", err)
}
}

return nil
}

// generatePartitionSQL generates the name of the partition table
func (m *Manager) generatePartitionSQL(name string, tableConfig TableConfig, bounds Bounds) (string, error) {
switch tableConfig.PartitionType {
case "range":
return m.generateRangePartitionSQL(name, bounds), nil
case "list", "hash":
return "", fmt.Errorf("list and hash partitions are not implemented yet %q", tableConfig.PartitionType)
default:
return "", fmt.Errorf("unsupported partition type %q", tableConfig.PartitionType)
}
}

func (m *Manager) generateRangePartitionSQL(name string, bounds Bounds) string {
return fmt.Sprintf(`

Check failure on line 99 in pkg/partition/manager.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest, 15, 6.2.6)

fmt.Sprintf format %s has arg m.config.Tables[0] of wrong type github.com/frain-dev/convoy/pkg/partition.TableConfig

Check failure on line 99 in pkg/partition/manager.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest, 15, 6.2.6)

fmt.Sprintf format %s has arg m.config.Tables[0] of wrong type github.com/frain-dev/convoy/pkg/partition.TableConfig
CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)
`, name, m.config.Tables[0], bounds.From.Format(time.DateOnly), bounds.To.Format(time.DateOnly))
}

func (m *Manager) generatePartitionName(tableConfig TableConfig, bounds Bounds) string {
return fmt.Sprintf("%s_%s", tableConfig.Name, bounds.From.Format(time.DateOnly))
}

func run() {
config := Config{
Tables: []TableConfig{
{
Name: "samples",
TenantId: "project_id_asd",
PartitionType: TypeRange,
PartitionBy: []string{"project_id", "created_at"},
PartitionInterval: OneDay,
RetentionPeriod: OneMonth,
},
{
Name: "samples",
TenantId: "project_id_124",
PartitionType: TypeRange,
PartitionBy: []string{"project_id", "created_at"},
PartitionInterval: OneDay,
RetentionPeriod: OneMonth * 2,
},
},
SchemaName: "public",
}

manager := NewManager(nil, config)

// Initialize partition structure
if err := manager.Initialize(context.Background(), config); err != nil {
log.Fatal(err)
}

// Set up maintenance routine
go func() {
ticker := time.NewTicker(24 * time.Hour)
for range ticker.C {
if err := manager.Maintain(context.Background()); err != nil {
log.Printf("maintenance error: %v", err)
}
}
}()
}

func NewManager(db *sqlx.DB, config Config) *Manager {
return &Manager{
db: db,
config: config,
}
}
70 changes: 70 additions & 0 deletions pkg/partition/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package partition

import (
"context"
"time"
)

type PartitionerType string

// tableNamingPatten
const tableNamingPatten = "%s_%s"

const (
TypeRange PartitionerType = "range"
TypeHash PartitionerType = "hash"
TypeList PartitionerType = "list"
)

const (
OneDay = 24 * time.Hour
OneMonth = 30 * OneDay
)

type Partitioner interface {
// Initialize Create initial partition structure
Initialize(ctx context.Context, config Config) error

// CreateFuturePartitions Create new partitions ahead of time
CreateFuturePartitions(ctx context.Context, ahead uint) error

// DropOldPartitions Drop old partitions based on retention policy
DropOldPartitions(ctx context.Context) error

// Maintain Manage partition maintenance
Maintain(ctx context.Context) error
}

type Bounds struct {
From, To time.Time
}

type TableConfig struct {
// Name is the table
Name string

// TenantId
TenantId string

// Partition type and settings
PartitionType PartitionerType // "range", "list", or "hash"

// PartitionBy Columns to partition by, they are applied in order
PartitionBy []string

// PartitionInterval For range partitions (e.g., "1 month", "1 day")
PartitionInterval time.Duration

// PreCreateCount is the number of partitions to create ahead when the partition is first created
PreCreateCount uint

// Retention settings
RetentionPeriod time.Duration
}

type Config struct {
// SchemaName is the schema of the tables
SchemaName string

Tables []TableConfig
}
Loading