Skip to content

Commit

Permalink
fix: add a cleanup job for email events (#1924)
Browse files Browse the repository at this point in the history
* add a cleanup job for email events

* made worker configurable and 5 min period
  • Loading branch information
johannes94 committed Jul 3, 2024
1 parent c356db7 commit e5aa390
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 41 deletions.
20 changes: 19 additions & 1 deletion emailsender/cmd/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"context"
"errors"
"flag"
"github.com/stackrox/acs-fleet-manager/emailsender/pkg/db"
"net/http"
"os"
"os/signal"
"time"

"github.com/stackrox/acs-fleet-manager/emailsender/pkg/db"
"github.com/stackrox/acs-fleet-manager/emailsender/pkg/workers"

"golang.org/x/sys/unix"

Expand Down Expand Up @@ -44,6 +47,7 @@ func main() {
}

ctx := context.Background()
shutdownCtx, cancelShutdownCtx := context.WithCancel(context.Background())

// initialize components
dbConnection := db.NewDatabaseConnection(dbCfg)
Expand All @@ -58,6 +62,18 @@ func main() {
os.Exit(1)
}

cleanupWorker := workers.CleanupEmailSent{
DbConn: dbConnection,
Period: time.Second * time.Duration(cfg.EmailCleanupPeriodSeconds) * 60,
ExpiredAfter: time.Hour * time.Duration(cfg.EmailCleanupExpiryDays) * 24,
}
go func() {
err := cleanupWorker.Run(shutdownCtx)
if err != nil && !errors.Is(err, context.Canceled) {
glog.Errorf("failed to cleanup expired email events: %v", err)
}
}()

emailSender := email.NewEmailSender(cfg.SenderAddress, sesClient, rateLimiter)
emailHandler := api.NewEmailHandler(emailSender)

Expand Down Expand Up @@ -96,6 +112,8 @@ func main() {

glog.Info("Application started. Will shut down gracefully on interrupt terminated OS signals")
sig := <-sigs

cancelShutdownCtx()
if err := server.Shutdown(ctx); err != nil {
glog.Errorf("API Shutdown error: %v", err)
}
Expand Down
30 changes: 16 additions & 14 deletions emailsender/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,22 @@ const (

// Config contains this application's runtime configuration.
type Config struct {
ClusterID string `env:"CLUSTER_ID"`
ServerAddress string `env:"SERVER_ADDRESS" envDefault:":8080"`
EnableHTTPS bool `env:"ENABLE_HTTPS" envDefault:"false"`
HTTPSCertFile string `env:"HTTPS_CERT_FILE" envDefault:""`
HTTPSKeyFile string `env:"HTTPS_KEY_FILE" envDefault:""`
MetricsAddress string `env:"METRICS_ADDRESS" envDefault:":9090"`
AuthConfigFile string `env:"AUTH_CONFIG_FILE" envDefault:"config/emailsender-authz.yaml"`
AuthConfigFromKubernetes bool `env:"AUTH_CONFIG_FROM_KUBERNETES" envDefault:"false"`
SenderAddress string `env:"SENDER_ADDRESS" envDefault:"noreply@mail.rhacs-dev.com"`
LimitEmailPerTenant int `env:"LIMIT_EMAIL_PER_TENANT" envDefault:"250"`
SesMaxBackoffDelay time.Duration `env:"SES_MAX_BACKOFF_DELAY" envDefault:"5s"`
SesMaxAttempts int `env:"SES_MAX_ATTEMPTS" envDefault:"3"`
AuthConfig AuthConfig
DatabaseConfig DbConfig
ClusterID string `env:"CLUSTER_ID"`
ServerAddress string `env:"SERVER_ADDRESS" envDefault:":8080"`
EnableHTTPS bool `env:"ENABLE_HTTPS" envDefault:"false"`
HTTPSCertFile string `env:"HTTPS_CERT_FILE" envDefault:""`
HTTPSKeyFile string `env:"HTTPS_KEY_FILE" envDefault:""`
MetricsAddress string `env:"METRICS_ADDRESS" envDefault:":9090"`
AuthConfigFile string `env:"AUTH_CONFIG_FILE" envDefault:"config/emailsender-authz.yaml"`
AuthConfigFromKubernetes bool `env:"AUTH_CONFIG_FROM_KUBERNETES" envDefault:"false"`
SenderAddress string `env:"SENDER_ADDRESS" envDefault:"noreply@mail.rhacs-dev.com"`
LimitEmailPerTenant int `env:"LIMIT_EMAIL_PER_TENANT" envDefault:"250"`
SesMaxBackoffDelay time.Duration `env:"SES_MAX_BACKOFF_DELAY" envDefault:"5s"`
SesMaxAttempts int `env:"SES_MAX_ATTEMPTS" envDefault:"3"`
EmailCleanupPeriodSeconds int `env:"EMAIL_CLEANUP_PERIOD_SECONDS" envDefault:"300"`
EmailCleanupExpiryDays int `env:"EMAIL_CLEANUP_EXPIRY_DAYS" envDefault:"2"`
AuthConfig AuthConfig
DatabaseConfig DbConfig
}

type DbConfig struct {
Expand Down
15 changes: 14 additions & 1 deletion emailsender/pkg/db/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package db

import (
"fmt"
"gorm.io/gorm"
"time"

"gorm.io/gorm"

commonDB "github.com/stackrox/acs-fleet-manager/pkg/db"
)

// DatabaseClient defines methods for fetching or updating models in DB
type DatabaseClient interface {
InsertEmailSentByTenant(tenantID string) error
CountEmailSentByTenantSince(tenantID string, since time.Time) (int64, error)
CleanupEmailSentByTenant(before time.Time) (int64, error)
}

// DatabaseConnection contains dependency for communicating with DB
Expand Down Expand Up @@ -50,3 +52,14 @@ func (d *DatabaseConnection) CountEmailSentByTenantSince(tenantID string, since
}
return count, nil
}

// CleanupEmailSentByTenant removes all EmailSendByTenant rows that were created
// before the given input time returns the number of rows affected and DB errors
func (d *DatabaseConnection) CleanupEmailSentByTenant(before time.Time) (int64, error) {
res := d.DB.Where("created_at < ?", before).Delete(&EmailSentByTenant{})
if err := res.Error; err != nil {
return 0, fmt.Errorf("failed to cleanup expired emails, %w", err)
}

return res.RowsAffected, nil
}
28 changes: 28 additions & 0 deletions emailsender/pkg/db/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package db

import "time"

type MockDatabaseClient struct {
CalledInsertEmailSentByTenant bool
CalledCountEmailSentByTenantFrom bool
CalledCleanupEmailSentByTenant bool

InsertEmailSentByTenantFunc func(tenantID string) error
CountEmailSentByTenantFromFunc func(tenantID string, from time.Time) (int64, error)
CleanupEmailSentByTenantFunc func(before time.Time) (int64, error)
}

func (m *MockDatabaseClient) InsertEmailSentByTenant(tenantID string) error {
m.CalledInsertEmailSentByTenant = true
return m.InsertEmailSentByTenantFunc(tenantID)
}

func (m *MockDatabaseClient) CountEmailSentByTenantSince(tenantID string, from time.Time) (int64, error) {
m.CalledCountEmailSentByTenantFrom = true
return m.CountEmailSentByTenantFromFunc(tenantID, from)
}

func (m *MockDatabaseClient) CleanupEmailSentByTenant(before time.Time) (int64, error) {
m.CalledCleanupEmailSentByTenant = true
return m.CleanupEmailSentByTenantFunc(before)
}
34 changes: 9 additions & 25 deletions emailsender/pkg/email/ratelimiter_test.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,18 @@
package email

import (
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/stackrox/acs-fleet-manager/emailsender/pkg/db"
"github.com/stretchr/testify/assert"
)

var limitPerTenant = 20
var testTenantID = "test-tenant-id"

type MockDatabaseClient struct {
calledInsertEmailSentByTenant bool
calledCountEmailSentByTenantFrom bool

InsertEmailSentByTenantFunc func(tenantID string) error
CountEmailSentByTenantFromFunc func(tenantID string, from time.Time) (int64, error)
}

func (m *MockDatabaseClient) InsertEmailSentByTenant(tenantID string) error {
m.calledInsertEmailSentByTenant = true
return m.InsertEmailSentByTenantFunc(tenantID)
}

func (m *MockDatabaseClient) CountEmailSentByTenantSince(tenantID string, from time.Time) (int64, error) {
m.calledCountEmailSentByTenantFrom = true
return m.CountEmailSentByTenantFromFunc(tenantID, from)
}

func TestAllowTrue_Success(t *testing.T) {
mockDatabaseClient := &MockDatabaseClient{
mockDatabaseClient := &db.MockDatabaseClient{
CountEmailSentByTenantFromFunc: func(tenantID string, from time.Time) (int64, error) {
return int64(limitPerTenant - 1), nil
},
Expand All @@ -42,11 +26,11 @@ func TestAllowTrue_Success(t *testing.T) {
allowed := service.IsAllowed(testTenantID)

assert.True(t, allowed)
assert.True(t, mockDatabaseClient.calledCountEmailSentByTenantFrom)
assert.True(t, mockDatabaseClient.CalledCountEmailSentByTenantFrom)
}

func TestAllowFalse_LimitReached(t *testing.T) {
mockDatabaseClient := &MockDatabaseClient{
mockDatabaseClient := &db.MockDatabaseClient{
CountEmailSentByTenantFromFunc: func(tenantID string, from time.Time) (int64, error) {
return int64(limitPerTenant + 1), nil
},
Expand All @@ -60,11 +44,11 @@ func TestAllowFalse_LimitReached(t *testing.T) {
allowed := service.IsAllowed(testTenantID)

assert.False(t, allowed)
assert.True(t, mockDatabaseClient.calledCountEmailSentByTenantFrom)
assert.True(t, mockDatabaseClient.CalledCountEmailSentByTenantFrom)
}

func TestPersistEmailSendEvent(t *testing.T) {
mockDatabaseClient := &MockDatabaseClient{
mockDatabaseClient := &db.MockDatabaseClient{
InsertEmailSentByTenantFunc: func(tenantID string) error {
return nil
},
Expand All @@ -78,5 +62,5 @@ func TestPersistEmailSendEvent(t *testing.T) {
err := service.PersistEmailSendEvent(testTenantID)

assert.NoError(t, err)
assert.True(t, mockDatabaseClient.calledInsertEmailSentByTenant)
assert.True(t, mockDatabaseClient.CalledInsertEmailSentByTenant)
}
40 changes: 40 additions & 0 deletions emailsender/pkg/workers/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package workers

import (
"context"
"fmt"
"time"

"github.com/golang/glog"
"github.com/stackrox/acs-fleet-manager/emailsender/pkg/db"
)

// CleanupEmailSent is a worker used to periodically cleanup EmailSentByTenant events
// stored in the database connection that are no longer needed to enforce rate limitting
type CleanupEmailSent struct {
DbConn db.DatabaseClient
Period time.Duration
ExpiredAfter time.Duration
}

// Run periodically executes a cleanup query against the given DB Connection
func (c *CleanupEmailSent) Run(ctx context.Context) error {
ticker := time.NewTicker(c.Period)

glog.Info("Starting CleanupEmailSent worker...")
for {
select {
case <-ctx.Done():
ticker.Stop()
return fmt.Errorf("stopped cleanup worker: %w", context.Canceled)
case <-ticker.C:
numDeleted, err := c.DbConn.CleanupEmailSentByTenant(time.Now().Add(-c.ExpiredAfter))
if err != nil {
glog.Errorf("failed to cleanup EmailSentByTenant: %v", err)
}

glog.Infof("deleted %d expired EmailSentByTenant events from DB", numDeleted)
}
}

}
48 changes: 48 additions & 0 deletions emailsender/pkg/workers/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package workers

import (
"context"
"testing"
"time"

"github.com/stackrox/acs-fleet-manager/emailsender/pkg/db"
"github.com/stretchr/testify/require"
)

var testTimeout = time.Second * 10

func TestCleanupEmailSent(t *testing.T) {
mockDB := &db.MockDatabaseClient{
CleanupEmailSentByTenantFunc: func(before time.Time) (int64, error) { return 5, nil },
}

cleanup := &CleanupEmailSent{
Period: time.Second * 1,
ExpiredAfter: time.Hour * 48,
DbConn: mockDB,
}

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()

timeoutTimer := time.NewTimer(testTimeout)
defer timeoutTimer.Stop()

var errChannel = make(chan error)

go func() {
err := cleanup.Run(ctx)
errChannel <- err
}()

select {
case err := <-errChannel:
// Expect DB cleanup to be called at least once since this has been running for 3 seconds
// until the context gets canceled
require.True(t, mockDB.CalledCleanupEmailSentByTenant, "expected db cleanup to be called, but was not")
require.ErrorIs(t, err, context.Canceled)
case <-timeoutTimer.C:
t.Fatal("cleanup did not stop on canceled context")
}

}

0 comments on commit e5aa390

Please sign in to comment.