Skip to content

Commit

Permalink
move cleanUpExpiredUpdates to storage
Browse files Browse the repository at this point in the history
  • Loading branch information
jkralik committed Jul 15, 2024
1 parent 8b2ac79 commit b935749
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 113 deletions.
2 changes: 1 addition & 1 deletion charts/plgd-hub/templates/snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ data:
{{- include "plgd-hub.certificateConfig" (list $ $natsTls $cert ) | indent 10 }}
useSystemCAPool: {{ .clients.eventBus.nats.tls.useSystemCAPool }}
storage:
cleanUpExpiredUpdates: {{ .clients.storage.cleanUpExpiredUpdates | quote }}
use: {{ include "plgd-hub.useDatabase" (list $ . .clients.storage.use) | quote }}
mongoDB:
uri: {{ include "plgd-hub.mongoDBUri" (list $ .clients.storage.mongoDB.uri ) | quote }}
Expand All @@ -72,7 +73,6 @@ data:
{{- include "plgd-hub.certificateConfig" (list $ $mongoDbTls $cert ) | indent 10 }}
useSystemCAPool: {{ .clients.storage.mongoDB.tls.useSystemCAPool }}
resourceAggregate:
cleanUpExpiredUpdates: {{ .clients.resourceAggregate.cleanUpExpiredUpdates | quote }}
grpc:
{{- $resourceAggregate := .clients.resourceAggregate.grpc.address }}
address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceAggregate ) | quote }}
Expand Down
2 changes: 1 addition & 1 deletion charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2516,6 +2516,7 @@ snippetservice:
certFile:
useSystemCAPool: false
storage:
cleanUpExpiredUpdates: "0 * * * *"
use: mongoDB
mongoDB:
uri:
Expand All @@ -2528,7 +2529,6 @@ snippetservice:
certFile:
useSystemCAPool: false
resourceAggregate:
cleanUpExpiredUpdates: "0 * * * *"
grpc:
address: ""
sendMsgSize: 4194304
Expand Down
2 changes: 1 addition & 1 deletion snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ apis:
idleTimeout: 30s
clients:
storage:
cleanUpExpiredUpdates: "0 * * * *"
use: mongoDB
mongoDB:
uri:
Expand Down Expand Up @@ -91,7 +92,6 @@ clients:
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
resourceAggregate:
cleanUpExpiredUpdates: "0 * * * *"
grpc:
address: ""
sendMsgSize: 4194304
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package updater
package service

import (
"fmt"
Expand All @@ -7,14 +7,12 @@ import (
"github.com/go-co-op/gocron/v2"
)

func NewExpiredUpdatesChecker(cleanUpExpiredUpdates string, withSeconds bool, updater *ResourceUpdater) (gocron.Scheduler, error) {
func NewExpiredUpdatesChecker(cleanUpExpiredUpdates string, withSeconds bool, onCheckTimeout func()) (gocron.Scheduler, error) {
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan
if err != nil {
return nil, fmt.Errorf("cannot create cron job: %w", err)
}
_, err = s.NewJob(gocron.CronJob(cleanUpExpiredUpdates, withSeconds), gocron.NewTask(func() {
updater.TimeoutPendingResourceUpdates()
}))
_, err = s.NewJob(gocron.CronJob(cleanUpExpiredUpdates, withSeconds), gocron.NewTask(onCheckTimeout))
if err != nil {
return nil, fmt.Errorf("cannot create cron job: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions snippet-service/service/http/invokeConfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func TestRequestHandlerInvokeConfiguration(t *testing.T) {
}()

snippetCfg := test.MakeConfig(t)
snippetCfg.Clients.ResourceAggregate.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.ResourceAggregate.ExtendCronParserBySeconds = true
snippetCfg.Clients.Storage.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.Storage.ExtendCronParserBySeconds = true
_, shutdownHttp := test.New(t, snippetCfg)
defer shutdownHttp()
logger := log.NewLogger(snippetCfg.Log)
Expand Down
46 changes: 34 additions & 12 deletions snippet-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"sync/atomic"

"github.com/plgd-dev/hub/v2/pkg/config/database"
"github.com/plgd-dev/hub/v2/pkg/fn"
Expand Down Expand Up @@ -32,15 +33,27 @@ type Service struct {
resourceSubscriber *ResourceSubscriber
}

func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) {
if config.Use == database.MongoDB {
s, err := mongodb.New(ctx, config.MongoDB, fileWatcher, logger, tracerProvider)
func createStore(ctx context.Context, config storeConfig.Config, onCheckForExpired func(), fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) {
if config.Use != database.MongoDB {
return nil, fmt.Errorf("invalid store use('%v')", config.Use)
}
s, err := mongodb.New(ctx, config.MongoDB, fileWatcher, logger, tracerProvider)
if err != nil {
return nil, fmt.Errorf("mongodb: %w", err)
}
if config.CleanUpExpiredUpdates != "" {
scheduler, err := NewExpiredUpdatesChecker(config.CleanUpExpiredUpdates, config.ExtendCronParserBySeconds, onCheckForExpired)
if err != nil {
return nil, fmt.Errorf("mongodb: %w", err)
return nil, fmt.Errorf("cannot create scheduler: %w", err)
}
return s, nil
s.AddCloseFunc(func() {
err2 := scheduler.Shutdown()
if err2 != nil {
log.Errorf("failed to shutdown scheduler: %w", err2)
}
})
}
return nil, fmt.Errorf("invalid store use('%v')", config.Use)
return s, nil
}

func newHttpService(ctx context.Context, config HTTPConfig, validatorConfig validator.Config, tlsConfig certManagerServer.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) {
Expand Down Expand Up @@ -84,8 +97,15 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
var closerFn fn.FuncList
closerFn.AddFunc(otelClient.Close)
tracerProvider := otelClient.GetTracerProvider()
var resourceUpdater atomic.Pointer[updater.ResourceUpdater]

db, err := createStore(ctx, config.Clients.Storage, fileWatcher, logger, tracerProvider)
db, err := createStore(ctx, config.Clients.Storage, func() {
ru := resourceUpdater.Load()
if ru == nil {
return
}
ru.TimeoutPendingResourceUpdates()
}, fileWatcher, logger, tracerProvider)
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create store: %w", err)
Expand All @@ -96,19 +116,21 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
}
})

resourceUpdater, err := updater.NewResourceUpdater(ctx, config.Clients.ResourceAggregate, db, fileWatcher, logger, tracerProvider)
ru, err := updater.NewResourceUpdater(ctx, config.Clients.ResourceAggregate, db, fileWatcher, logger, tracerProvider)
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create resource change handler: %w", err)
}
resourceUpdater.Store(ru)

closerFn.AddFunc(func() {
errC := resourceUpdater.Close()
errC := ru.Close()
if errC != nil {
log.Errorf("failed to close resource change handler: %w", errC)
}
})

resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.EventBus.NATS, config.Clients.EventBus.SubscriptionID, fileWatcher, logger, resourceUpdater)
resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.EventBus.NATS, config.Clients.EventBus.SubscriptionID, fileWatcher, logger, resourceUpdater.Load())
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create resource subscriber: %w", err)
Expand All @@ -120,7 +142,7 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
}
})

snippetService := grpcService.NewSnippetServiceServer(db, resourceUpdater, config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger)
snippetService := grpcService.NewSnippetServiceServer(db, resourceUpdater.Load(), config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger)

grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider)
if err != nil {
Expand All @@ -144,7 +166,7 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
Service: s,

store: db,
resourceUpdater: resourceUpdater,
resourceUpdater: resourceUpdater.Load(),
resourceSubscriber: resourceSubscriber,
}, nil
}
Expand Down
33 changes: 24 additions & 9 deletions snippet-service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func TestServiceNew(t *testing.T) {
cfg: service.Config{
Clients: service.ClientsConfig{
Storage: storeConfig.Config{
Use: "invalid",
Config: database.Config[*storeMongo.Config, *storeCqlDB.Config]{
Use: "invalid",
},
},
},
},
Expand All @@ -88,10 +90,12 @@ func TestServiceNew(t *testing.T) {
cfg: service.Config{
Clients: service.ClientsConfig{
Storage: storeConfig.Config{
Use: database.MongoDB,
MongoDB: &storeMongo.Config{
Mongo: mongodb.Config{
URI: "invalid",
Config: database.Config[*storeMongo.Config, *storeCqlDB.Config]{
Use: database.MongoDB,
MongoDB: &storeMongo.Config{
Mongo: mongodb.Config{
URI: "invalid",
},
},
},
},
Expand All @@ -104,13 +108,24 @@ func TestServiceNew(t *testing.T) {
cfg: service.Config{
Clients: service.ClientsConfig{
Storage: storeConfig.Config{
Use: database.CqlDB,
CqlDB: &storeCqlDB.Config{},
Config: database.Config[*storeMongo.Config, *storeCqlDB.Config]{
Use: database.CqlDB,
CqlDB: &storeCqlDB.Config{},
},
},
},
},
wantErr: true,
},
{
name: "invalid expired updates checker config",
cfg: func() service.Config {
cfg := test.MakeConfig(t)
cfg.Clients.Storage.CleanUpExpiredUpdates = "invalid"
return cfg
}(),
wantErr: true,
},
{
name: "invalid resource subscriber config",
cfg: func() service.Config {
Expand Down Expand Up @@ -186,8 +201,8 @@ func TestService(t *testing.T) {

snippetCfg := test.MakeConfig(t)
const interval = time.Second
snippetCfg.Clients.ResourceAggregate.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.ResourceAggregate.ExtendCronParserBySeconds = true
snippetCfg.Clients.Storage.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.Storage.ExtendCronParserBySeconds = true
_, shutdownSnippetService := test.New(t, snippetCfg)
defer shutdownSnippetService()

Expand Down
37 changes: 36 additions & 1 deletion snippet-service/store/config/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
package config

import (
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/plgd-dev/hub/v2/pkg/config/database"
"github.com/plgd-dev/hub/v2/pkg/log"
"github.com/plgd-dev/hub/v2/snippet-service/store/cqldb"
"github.com/plgd-dev/hub/v2/snippet-service/store/mongodb"
)

type Config = database.Config[*mongodb.Config, *cqldb.Config]
type Config struct {
CleanUpExpiredUpdates string `yaml:"cleanUpExpiredUpdates" json:"cleanUpExpiredUpdates"`
ExtendCronParserBySeconds bool `yaml:"-" json:"-"`
database.Config[*mongodb.Config, *cqldb.Config] `yaml:",inline" json:",inline"`
}

func (c *Config) Validate() error {
if err := c.Config.Validate(); err != nil {
return err
}
if c.CleanUpExpiredUpdates == "" {
return nil
}
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan
if err != nil {
return fmt.Errorf("cannot create cron job: %w", err)
}
defer func() {
if errS := s.Shutdown(); errS != nil {
log.Errorf("failed to shutdown cron job: %w", errS)
}
}()
_, err = s.NewJob(gocron.CronJob(c.CleanUpExpiredUpdates, c.ExtendCronParserBySeconds),
gocron.NewTask(func() {
// do nothing
}))
if err != nil {
return fmt.Errorf("cleanUpExpiredUpdates('%v') - %w", c.CleanUpExpiredUpdates, err)
}
return nil
}
32 changes: 17 additions & 15 deletions snippet-service/test/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func MakeAPIsConfig() service.APIsConfig {

func MakeResourceAggregateConfig() updater.ResourceUpdaterConfig {
return updater.ResourceUpdaterConfig{
Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST),
CleanUpExpiredUpdates: "0 * * * *",
ExtendCronParserBySeconds: false,
Connection: config.MakeGrpcClientConfig(config.RESOURCE_AGGREGATE_HOST),
}
}

Expand All @@ -65,19 +63,23 @@ func MakeStoreConfig() storeConfig.Config {
return storeConfig.Config{
// TODO: add cqldb support
// Use: config.ACTIVE_DATABASE(),
Use: database.MongoDB,
MongoDB: &storeMongo.Config{
Mongo: mongodb.Config{
MaxPoolSize: 16,
MaxConnIdleTime: time.Minute * 4,
URI: config.MONGODB_URI,
Database: "snippetService",
TLS: config.MakeTLSClientConfig(),
CleanUpExpiredUpdates: "0 * * * *",
ExtendCronParserBySeconds: false,
Config: database.Config[*storeMongo.Config, *storeCqlDB.Config]{
Use: database.MongoDB,
MongoDB: &storeMongo.Config{
Mongo: mongodb.Config{
MaxPoolSize: 16,
MaxConnIdleTime: time.Minute * 4,
URI: config.MONGODB_URI,
Database: "snippetService",
TLS: config.MakeTLSClientConfig(),
},
},
CqlDB: &storeCqlDB.Config{
Embedded: config.MakeCqlDBConfig(),
Table: "snippets",
},
},
CqlDB: &storeCqlDB.Config{
Embedded: config.MakeCqlDBConfig(),
Table: "snippets",
},
}
}
Expand Down
26 changes: 1 addition & 25 deletions snippet-service/updater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,17 @@ package updater

import (
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"github.com/plgd-dev/hub/v2/pkg/log"
grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client"
)

type ResourceUpdaterConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
CleanUpExpiredUpdates string `yaml:"cleanUpExpiredUpdates" json:"cleanUpExpiredUpdates"`
ExtendCronParserBySeconds bool `yaml:"-" json:"-"`
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}

func (c *ResourceUpdaterConfig) Validate() error {
if err := c.Connection.Validate(); err != nil {
return fmt.Errorf("grpc.%w", err)
}
if c.CleanUpExpiredUpdates == "" {
return nil
}
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan
if err != nil {
return fmt.Errorf("cannot create cron job: %w", err)
}
defer func() {
if errS := s.Shutdown(); errS != nil {
log.Errorf("failed to shutdown cron job: %w", errS)
}
}()
_, err = s.NewJob(gocron.CronJob(c.CleanUpExpiredUpdates, c.ExtendCronParserBySeconds),
gocron.NewTask(func() {
// do nothing
}))
if err != nil {
return fmt.Errorf("cleanUpExpiredUpdates('%v') - %w", c.CleanUpExpiredUpdates, err)
}
return nil
}
Loading

0 comments on commit b935749

Please sign in to comment.