-
Notifications
You must be signed in to change notification settings - Fork 142
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
224 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package partition | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/frain-dev/convoy/pkg/log" | ||
"github.com/jmoiron/sqlx" | ||
"time" | ||
) | ||
|
||
// Manager Core partition manager | ||
type Manager struct { | ||
db *sqlx.DB | ||
logger *log.Logger | ||
config Config | ||
} | ||
|
||
func (m *Manager) Initialize(ctx context.Context, config Config) error { | ||
// create the library's management table | ||
|
||
// upsert: add the new tables that are to be managed by the library | ||
|
||
return nil | ||
} | ||
|
||
func (m *Manager) CreateFuturePartitions(ctx context.Context, tableConfig TableConfig, ahead uint) error { | ||
// fetch active managed tables | ||
|
||
// for each parent table: create a future partition table | ||
|
||
// create the indexes and attach it to the parent table | ||
|
||
return nil | ||
} | ||
|
||
func (m *Manager) DropOldPartitions(ctx context.Context) error { | ||
// fetch active managed tables | ||
|
||
// for each parent table: fetch the tables to be dropped | ||
|
||
// run any hooks (gzip and export to object storage) | ||
|
||
// drop the table | ||
|
||
return nil | ||
} | ||
|
||
// createPartition creates a partition for a table | ||
func (m *Manager) createPartition(ctx context.Context, tableConfig TableConfig, bounds Bounds) error { | ||
// Generate partition name based on bounds | ||
partitionName := m.generatePartitionName(tableConfig, bounds) | ||
|
||
// Create SQL for partition | ||
sql, err := m.generatePartitionSQL(partitionName, tableConfig, bounds) | ||
|
||
// Execute partition creation | ||
_, err = m.db.ExecContext(ctx, sql) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Maintain defines a regularly run maintenance routine | ||
func (m *Manager) Maintain(ctx context.Context) error { | ||
// loop all tables and run maintenance | ||
|
||
for i := 0; i < len(m.config.Tables); i++ { | ||
table := m.config.Tables[i] | ||
|
||
// Check for necessary future partitions | ||
if err := m.CreateFuturePartitions(ctx, table, 1); err != nil { | ||
return fmt.Errorf("failed to create future partitions: %w", err) | ||
} | ||
|
||
// Drop old partitions if needed | ||
if err := m.DropOldPartitions(ctx); err != nil { | ||
return fmt.Errorf("failed to drop old partitions: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// generatePartitionSQL generates the name of the partition table | ||
func (m *Manager) generatePartitionSQL(name string, tableConfig TableConfig, bounds Bounds) (string, error) { | ||
switch tableConfig.PartitionType { | ||
case "range": | ||
return m.generateRangePartitionSQL(name, bounds), nil | ||
case "list", "hash": | ||
return "", fmt.Errorf("list and hash partitions are not implemented yet %q", tableConfig.PartitionType) | ||
default: | ||
return "", fmt.Errorf("unsupported partition type %q", tableConfig.PartitionType) | ||
} | ||
} | ||
|
||
func (m *Manager) generateRangePartitionSQL(name string, bounds Bounds) string { | ||
return fmt.Sprintf(` | ||
Check failure on line 99 in pkg/partition/manager.go GitHub Actions / test (1.22.x, ubuntu-latest, 15, 6.2.6)
|
||
CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s) | ||
`, name, m.config.Tables[0], bounds.From.Format(time.DateOnly), bounds.To.Format(time.DateOnly)) | ||
} | ||
|
||
func (m *Manager) generatePartitionName(tableConfig TableConfig, bounds Bounds) string { | ||
return fmt.Sprintf("%s_%s", tableConfig.Name, bounds.From.Format(time.DateOnly)) | ||
} | ||
|
||
func run() { | ||
config := Config{ | ||
Tables: []TableConfig{ | ||
{ | ||
Name: "samples", | ||
TenantId: "project_id_asd", | ||
PartitionType: TypeRange, | ||
PartitionBy: []string{"project_id", "created_at"}, | ||
PartitionInterval: OneDay, | ||
RetentionPeriod: OneMonth, | ||
}, | ||
{ | ||
Name: "samples", | ||
TenantId: "project_id_124", | ||
PartitionType: TypeRange, | ||
PartitionBy: []string{"project_id", "created_at"}, | ||
PartitionInterval: OneDay, | ||
RetentionPeriod: OneMonth * 2, | ||
}, | ||
}, | ||
SchemaName: "public", | ||
} | ||
|
||
manager := NewManager(nil, config) | ||
|
||
// Initialize partition structure | ||
if err := manager.Initialize(context.Background(), config); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
// Set up maintenance routine | ||
go func() { | ||
ticker := time.NewTicker(24 * time.Hour) | ||
for range ticker.C { | ||
if err := manager.Maintain(context.Background()); err != nil { | ||
log.Printf("maintenance error: %v", err) | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func NewManager(db *sqlx.DB, config Config) *Manager { | ||
return &Manager{ | ||
db: db, | ||
config: config, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package partition | ||
|
||
import ( | ||
"context" | ||
"time" | ||
) | ||
|
||
type PartitionerType string | ||
|
||
// tableNamingPatten | ||
const tableNamingPatten = "%s_%s" | ||
|
||
const ( | ||
TypeRange PartitionerType = "range" | ||
TypeHash PartitionerType = "hash" | ||
TypeList PartitionerType = "list" | ||
) | ||
|
||
const ( | ||
OneDay = 24 * time.Hour | ||
OneMonth = 30 * OneDay | ||
) | ||
|
||
type Partitioner interface { | ||
// Initialize Create initial partition structure | ||
Initialize(ctx context.Context, config Config) error | ||
|
||
// CreateFuturePartitions Create new partitions ahead of time | ||
CreateFuturePartitions(ctx context.Context, ahead uint) error | ||
|
||
// DropOldPartitions Drop old partitions based on retention policy | ||
DropOldPartitions(ctx context.Context) error | ||
|
||
// Maintain Manage partition maintenance | ||
Maintain(ctx context.Context) error | ||
} | ||
|
||
type Bounds struct { | ||
From, To time.Time | ||
} | ||
|
||
type TableConfig struct { | ||
// Name is the table | ||
Name string | ||
|
||
// TenantId | ||
TenantId string | ||
|
||
// Partition type and settings | ||
PartitionType PartitionerType // "range", "list", or "hash" | ||
|
||
// PartitionBy Columns to partition by, they are applied in order | ||
PartitionBy []string | ||
|
||
// PartitionInterval For range partitions (e.g., "1 month", "1 day") | ||
PartitionInterval time.Duration | ||
|
||
// PreCreateCount is the number of partitions to create ahead when the partition is first created | ||
PreCreateCount uint | ||
|
||
// Retention settings | ||
RetentionPeriod time.Duration | ||
} | ||
|
||
type Config struct { | ||
// SchemaName is the schema of the tables | ||
SchemaName string | ||
|
||
Tables []TableConfig | ||
} |