Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snippet-service: rename resourceUpdater config to resourceAggregate #1348

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions charts/plgd-hub/templates/snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ data:
{{- include "plgd-hub.certificateConfig" (list $ $natsTls $cert ) | indent 10 }}
useSystemCAPool: {{ .clients.eventBus.nats.tls.useSystemCAPool }}
storage:
cleanUpExpiredUpdates: {{ .clients.storage.cleanUpExpiredUpdates | quote }}
use: {{ include "plgd-hub.useDatabase" (list $ . .clients.storage.use) | quote }}
mongoDB:
uri: {{ include "plgd-hub.mongoDBUri" (list $ .clients.storage.mongoDB.uri ) | quote }}
Expand All @@ -71,21 +72,20 @@ data:
{{- $mongoDbTls := .clients.storage.mongoDB.tls }}
{{- include "plgd-hub.certificateConfig" (list $ $mongoDbTls $cert ) | indent 10 }}
useSystemCAPool: {{ .clients.storage.mongoDB.tls.useSystemCAPool }}
resourceUpdater:
cleanUpExpiredUpdates: {{ .clients.resourceUpdater.cleanUpExpiredUpdates | quote }}
resourceAggregate:
grpc:
{{- $resourceUpdater := .clients.resourceUpdater.grpc.address }}
address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceUpdater ) | quote }}
sendMsgSize: {{ int64 .clients.resourceUpdater.grpc.sendMsgSize | default 4194304 }}
recvMsgSize: {{ int64 .clients.resourceUpdater.grpc.recvMsgSize | default 4194304 }}
{{- $resourceAggregate := .clients.resourceAggregate.grpc.address }}
address:{{ printf " " }}{{- include "plgd-hub.resourceAggregateAddress" (list $ $resourceAggregate ) | quote }}
sendMsgSize: {{ int64 .clients.resourceAggregate.grpc.sendMsgSize | default 4194304 }}
recvMsgSize: {{ int64 .clients.resourceAggregate.grpc.recvMsgSize | default 4194304 }}
keepAlive:
time: {{ .clients.resourceUpdater.grpc.keepAlive.time }}
timeout: {{ .clients.resourceUpdater.grpc.keepAlive.timeout }}
permitWithoutStream: {{ .clients.resourceUpdater.grpc.keepAlive.permitWithoutStream }}
time: {{ .clients.resourceAggregate.grpc.keepAlive.time }}
timeout: {{ .clients.resourceAggregate.grpc.keepAlive.timeout }}
permitWithoutStream: {{ .clients.resourceAggregate.grpc.keepAlive.permitWithoutStream }}
tls:
{{- $raClientTls := .clients.resourceUpdater.grpc.tls }}
{{- $raClientTls := .clients.resourceAggregate.grpc.tls }}
{{- include "plgd-hub.certificateConfig" (list $ $raClientTls $cert) | indent 10 }}
useSystemCAPool: {{ .clients.resourceUpdater.grpc.tls.useSystemCAPool }}
useSystemCAPool: {{ .clients.resourceAggregate.grpc.tls.useSystemCAPool }}
{{- include "plgd-hub.openTelemetryExporterConfig" (list $ $cert ) | nindent 6 }}
{{- end }}
{{- end }}
4 changes: 2 additions & 2 deletions charts/plgd-hub/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2516,6 +2516,7 @@ snippetservice:
certFile:
useSystemCAPool: false
storage:
cleanUpExpiredUpdates: "0 * * * *"
use: mongoDB
mongoDB:
uri:
Expand All @@ -2527,8 +2528,7 @@ snippetservice:
keyFile:
certFile:
useSystemCAPool: false
resourceUpdater:
cleanUpExpiredUpdates: "0 * * * *"
resourceAggregate:
grpc:
address: ""
sendMsgSize: 4194304
Expand Down
4 changes: 2 additions & 2 deletions snippet-service/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ apis:
idleTimeout: 30s
clients:
storage:
cleanUpExpiredUpdates: "0 * * * *"
use: mongoDB
mongoDB:
uri:
Expand Down Expand Up @@ -90,8 +91,7 @@ clients:
keyFile: "/secrets/private/cert.key"
certFile: "/secrets/public/cert.crt"
useSystemCAPool: false
resourceUpdater:
cleanUpExpiredUpdates: "0 * * * *"
resourceAggregate:
grpc:
address: ""
sendMsgSize: 4194304
Expand Down
25 changes: 18 additions & 7 deletions snippet-service/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/google/uuid"
"github.com/plgd-dev/hub/v2/pkg/config"
"github.com/plgd-dev/hub/v2/pkg/log"
grpcClient "github.com/plgd-dev/hub/v2/pkg/net/grpc/client"
httpServer "github.com/plgd-dev/hub/v2/pkg/net/http/server"
otelClient "github.com/plgd-dev/hub/v2/pkg/opentelemetry/collector/client"
natsClient "github.com/plgd-dev/hub/v2/resource-aggregate/cqrs/eventbus/nats/client"
grpcService "github.com/plgd-dev/hub/v2/snippet-service/service/grpc"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
"github.com/plgd-dev/hub/v2/snippet-service/updater"
)

type HTTPConfig struct {
Expand Down Expand Up @@ -58,11 +58,22 @@ func (c *EventBusConfig) Validate() error {
return nil
}

type ResourceAggregateConfig struct {
Connection grpcClient.Config `yaml:"grpc" json:"grpc"`
}

func (c *ResourceAggregateConfig) Validate() error {
if err := c.Connection.Validate(); err != nil {
return fmt.Errorf("grpc.%w", err)
}
return nil
}

type ClientsConfig struct {
Storage storeConfig.Config `yaml:"storage" json:"storage"`
OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"`
ResourceUpdater updater.ResourceUpdaterConfig `yaml:"resourceUpdater" json:"resourceUpdater"`
Storage storeConfig.Config `yaml:"storage" json:"storage"`
OpenTelemetryCollector otelClient.Config `yaml:"openTelemetryCollector" json:"openTelemetryCollector"`
EventBus EventBusConfig `yaml:"eventBus" json:"eventBus"`
ResourceAggregate ResourceAggregateConfig `yaml:"resourceAggregate" json:"resourceAggregate"`
}

func (c *ClientsConfig) Validate() error {
Expand All @@ -75,8 +86,8 @@ func (c *ClientsConfig) Validate() error {
if err := c.EventBus.Validate(); err != nil {
return fmt.Errorf("eventBus.%w", err)
}
if err := c.ResourceUpdater.Validate(); err != nil {
return fmt.Errorf("resourceUpdater.%w", err)
if err := c.ResourceAggregate.Validate(); err != nil {
return fmt.Errorf("resourceAggregate.%w", err)
}
return nil
}
Expand Down
37 changes: 34 additions & 3 deletions snippet-service/service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/plgd-dev/hub/v2/snippet-service/service"
storeConfig "github.com/plgd-dev/hub/v2/snippet-service/store/config"
"github.com/plgd-dev/hub/v2/snippet-service/test"
"github.com/plgd-dev/hub/v2/snippet-service/updater"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -120,6 +119,38 @@ func TestStorageConfig(t *testing.T) {
}
}

func TestResourceAggregateConfig(t *testing.T) {
tests := []struct {
name string
cfg service.ResourceAggregateConfig
wantErr bool
}{
{
name: "valid",
cfg: test.MakeResourceAggregateConfig(),
},
{
name: "invalid - no connection",
cfg: func() service.ResourceAggregateConfig {
cfg := service.ResourceAggregateConfig{}
return cfg
}(),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
})
}
}

func TestClientsConfig(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -173,10 +204,10 @@ func TestClientsConfig(t *testing.T) {
wantErr: true,
},
{
name: "invalid ResourceUpdater",
name: "invalid ResourceAggregate",
cfg: func() service.ClientsConfig {
cfg := test.MakeClientsConfig()
cfg.ResourceUpdater = updater.ResourceUpdaterConfig{}
cfg.ResourceAggregate = service.ResourceAggregateConfig{}
return cfg
}(),
wantErr: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package updater
package service

import (
"fmt"
Expand All @@ -7,14 +7,12 @@ import (
"github.com/go-co-op/gocron/v2"
)

func NewExpiredUpdatesChecker(cleanUpExpiredUpdates string, withSeconds bool, updater *ResourceUpdater) (gocron.Scheduler, error) {
func NewExpiredUpdatesChecker(cleanUpExpiredUpdates string, withSeconds bool, onCheckTimeout func()) (gocron.Scheduler, error) {
s, err := gocron.NewScheduler(gocron.WithLocation(time.Local)) //nolint:gosmopolitan
if err != nil {
return nil, fmt.Errorf("cannot create cron job: %w", err)
}
_, err = s.NewJob(gocron.CronJob(cleanUpExpiredUpdates, withSeconds), gocron.NewTask(func() {
updater.TimeoutPendingResourceUpdates()
}))
_, err = s.NewJob(gocron.CronJob(cleanUpExpiredUpdates, withSeconds), gocron.NewTask(onCheckTimeout))
if err != nil {
return nil, fmt.Errorf("cannot create cron job: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions snippet-service/service/http/invokeConfiguration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func TestRequestHandlerInvokeConfiguration(t *testing.T) {
}()

snippetCfg := test.MakeConfig(t)
snippetCfg.Clients.ResourceUpdater.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.ResourceUpdater.ExtendCronParserBySeconds = true
snippetCfg.Clients.Storage.CleanUpExpiredUpdates = "*/1 * * * * *"
snippetCfg.Clients.Storage.ExtendCronParserBySeconds = true
_, shutdownHttp := test.New(t, snippetCfg)
defer shutdownHttp()
logger := log.NewLogger(snippetCfg.Log)
Expand Down
46 changes: 34 additions & 12 deletions snippet-service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"sync/atomic"

"github.com/plgd-dev/hub/v2/pkg/config/database"
"github.com/plgd-dev/hub/v2/pkg/fn"
Expand Down Expand Up @@ -32,15 +33,27 @@ type Service struct {
resourceSubscriber *ResourceSubscriber
}

func createStore(ctx context.Context, config storeConfig.Config, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) {
if config.Use == database.MongoDB {
s, err := mongodb.New(ctx, config.MongoDB, fileWatcher, logger, tracerProvider)
func createStore(ctx context.Context, config storeConfig.Config, onCheckForExpired func(), fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (store.Store, error) {
if config.Use != database.MongoDB {
return nil, fmt.Errorf("invalid store use('%v')", config.Use)
}
s, err := mongodb.New(ctx, config.MongoDB, fileWatcher, logger, tracerProvider)
if err != nil {
return nil, fmt.Errorf("mongodb: %w", err)
}
if config.CleanUpExpiredUpdates != "" {
scheduler, err := NewExpiredUpdatesChecker(config.CleanUpExpiredUpdates, config.ExtendCronParserBySeconds, onCheckForExpired)
if err != nil {
return nil, fmt.Errorf("mongodb: %w", err)
return nil, fmt.Errorf("cannot create scheduler: %w", err)
}
return s, nil
s.AddCloseFunc(func() {
err2 := scheduler.Shutdown()
if err2 != nil {
log.Errorf("failed to shutdown scheduler: %w", err2)
}
})
}
return nil, fmt.Errorf("invalid store use('%v')", config.Use)
return s, nil
}

func newHttpService(ctx context.Context, config HTTPConfig, validatorConfig validator.Config, tlsConfig certManagerServer.Config, ss *grpcService.SnippetServiceServer, fileWatcher *fsnotify.Watcher, logger log.Logger, tracerProvider trace.TracerProvider) (*httpService.Service, func(), error) {
Expand Down Expand Up @@ -84,8 +97,15 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
var closerFn fn.FuncList
closerFn.AddFunc(otelClient.Close)
tracerProvider := otelClient.GetTracerProvider()
var resourceUpdater atomic.Pointer[updater.ResourceUpdater]

db, err := createStore(ctx, config.Clients.Storage, fileWatcher, logger, tracerProvider)
db, err := createStore(ctx, config.Clients.Storage, func() {
ru := resourceUpdater.Load()
if ru == nil {
return
}
ru.TimeoutPendingResourceUpdates()
}, fileWatcher, logger, tracerProvider)
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create store: %w", err)
Expand All @@ -96,19 +116,21 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
}
})

resourceUpdater, err := updater.NewResourceUpdater(ctx, config.Clients.ResourceUpdater, db, fileWatcher, logger, tracerProvider)
ru, err := updater.NewResourceUpdater(ctx, config.Clients.ResourceAggregate.Connection, db, fileWatcher, logger, tracerProvider)
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create resource change handler: %w", err)
}
resourceUpdater.Store(ru)

closerFn.AddFunc(func() {
errC := resourceUpdater.Close()
errC := ru.Close()
if errC != nil {
log.Errorf("failed to close resource change handler: %w", errC)
}
})

resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.EventBus.NATS, config.Clients.EventBus.SubscriptionID, fileWatcher, logger, resourceUpdater)
resourceSubscriber, err := NewResourceSubscriber(ctx, config.Clients.EventBus.NATS, config.Clients.EventBus.SubscriptionID, fileWatcher, logger, resourceUpdater.Load())
if err != nil {
closerFn.Execute()
return nil, fmt.Errorf("cannot create resource subscriber: %w", err)
Expand All @@ -120,7 +142,7 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
}
})

snippetService := grpcService.NewSnippetServiceServer(db, resourceUpdater, config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger)
snippetService := grpcService.NewSnippetServiceServer(db, resourceUpdater.Load(), config.APIs.GRPC.Authorization.OwnerClaim, config.HubID, logger)

grpcService, grpcServiceClose, err := newGrpcService(ctx, config.APIs.GRPC, snippetService, fileWatcher, logger, tracerProvider)
if err != nil {
Expand All @@ -144,7 +166,7 @@ func New(ctx context.Context, config Config, fileWatcher *fsnotify.Watcher, logg
Service: s,

store: db,
resourceUpdater: resourceUpdater,
resourceUpdater: resourceUpdater.Load(),
resourceSubscriber: resourceSubscriber,
}, nil
}
Expand Down
Loading
Loading