Skip to content

Commit

Permalink
Cron job to sync datapoints from integrations (#151)
Browse files Browse the repository at this point in the history
* start working on syncing data points from integrations

* clean up

* fix charts

* fix uniqueness check

* query for providerid

* sync data and decode secret

* fix syncing

* use worker pattern to remove linear time

* implement checkpointing so we can safely rerun and know which failed

* fix tests

* fix tests
  • Loading branch information
adelowo authored Feb 9, 2025
1 parent 1eeffe0 commit fd0a63e
Show file tree
Hide file tree
Showing 39 changed files with 751 additions and 51 deletions.
9 changes: 9 additions & 0 deletions cmd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ayinke-llc/malak/config"
"github.com/ayinke-llc/malak/internal/datastore/postgres"
"github.com/ayinke-llc/malak/internal/integrations"
"github.com/ayinke-llc/malak/internal/integrations/brex"
"github.com/ayinke-llc/malak/internal/integrations/mercury"
"github.com/ayinke-llc/malak/internal/pkg/billing/stripe"
"github.com/ayinke-llc/malak/internal/pkg/cache/rediscache"
Expand Down Expand Up @@ -334,6 +335,14 @@ func buildIntegrationManager(integrationRepo malak.IntegrationRepository, cfg co

i.Add(provider, client)

case malak.IntegrationProviderBrex:
client, err := brex.New(cfg)
if err != nil {
return nil, err
}

i.Add(provider, client)

default:
logger.Warn("provider not yet implemented",
zap.String("provider", provider.String()))
Expand Down
1 change: 1 addition & 0 deletions cmd/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func addIntegrationCommand(c *cobra.Command, cfg *config.Config) {
}

cmd.AddCommand(createIntegration(c, cfg))
cmd.AddCommand(syncDataPointForIntegration(c, cfg))

c.AddCommand(cmd)
}
Expand Down
305 changes: 305 additions & 0 deletions cmd/integration_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package main

import (
"context"
"database/sql"
"errors"
"strings"
"sync"
"time"

"github.com/ayinke-llc/hermes"
"github.com/ayinke-llc/malak"
"github.com/ayinke-llc/malak/config"
"github.com/ayinke-llc/malak/internal/datastore/postgres"
"github.com/spf13/cobra"
"github.com/uptrace/bun"
"go.uber.org/zap"
)

// syncDataPointForIntegration returns the "sync" subcommand. It walks every
// workspace, resolves each enabled integration's provider client and decrypted
// access token, and fans the resulting jobs out to a fixed pool of workers
// that fetch data points and record per-day sync checkpoints.
func syncDataPointForIntegration(_ *cobra.Command, cfg *config.Config) *cobra.Command {
	cmd := &cobra.Command{
		Use:   "sync",
		Short: `Sync integration data points`,
		RunE: func(cmd *cobra.Command, args []string) error {
			logger, err := getLogger(hermes.DeRef(cfg))
			if err != nil {
				return err
			}

			db, err := postgres.New(cfg, logger)
			if err != nil {
				logger.Error("could not connect to postgres database",
					zap.Error(err))
				return err
			}

			defer db.Close()

			logger.Debug("syncing datapoints")

			integrationRepo := postgres.NewIntegrationRepo(db)

			// Return instead of logger.Fatal here and below: Fatal calls
			// os.Exit, which would skip the deferred db.Close above.
			integrationManager, err := buildIntegrationManager(integrationRepo, hermes.DeRef(cfg), logger)
			if err != nil {
				logger.Error("could not build integration manager", zap.Error(err))
				return err
			}

			secretsProvider, err := buildSecretsProvider(hermes.DeRef(cfg))
			if err != nil {
				logger.Error("could not build secrets provider", zap.Error(err))
				return err
			}

			workspaces := make([]*malak.Workspace, 0)

			// Use the command's context everywhere (not context.Background())
			// so cancelling the command also cancels in-flight work.
			err = db.NewSelect().
				Model(&workspaces).
				Scan(cmd.Context())
			if err != nil {
				logger.Error("could not fetch workspaces", zap.Error(err))
				return err
			}

			numWorkers := 10
			// Buffered so the producer loop rarely blocks; workers drain
			// the channel concurrently.
			jobs := make(chan syncJob, len(workspaces)*3)

			var wg sync.WaitGroup

			for w := 1; w <= numWorkers; w++ {
				wg.Add(1)
				go worker(cmd.Context(), w, jobs, &wg, logger, integrationRepo, db)
			}

			for _, workspace := range workspaces {
				integrations, err := integrationRepo.List(cmd.Context(), workspace)
				if err != nil {
					logger.Error("could not fetch integrations for workspace",
						zap.String("workspace_id", workspace.ID.String()),
						zap.Error(err))
					continue
				}

				logger.Info("processing integrations for workspace",
					zap.String("workspace_id", workspace.ID.String()),
					zap.Int("integration_count", len(integrations)))

				for _, integration := range integrations {
					// Skip anything disabled at either the provider or the
					// workspace level.
					if !integration.Integration.IsEnabled || !integration.IsEnabled || !integration.IsActive {
						logger.Info("skipping integration",
							zap.String("workspace_integration_id", integration.ID.String()),
							zap.String("integration", integration.Integration.IntegrationName))
						continue
					}

					// Checkpointing: only process integrations that have not
					// already succeeded today (resumeFailed=true also retries
					// pending/failed runs).
					shouldProcess, err := shouldProcessIntegration(cmd.Context(), db, workspace, hermes.Ref(integration), true)
					if err != nil {
						logger.Error("error checking integration status",
							zap.String("workspace_id", workspace.ID.String()),
							zap.String("integration_id", integration.ID.String()),
							zap.Error(err))
						continue
					}

					if !shouldProcess {
						logger.Debug("skipping integration - already processed",
							zap.String("workspace_id", workspace.ID.String()),
							zap.String("integration_id", integration.ID.String()))
						continue
					}

					client, err := integrationManager.Get(
						malak.IntegrationProvider(
							strings.ToLower(integration.Integration.IntegrationName)))
					if err != nil {
						logger.Error("could not get integration client",
							zap.String("integration_name", integration.Integration.IntegrationName),
							zap.String("workspace_id", workspace.ID.String()),
							zap.String("integration_id", integration.ID.String()),
							zap.Error(err))
						continue
					}

					// Metadata.AccessToken is a reference into the secret
					// vault; resolve it to the real token before handing off.
					value, err := secretsProvider.Get(cmd.Context(), string(integration.Metadata.AccessToken))
					if err != nil {
						logger.Error("could not fetch value from secret vault",
							zap.String("secret_provider", cfg.Secrets.Provider.String()),
							zap.String("workspace_id", workspace.ID.String()),
							zap.String("integration_id", integration.ID.String()),
							zap.Error(err))
						continue
					}

					jobs <- syncJob{
						workspace:   workspace,
						integration: hermes.Ref(integration),
						client:      client,
						accessToken: malak.AccessToken(value),
					}
				}
			}

			// No more producers: close the channel and wait for the workers
			// to finish draining it.
			close(jobs)
			wg.Wait()

			return nil
		},
	}

	return cmd
}

// shouldProcessIntegration reports whether the given workspace integration
// still needs a sync run today, based on the most recent checkpoint created
// today (if any). With resumeFailed set, runs that are "failed" or still
// "pending" are retried; otherwise only "failed" runs are retried.
func shouldProcessIntegration(ctx context.Context,
	db *bun.DB,
	workspace *malak.Workspace,
	integration *malak.WorkspaceIntegration,
	resumeFailed bool) (bool, error) {

	// NOTE(review): "today" uses the process-local timezone while
	// DATE(created_at) is evaluated by Postgres — confirm both agree.
	today := time.Now().Format("2006-01-02")

	var cp malak.IntegrationSyncCheckpoint
	err := db.NewSelect().
		Model(&cp).
		Where("workspace_id = ? AND workspace_integration_id = ? AND DATE(created_at) = ?",
			workspace.ID, integration.ID, today).
		Order("created_at DESC").
		Limit(1).
		Scan(ctx)

	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No checkpoint exists for today - should process
		return true, nil
	case err != nil:
		return false, err
	}

	// A checkpoint exists for today; decide based on its status.
	if resumeFailed {
		return cp.Status == "failed" || cp.Status == "pending", nil
	}

	return cp.Status == "failed", nil
}

// syncJob carries everything a worker needs to sync one workspace
// integration: the owning workspace, the integration row, the provider
// client to call, and the already-decrypted access token.
type syncJob struct {
	workspace   *malak.Workspace
	integration *malak.WorkspaceIntegration
	client      malak.IntegrationProviderClient
	accessToken malak.AccessToken
}

// worker drains jobs from the channel until it is closed. For each job it
// creates a "pending" checkpoint, fetches the provider's data points, persists
// them through the repository, and records the checkpoint's final status
// ("success" or "failed"). id is only used to tag log lines.
func worker(
	ctx context.Context,
	id int,
	jobs <-chan syncJob,
	wg *sync.WaitGroup,
	logger *zap.Logger,
	integrationRepo malak.IntegrationRepository,
	db *bun.DB,
) {
	defer wg.Done()

	for job := range jobs {
		logger.Info("worker processing integration",
			zap.Int("worker_id", id),
			zap.String("workspace_id", job.workspace.ID.String()),
			zap.String("integration_id", job.integration.ID.String()))

		generator := malak.NewReferenceGenerator()

		// create new checkpoint for today
		checkpoint := &malak.IntegrationSyncCheckpoint{
			WorkspaceID:            job.workspace.ID,
			WorkspaceIntegrationID: job.integration.ID,
			Status:                 "pending",
			LastSyncAttempt:        time.Now(),
			Reference:              generator.Generate(malak.EntityTypeIntegrationSyncCheckpoint),
		}

		_, err := db.NewInsert().
			Model(checkpoint).
			Exec(ctx)
		if err != nil {
			logger.Error("failed to create checkpoint",
				zap.Error(err))
			continue
		}

		dataPoints, err := job.client.Data(ctx,
			job.accessToken,
			&malak.IntegrationFetchDataOptions{
				IntegrationID:      job.integration.ID,
				WorkspaceID:        job.workspace.ID,
				ReferenceGenerator: generator,
				LastFetchedAt:      job.integration.Metadata.LastFetchedAt,
			})
		if err != nil {
			logger.Error("could not fetch data points from integration",
				zap.Int("worker_id", id),
				zap.String("workspace_id", job.workspace.ID.String()),
				zap.String("integration_id", job.integration.ID.String()),
				zap.Error(err))

			markCheckpointFailed(ctx, db, logger, checkpoint, err)
			continue
		}

		logger.Info("fetched data points from integration",
			zap.Int("worker_id", id),
			zap.String("workspace_id", job.workspace.ID.String()),
			zap.String("integration_id", job.integration.ID.String()),
			zap.Int("data_points_count", len(dataPoints)))

		// NOTE(review): this mutates the in-memory copy only — confirm that
		// AddDataPoint (or a later Update) persists LastFetchedAt.
		job.integration.Metadata.LastFetchedAt = time.Now()

		if err := integrationRepo.AddDataPoint(ctx, job.integration, dataPoints); err != nil {
			logger.Error("could not save data points from integration",
				zap.Int("worker_id", id),
				zap.String("workspace_id", job.workspace.ID.String()),
				zap.String("integration_id", job.integration.ID.String()),
				zap.Error(err))

			markCheckpointFailed(ctx, db, logger, checkpoint, err)
			continue
		}

		// update current checkpoint as successful
		_, err = db.NewUpdate().
			Model(checkpoint).
			Set("status = ?", "success").
			Set("last_successful_sync = NOW()").
			Set("error_message = NULL").
			Set("updated_at = NOW()").
			Where("id = ?", checkpoint.ID).
			Exec(ctx)
		if err != nil {
			logger.Error("failed to update checkpoint", zap.Error(err))
		}
	}
}

// markCheckpointFailed marks the given checkpoint as "failed" and records the
// triggering error. A failure to update the checkpoint itself is logged and
// otherwise ignored so the worker can move on to its next job.
func markCheckpointFailed(ctx context.Context, db *bun.DB, logger *zap.Logger,
	checkpoint *malak.IntegrationSyncCheckpoint, cause error) {

	_, err := db.NewUpdate().
		Model(checkpoint).
		Set("status = ?", "failed").
		Set("error_message = ?", cause.Error()).
		Set("updated_at = NOW()").
		Where("id = ?", checkpoint.ID).
		Exec(ctx)
	if err != nil {
		logger.Error("failed to update checkpoint", zap.Error(err))
	}
}
13 changes: 11 additions & 2 deletions integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type Integration struct {
}

type WorkspaceIntegrationMetadata struct {
AccessToken AccessToken `json:"access_token,omitempty"`
AccessToken AccessToken `json:"access_token,omitempty"`
LastFetchedAt time.Time `json:"last_fetched_at,omitempty"`
}

type WorkspaceIntegration struct {
Expand Down Expand Up @@ -80,6 +81,7 @@ type IntegrationDataPoint struct {
ID uuid.UUID `bun:"type:uuid,default:uuid_generate_v4(),pk" json:"id,omitempty"`
WorkspaceIntegrationID uuid.UUID `json:"workspace_integration_id,omitempty"`
WorkspaceID uuid.UUID `json:"workspace_id,omitempty"`
IntegrationChartID uuid.UUID `json:"integration_chart_id,omitempty"`
Reference Reference `json:"reference,omitempty"`
PointName string `json:"point_name,omitempty"`
PointValue int64 `json:"point_value,omitempty"`
Expand All @@ -94,7 +96,7 @@ type IntegrationDataPoint struct {
}

type IntegrationChartMetadata struct {
ProviderID string
ProviderID string `json:"provider_id,omitempty"`
}

type IntegrationChart struct {
Expand All @@ -117,7 +119,11 @@ type IntegrationChart struct {
// IntegrationDataValues pairs a fetched data point with the identifiers
// needed to locate the chart it belongs to.
type IntegrationDataValues struct {
	// here so it is easy to find the chart this data point belongs to
	// without too much voodoo
	// InternalName + ProviderID search in db
	// We cannot use only InternalName because some integrations
	// like mercury have the same InternalName twice (each account has a savings and a checking account, which we track)
	InternalName IntegrationChartInternalNameType
	ProviderID   string
	Data         IntegrationDataPoint
}

Expand Down Expand Up @@ -163,4 +169,7 @@ type IntegrationRepository interface {
Get(context.Context, FindWorkspaceIntegrationOptions) (*WorkspaceIntegration, error)
ToggleEnabled(context.Context, *WorkspaceIntegration) error
Update(context.Context, *WorkspaceIntegration) error

CreateCharts(context.Context, *WorkspaceIntegration, []IntegrationChartValues) error
AddDataPoint(context.Context, *WorkspaceIntegration, []IntegrationDataValues) error
}
Loading

0 comments on commit fd0a63e

Please sign in to comment.