diff --git a/internal/settings/schema.go b/internal/settings/schema.go new file mode 100644 index 00000000..f3f2b08a --- /dev/null +++ b/internal/settings/schema.go @@ -0,0 +1,87 @@ +package settings + +import ( + "encoding/json" + "fmt" + + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +type Schemas interface { + SetDefaults(schema string, input runtime.RawExtension) (runtime.RawExtension, error) +} + +// ParseSchemas takes an object containing a map of json schemas and parses it +func ParseSchemas(raw []byte) (Schemas, error) { + req := struct { + Settings schemas + }{} + + err := json.Unmarshal(raw, &req) + if err != nil { + return nil, err + } + return req.Settings, nil +} + +type schemas map[string]schema + +type schema struct { + Default interface{} + Properties schemas +} + +// SetDefaults takes a setting for a DBaaS and will set the defaults of the schema with name `name` +func (s schemas) SetDefaults(name string, input runtime.RawExtension) (runtime.RawExtension, error) { + sc, ok := s[name] + if !ok { + return runtime.RawExtension{}, fmt.Errorf("unknown schema: %q", name) + } + + inMap, err := mapper.ToMap(input) + if err != nil { + return runtime.RawExtension{}, fmt.Errorf("failed to parse input: %w", err) + } + + setDefaults(sc, inMap) + + out, err := mapper.ToRawExtension(&inMap) + if err != nil { + return runtime.RawExtension{}, fmt.Errorf("failed to parse defaulted setting: %w", err) + } + return out, nil +} + +func setDefaults(sc schema, input map[string]interface{}) bool { + hasSetDefaults := false + + for key, val := range sc.Properties { + if len(val.Properties) > 0 { + submap := map[string]interface{}{} + + if _, ok := input[key]; ok { + submap, ok = input[key].(map[string]interface{}) + if !ok { + continue + } + } + + if setDefaults(val, submap) { + input[key] = submap + hasSetDefaults = true + } + } else { + _, ok := input[key] + if ok { + continue + } + + if val.Default != nil { + input[key] = val.Default + hasSetDefaults = true + } + } + } + return hasSetDefaults +} diff --git a/internal/settings/schema_test.go b/internal/settings/schema_test.go new file mode 100644 index 00000000..7a90cd3b --- /dev/null +++ b/internal/settings/schema_test.go @@ -0,0 +1,111 @@ +package settings + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +var exampleSchemas = []byte(`{ + "settings": { + "simple": { + "type": "object", + "properties": { + "foo": { + "default": true, + "type": "boolean" + }, + "bar": { + "default": "buzz", + "type": "string" + }, + "count": { + "default": 42, + "type": "integer" + }, + "nodefault": { + "type": ["string", "null"] + } + } + }, + "nested": { + "type": "object", + "properties": { + "foo": { + "default": true, + "type": "boolean" + }, + "obj": { + "type": "object", + "properties": { + "bar": { + "default": "buzz", + "type": "string" + }, + "obj": { + "type": "object", + "properties": { + "count": { + "default": 42, + "type": "integer" + } + } + } + } + } + } + } + } +}`) + +func TestSetDefaultSimple(t *testing.T) { + schemas, err := ParseSchemas(exampleSchemas) + require.NoError(t, err, "failed to parse example schema") + + input := runtime.RawExtension{ + Raw: []byte(`{"bar": "blub"}`), + } + out, err := schemas.SetDefaults("simple", input) + require.NoError(t, err, "failed to set defaults") + + outMap, err := mapper.ToMap(out) + require.NoError(t, err, 
"failed to set defaults") + + assert.EqualValues(t, true, outMap["foo"]) + assert.EqualValues(t, "blub", outMap["bar"]) + assert.EqualValues(t, 42, outMap["count"]) + _, ok := outMap["nodefault"] + assert.Falsef(t, ok, "should not set values withou defaults") +} + +func TestSetDefaultNested(t *testing.T) { + schemas, err := ParseSchemas(exampleSchemas) + require.NoError(t, err, "failed to parse example schema") + + input := runtime.RawExtension{ + Raw: []byte(`{"bar": "blub"}`), + } + out, err := schemas.SetDefaults("nested", input) + require.NoError(t, err, "failed to set defaults") + + outMap, err := mapper.ToMap(out) + require.NoError(t, err, "failed to set defaults") + + assert.EqualValues(t, true, outMap["foo"]) + assert.EqualValues(t, "blub", outMap["bar"]) + + sub1, ok := outMap["obj"] + require.Truef(t, ok, "should set sub object") + sub1Map, ok := sub1.(map[string]interface{}) + require.Truef(t, ok, "should set sub object as map") + assert.EqualValues(t, "buzz", sub1Map["bar"]) + + sub2, ok := sub1Map["obj"] + require.Truef(t, ok, "should set sub-sub object") + sub2Map, ok := sub2.(map[string]interface{}) + require.Truef(t, ok, "should set sub-sub object as map") + assert.EqualValues(t, 42, sub2Map["count"]) +} diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 7860ffa3..435066be 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -12,7 +12,6 @@ import ( exoscaleapi "github.com/exoscale/egoscale/v2/api" "github.com/exoscale/egoscale/v2/oapi" "github.com/google/go-cmp/cmp" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" controllerruntime "sigs.k8s.io/controller-runtime" @@ -67,7 +66,13 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) } - upToDate, diff := diffParameters(external, instance.Spec.ForProvider) + currentParams, err := setSettingsDefaults(ctx, c.exo, &instance.Spec.ForProvider) + if err != nil { + log.Error(err, "unable to set kafka settings schema") + currentParams = &instance.Spec.ForProvider + } + + upToDate, diff := diffParameters(external, *currentParams) return managed.ExternalObservation{ ResourceExists: true, @@ -183,7 +188,7 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP return false, err.Error() } - actualKafkaRestSettings, err := getActualKafkaRestSettings(external.KafkaRestSettings, expected.KafkaRestSettings) + actualKafkaRestSettings, err := mapper.ToRawExtension(external.KafkaRestSettings) if err != nil { return false, err.Error() } @@ -209,39 +214,3 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP settingComparer := cmp.Comparer(mapper.CompareSettings) return cmp.Equal(expected, actual, settingComparer), cmp.Diff(expected, actual, settingComparer) } - -// getActualKafkaRestSettings reads the Kafa REST settings and strips out all non relevant default settings -// Exoscale always returns all defaults, not just the fields we set, so we need to strip them so that we can compare the actual and expected setting. 
-func getActualKafkaRestSettings(actual *map[string]interface{}, expected runtime.RawExtension) (runtime.RawExtension, error) { - if actual == nil { - return runtime.RawExtension{}, nil - } - expectedMap, err := mapper.ToMap(expected) - if err != nil { - return runtime.RawExtension{}, fmt.Errorf("error parsing kafka REST settings: %w", err) - } - s := stripRestSettingsDefaults(*actual, expectedMap) - return mapper.ToRawExtension(&s) -} - -// defaultRestSettings are the default settings for Kafka REST. -var defaultRestSettings = map[string]interface{}{ - "consumer_enable_auto_commit": true, - "producer_acks": "1", // Yes, that's a "1" as a string. I don't know why, that's just how it is.. - "consumer_request_max_bytes": float64(67108864), // When parsing json into map[string]interface{} we get floats. - "simpleconsumer_pool_size_max": float64(25), - "producer_linger_ms": float64(0), - "consumer_request_timeout_ms": float64(1000), -} - -func stripRestSettingsDefaults(actual map[string]interface{}, expected map[string]interface{}) map[string]interface{} { - res := map[string]interface{}{} - for k, v := range actual { - d, isDefault := defaultRestSettings[k] - _, isExpected := expected[k] - if !isDefault || d != v || isExpected { - res[k] = v - } - } - return res -} diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index ccdb8d3f..2ffcb7a8 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -60,6 +60,7 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { found.ConnectionInfo.AccessKey = pointer.String("KEY") mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -108,6 +109,7 @@ func TestObserve_UpToDate_ConnectionDetails_with_REST(t *testing.T) { found.KafkaRestEnabled = pointer.Bool(true) found.ConnectionInfo.RestUri = pointer.String("https://admin:BGAUNBS2afjwQ@test.foobar.com:21701") mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -153,6 +155,7 @@ func TestObserve_UpToDate_Status(t *testing.T) { } mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -183,6 +186,7 @@ func TestObserve_UpToDate_Condition_NotReady(t *testing.T) { found.State = &state mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -211,6 +215,7 @@ func TestObserve_UpToDate_Condition_Ready(t *testing.T) { found.State = &state mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -238,6 +243,7 @@ func TestObserve_UpToDate_WithVersion(t *testing.T) { found.Version = pointer.String("3.2.1") mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -262,6 +268,7 @@ func TestObserve_UpToDate_EmptyRestSettings(t *testing.T) { found.KafkaRestEnabled = pointer.Bool(true) mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -291,6 +298,7 @@ func TestObserve_UpToDate_RestSettings(t *testing.T) { found.KafkaRestEnabled = pointer.Bool(true) mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) 
assert.NotPanics(t, func() { @@ -313,6 +321,7 @@ func TestObserve_Outdated(t *testing.T) { found.Maintenance.Dow = "tuesday" mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -343,6 +352,7 @@ func TestObserve_Outdated_Settings(t *testing.T) { } mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -371,6 +381,7 @@ func TestObserve_Outdated_RestSettings(t *testing.T) { found.KafkaRestEnabled = pointer.Bool(true) mockGetKafkaCall(exoMock, "foo", found, nil) + mockGetKafkaSettingsCall(exoMock, nil) mockCACall(exoMock) assert.NotPanics(t, func() { @@ -444,6 +455,15 @@ func sampleAPIKafka(name string) *oapi.DbaasServiceKafka { return &res } +var defaultRestSettings = map[string]interface{}{ + "consumer_enable_auto_commit": true, + "producer_acks": "1", // Yes, that's a "1" as a string. I don't know why, that's just how it is.. + "consumer_request_max_bytes": float64(67108864), // When parsing json into map[string]interface{} we get floats. + "simpleconsumer_pool_size_max": float64(25), + "producer_linger_ms": float64(0), + "consumer_request_timeout_ms": float64(1000), +} + func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, found *oapi.DbaasServiceKafka, err error) { m.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name)). Return(&oapi.GetDbaasServiceKafkaResponse{ @@ -453,6 +473,13 @@ func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, Once() } +func mockGetKafkaSettingsCall(m *operatortest.ClientWithResponsesInterface, err error) { + m.On("GetDbaasSettingsKafkaWithResponse", mock.Anything). + Return(&oapi.GetDbaasSettingsKafkaResponse{ + Body: rawSettingsResponse, + }, err). + Once() +} func mockCACall(m *operatortest.ClientWithResponsesInterface) { m.On("GetDbaasCaCertificateWithResponse", mock.Anything). 
Return(&oapi.GetDbaasCaCertificateResponse{ diff --git a/operator/kafkacontroller/settings.go b/operator/kafkacontroller/settings.go new file mode 100644 index 00000000..d46223c1 --- /dev/null +++ b/operator/kafkacontroller/settings.go @@ -0,0 +1,45 @@ +package kafkacontroller + +import ( + "context" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/provider-exoscale/internal/settings" +) + +type settingsFetcher interface { + GetDbaasSettingsKafkaWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsKafkaResponse, error) +} + +func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.KafkaParameters) (*exoscalev1.KafkaParameters, error) { + s, err := fetchSettingSchema(ctx, f) + if err != nil { + return nil, err + } + res := in.DeepCopy() + + res.KafkaRestSettings, err = s.SetDefaults("kafka-rest", res.KafkaRestSettings) + if err != nil { + return nil, err + } + res.KafkaSettings, err = s.SetDefaults("kafka", res.KafkaSettings) + if err != nil { + return nil, err + } + + return res, nil +} + +func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { + resp, err := f.GetDbaasSettingsKafkaWithResponse(ctx) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(resp.Body) + if err != nil { + return nil, err + } + return schemas, nil +} diff --git a/operator/kafkacontroller/settings_test.go b/operator/kafkacontroller/settings_test.go new file mode 100644 index 00000000..53d5f490 --- /dev/null +++ b/operator/kafkacontroller/settings_test.go @@ -0,0 +1,76 @@ +package kafkacontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +//nolint:golint,unused +var emptyKafkaRestSettings = map[string]interface{}{ + "simpleconsumer_pool_size_max": 25, + "producer_linger_ms": 0, + "consumer_request_timeout_ms": 1000, + "consumer_enable_auto_commit": true, + "producer_acks": 1, + "consumer_request_max_bytes": 6.7108864e+07, +} + +type fakeSettingsFetcher struct{} + +func (fakeSettingsFetcher) GetDbaasSettingsKafkaWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsKafkaResponse, error) { + return &oapi.GetDbaasSettingsKafkaResponse{ + Body: rawSettingsResponse, + }, nil +} + +func mustToRawExt(t *testing.T, set map[string]interface{}) runtime.RawExtension { + res, err := mapper.ToRawExtension(&set) + require.NoError(t, err, "failed to parse input setting") + return res +} + +func mustToMap(t *testing.T, raw runtime.RawExtension) map[string]interface{} { + res, err := mapper.ToMap(raw) + require.NoError(t, err, "failed to parse set defaults") + return res +} + +func TestDefaultSettings(t *testing.T) { + + found := exoscalev1.KafkaParameters{ + Maintenance: exoscalev1.MaintenanceSpec{}, + Zone: "gva-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: false, + Size: exoscalev1.SizeSpec{ + Plan: "startup-4", + }, + }, + KafkaRestSettings: mustToRawExt(t, map[string]interface{}{ + "producer_compression_type": "gzip", + "producer_acks": 4, + }), + KafkaSettings: mustToRawExt(t, map[string]interface{}{ + "group_max_session_timeout_ms": 42, + }), + } + + withDefaults, err := 
setSettingsDefaults(context.Background(), fakeSettingsFetcher{}, &found) + require.NoError(t, err, "failed to set defaults") + + restWD := mustToMap(t, withDefaults.KafkaRestSettings) + assert.EqualValues(t, 25, restWD["simpleconsumer_pool_size_max"]) + assert.EqualValues(t, "gzip", restWD["producer_compression_type"]) + assert.EqualValues(t, 4, restWD["producer_acks"]) + + assert.EqualValues(t, 42, mustToMap(t, withDefaults.KafkaSettings)["group_max_session_timeout_ms"]) +} + +var rawSettingsResponse = []byte(`{"settings":{"kafka":{"properties":{"group_max_session_timeout_ms":{"description":"The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.","maximum":1800000,"type":"integer","title":"group.max.session.timeout.ms","minimum":0,"example":1800000},"log_flush_interval_messages":{"description":"The number of messages accumulated on a log partition before messages are flushed to disk","maximum":9223372036854775807,"type":"integer","title":"log.flush.interval.messages","minimum":1,"example":9223372036854775807},"max_connections_per_ip":{"description":"The maximum number of connections allowed from each ip address (defaults to 2147483647).","maximum":2147483647,"type":"integer","title":"max.connections.per.ip","minimum":256},"log_index_size_max_bytes":{"description":"The maximum size in bytes of the offset index","maximum":104857600,"type":"integer","title":"log.index.size.max.bytes","minimum":1048576,"example":10485760},"auto_create_topics_enable":{"description":"Enable auto creation of topics","type":"boolean","title":"auto.create.topics.enable","example":true},"log_index_interval_bytes":{"description":"The interval with which Kafka adds an entry to the offset index","maximum":104857600,"type":"integer","title":"log.index.interval.bytes","minimum":0,"example":4096},"replica_fetch_max_bytes":{"description":"The number of bytes of messages to attempt to fetch for each partition (defaults to 1048576). This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made.","maximum":104857600,"type":"integer","title":"replica.fetch.max.bytes","minimum":1048576},"num_partitions":{"description":"Number of partitions for autocreated topics","maximum":1000,"type":"integer","title":"num.partitions","minimum":1},"transaction_state_log_segment_bytes":{"description":"The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads (defaults to 104857600 (100 mebibytes)).","maximum":2147483647,"type":"integer","title":"transaction.state.log.segment.bytes","minimum":1048576,"example":104857600},"replica_fetch_response_max_bytes":{"description":"Maximum bytes expected for the entire fetch response (defaults to 10485760). Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. 
As such, this is not an absolute maximum.","maximum":1048576000,"type":"integer","title":"replica.fetch.response.max.bytes","minimum":10485760},"log_message_timestamp_type":{"description":"Define whether the timestamp in the message is message create time or log append time.","enum":["CreateTime","LogAppendTime"],"type":"string","title":"log.message.timestamp.type"},"connections_max_idle_ms":{"description":"Idle connections timeout: the server socket processor threads close the connections that idle for longer than this.","maximum":3600000,"type":"integer","title":"connections.max.idle.ms","minimum":1000,"example":540000},"log_flush_interval_ms":{"description":"The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used","maximum":9223372036854775807,"type":"integer","title":"log.flush.interval.ms","minimum":0},"log_preallocate":{"description":"Should pre allocate file when create new segment?","type":"boolean","title":"log.preallocate","example":false},"log_segment_delete_delay_ms":{"description":"The amount of time to wait before deleting a file from the filesystem","maximum":3600000,"type":"integer","title":"log.segment.delete.delay.ms","minimum":0,"example":60000},"message_max_bytes":{"description":"The maximum size of message that the server can receive.","maximum":100001200,"type":"integer","title":"message.max.bytes","minimum":0,"example":1048588},"log_cleaner_min_cleanable_ratio":{"description":"Controls log compactor frequency. Larger value means more frequent compactions but also more space wasted for logs. Consider setting log.cleaner.max.compaction.lag.ms to enforce compactions sooner, instead of setting a very high value for this option.","maximum":0.9,"type":"number","title":"log.cleaner.min.cleanable.ratio","minimum":0.2,"example":0.5},"group_initial_rebalance_delay_ms":{"description":"The amount of time, in milliseconds, the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins. The default value for this is 3 seconds. During development and testing it might be desirable to set this to 0 in order to not delay test execution time.","maximum":300000,"type":"integer","title":"group.initial.rebalance.delay.ms","minimum":0,"example":3000},"log_cleanup_policy":{"description":"The default cleanup policy for segments beyond the retention window","enum":["delete","compact","compact,delete"],"type":"string","title":"log.cleanup.policy","example":"delete"},"log_roll_jitter_ms":{"description":"The maximum jitter to subtract from logRollTimeMillis (in milliseconds). 
If not set, the value in log.roll.jitter.hours is used","maximum":9223372036854775807,"type":"integer","title":"log.roll.jitter.ms","minimum":0},"transaction_remove_expired_transaction_cleanup_interval_ms":{"description":"The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing (defaults to 3600000 (1 hour)).","maximum":3600000,"type":"integer","title":"transaction.remove.expired.transaction.cleanup.interval.ms","minimum":600000,"example":3600000},"default_replication_factor":{"description":"Replication factor for autocreated topics","maximum":10,"type":"integer","title":"default.replication.factor","minimum":1},"log_roll_ms":{"description":"The maximum time before a new log segment is rolled out (in milliseconds).","maximum":9223372036854775807,"type":"integer","title":"log.roll.ms","minimum":1},"producer_purgatory_purge_interval_requests":{"description":"The purge interval (in number of requests) of the producer request purgatory(defaults to 1000).","maximum":10000,"type":"integer","title":"producer.purgatory.purge.interval.requests","minimum":10},"log_retention_bytes":{"description":"The maximum size of the log before deleting messages","maximum":9223372036854775807,"type":"integer","title":"log.retention.bytes","minimum":-1},"log_cleaner_min_compaction_lag_ms":{"description":"The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.","maximum":9223372036854775807,"type":"integer","title":"log.cleaner.min.compaction.lag.ms","minimum":0},"min_insync_replicas":{"description":"When a producer sets acks to 'all' (or '-1'), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.","maximum":7,"type":"integer","title":"min.insync.replicas","minimum":1,"example":1},"compression_type":{"description":"Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.","enum":["gzip","snappy","lz4","zstd","uncompressed","producer"],"type":"string","title":"compression.type"},"log_message_timestamp_difference_max_ms":{"description":"The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message","maximum":9223372036854775807,"type":"integer","title":"log.message.timestamp.difference.max.ms","minimum":0},"log_message_downconversion_enable":{"description":"This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests. ","type":"boolean","title":"log.message.downconversion.enable","example":true},"max_incremental_fetch_session_cache_slots":{"description":"The maximum number of incremental fetch sessions that the broker will maintain.","maximum":10000,"type":"integer","title":"max.incremental.fetch.session.cache.slots","minimum":1000,"example":1000},"log_cleaner_max_compaction_lag_ms":{"description":"The maximum amount of time message will remain uncompacted. 
Only applicable for logs that are being compacted","maximum":9223372036854775807,"type":"integer","title":"log.cleaner.max.compaction.lag.ms","minimum":30000},"log_retention_hours":{"description":"The number of hours to keep a log file before deleting it","maximum":2147483647,"type":"integer","title":"log.retention.hours","minimum":-1},"group_min_session_timeout_ms":{"description":"The minimum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.","maximum":60000,"type":"integer","title":"group.min.session.timeout.ms","minimum":0,"example":6000},"socket_request_max_bytes":{"description":"The maximum number of bytes in a socket request (defaults to 104857600).","maximum":209715200,"type":"integer","title":"socket.request.max.bytes","minimum":10485760},"log_cleaner_delete_retention_ms":{"description":"How long are delete records retained?","maximum":315569260000,"type":"integer","title":"log.cleaner.delete.retention.ms","minimum":0,"example":86400000},"log_segment_bytes":{"description":"The maximum size of a single log file","maximum":1073741824,"type":"integer","title":"log.segment.bytes","minimum":10485760},"offsets_retention_minutes":{"description":"Log retention window in minutes for offsets topic","maximum":2147483647,"type":"integer","title":"offsets.retention.minutes","minimum":1,"example":10080},"log_retention_ms":{"description":"The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.","maximum":9223372036854775807,"type":"integer","title":"log.retention.ms","minimum":-1}},"additionalProperties":false,"default":{},"type":"object","title":"Kafka broker configuration values"},"kafka-connect":{"properties":{"producer_buffer_memory":{"description":"The total bytes of memory the producer can use to buffer records waiting to be sent to the broker (defaults to 33554432).","maximum":134217728,"type":"integer","title":"The total bytes of memory the producer can use to buffer records waiting to be sent to the broker","minimum":5242880,"example":8388608},"consumer_max_poll_interval_ms":{"description":"The maximum delay in milliseconds between invocations of poll() when using consumer group management (defaults to 300000).","maximum":2147483647,"type":"integer","title":"The maximum delay between polls when using consumer group management","minimum":1,"example":300000},"producer_compression_type":{"description":"Specify the default compression type for producers. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'none' which is the default and equivalent to no compression.","enum":["gzip","snappy","lz4","zstd","none"],"type":"string","title":"The default compression type for producers"},"connector_client_config_override_policy":{"description":"Defines what client configurations can be overridden by the connector. 
Default is None","enum":["None","All"],"type":"string","title":"Client config override policy"},"offset_flush_interval_ms":{"description":"The interval at which to try committing offsets for tasks (defaults to 60000).","maximum":100000000,"type":"integer","title":"The interval at which to try committing offsets for tasks","minimum":1,"example":60000},"consumer_fetch_max_bytes":{"description":"Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum.","maximum":104857600,"type":"integer","title":"The maximum amount of data the server should return for a fetch request","minimum":1048576,"example":52428800},"consumer_max_partition_fetch_bytes":{"description":"Records are fetched in batches by the consumer.If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. ","maximum":104857600,"type":"integer","title":"The maximum amount of data per-partition the server will return.","minimum":1048576,"example":1048576},"offset_flush_timeout_ms":{"description":"Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt (defaults to 5000).","maximum":2147483647,"type":"integer","title":"Offset flush timeout","minimum":1,"example":5000},"consumer_auto_offset_reset":{"description":"What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Default is earliest","enum":["earliest","latest"],"type":"string","title":"Consumer auto offset reset"},"producer_max_request_size":{"description":"This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.","maximum":67108864,"type":"integer","title":"The maximum size of a request in bytes","minimum":131072,"example":1048576},"producer_batch_size":{"description":"This setting gives the upper bound of the batch size to be sent. If there are fewer than this many bytes accumulated for this partition, the producer will 'linger' for the linger.ms time waiting for more records to show up. A batch size of zero will disable batching entirely (defaults to 16384).","maximum":5242880,"type":"integer","title":"The batch size in bytes the producer will attempt to collect for the same partition before publishing to broker","minimum":0,"example":1024},"session_timeout_ms":{"description":"The timeout in milliseconds used to detect failures when using Kafka’s group management facilities (defaults to 10000).","maximum":2147483647,"type":"integer","title":"The timeout used to detect failures when using Kafka’s group management facilities","minimum":1,"example":10000},"producer_linger_ms":{"description":"This setting gives the upper bound on the delay for batching: once there is batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if there are fewer than this many bytes accumulated for this partition the producer will 'linger' for the specified time waiting for more records to show up. 
Defaults to 0.","maximum":5000,"type":"integer","title":"Wait for up to the given delay to allow batching records together","minimum":0,"example":100},"consumer_isolation_level":{"description":"Transaction read isolation level. read_uncommitted is the default, but read_committed can be used if consume-exactly-once behavior is desired.","enum":["read_uncommitted","read_committed"],"type":"string","title":"Consumer isolation level"},"consumer_max_poll_records":{"description":"The maximum number of records returned in a single call to poll() (defaults to 500).","maximum":10000,"type":"integer","title":"The maximum number of records returned by a single poll","minimum":1,"example":500}},"additionalProperties":false,"type":"object","title":"Kafka Connect configuration values"},"kafka-rest":{"properties":{"producer_compression_type":{"description":"Specify the default compression type for producers. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'none' which is the default and equivalent to no compression.","enum":["gzip","snappy","lz4","zstd","none"],"type":"string","title":"producer.compression.type"},"consumer_enable_auto_commit":{"description":"If true the consumer's offset will be periodically committed to Kafka in the background","default":true,"type":"boolean","title":"consumer.enable.auto.commit"},"producer_acks":{"description":"The number of acknowledgments the producer requires the leader to have received before considering a request complete. If set to 'all' or '-1', the leader will wait for the full set of in-sync replicas to acknowledge the record.","enum":["all","-1","0","1"],"default":"1","type":"string","title":"producer.acks"},"consumer_request_max_bytes":{"description":"Maximum number of bytes in unencoded message keys and values by a single request","default":67108864,"maximum":671088640,"type":"integer","title":"consumer.request.max.bytes","minimum":0},"simpleconsumer_pool_size_max":{"description":"Maximum number of SimpleConsumers that can be instantiated per broker","default":25,"maximum":250,"type":"integer","title":"simpleconsumer.pool.size.max","minimum":10},"producer_linger_ms":{"description":"Wait for up to the given delay to allow batching records together","default":0,"maximum":5000,"type":"integer","title":"producer.linger.ms","minimum":0},"consumer_request_timeout_ms":{"description":"The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached","enum":[1000,15000,30000],"default":1000,"maximum":30000,"type":"integer","title":"consumer.request.timeout.ms","minimum":1000}},"additionalProperties":false,"type":"object","title":"Kafka REST configuration"},"schema-registry":{"properties":{"topic_name":{"description":"The durable single partition topic that acts as the durable log for the data. This topic must be compacted to avoid losing data due to retention policy. Please note that changing this configuration in an existing Schema Registry / Karapace setup leads to previous schemas being inaccessible, data encoded with them potentially unreadable and schema ID sequence put out of order. It's only possible to do the switch while Schema Registry / Karapace is disabled. 
Defaults to _schemas.","type":"string","minLength":1,"user_error":"Must consist of alpha-numeric characters, underscores, dashes or dots, max 249 characters","title":"topic_name","maxLength":249,"example":"_schemas","pattern":"^(?!\\.$|\\.\\.$)[-_.A-Za-z0-9]+$"},"leader_eligibility":{"description":"If true, Karapace / Schema Registry on the service nodes can participate in leader election. It might be needed to disable this when the schemas topic is replicated to a secondary cluster and Karapace / Schema Registry there must not participate in leader election. Defaults to true.","type":"boolean","title":"leader_eligibility","example":true}},"additionalProperties":false,"type":"object","title":"Schema Registry configuration"}}}`) diff --git a/operator/mysqlcontroller/observe.go b/operator/mysqlcontroller/observe.go index 820a1656..44a44a8b 100644 --- a/operator/mysqlcontroller/observe.go +++ b/operator/mysqlcontroller/observe.go @@ -72,10 +72,15 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex if err != nil { return managed.ExternalObservation{}, fmt.Errorf("cannot parse parameters: %w", err) } + currentParams, err := setSettingsDefaults(ctx, p.exo, &mySQLInstance.Spec.ForProvider) + if err != nil { + log.Error(err, "unable to set mysql settings schema") + currentParams = &mySQLInstance.Spec.ForProvider + } return managed.ExternalObservation{ ResourceExists: true, - ResourceUpToDate: isUpToDate(&mySQLInstance.Spec.ForProvider, params, log), + ResourceUpToDate: isUpToDate(currentParams, params, log), ConnectionDetails: connDetails, }, nil } diff --git a/operator/mysqlcontroller/settings.go b/operator/mysqlcontroller/settings.go new file mode 100644 index 00000000..7b76892f --- /dev/null +++ b/operator/mysqlcontroller/settings.go @@ -0,0 +1,41 @@ +package mysqlcontroller + +import ( + "context" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/provider-exoscale/internal/settings" +) + +type settingsFetcher interface { + GetDbaasSettingsMysqlWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsMysqlResponse, error) +} + +func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.MySQLParameters) (*exoscalev1.MySQLParameters, error) { + s, err := fetchSettingSchema(ctx, f) + if err != nil { + return nil, err + } + res := in.DeepCopy() + + res.MySQLSettings, err = s.SetDefaults("mysql", res.MySQLSettings) + if err != nil { + return nil, err + } + + return res, nil +} + +func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { + resp, err := f.GetDbaasSettingsMysqlWithResponse(ctx) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(resp.Body) + if err != nil { + return nil, err + } + return schemas, nil +} diff --git a/operator/mysqlcontroller/settings_test.go b/operator/mysqlcontroller/settings_test.go new file mode 100644 index 00000000..398e6c81 --- /dev/null +++ b/operator/mysqlcontroller/settings_test.go @@ -0,0 +1,52 @@ +package mysqlcontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +type fakeSettingsFetcher struct{} + +func (fakeSettingsFetcher) 
GetDbaasSettingsMysqlWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsMysqlResponse, error) { + return &oapi.GetDbaasSettingsMysqlResponse{ + Body: rawResponse, + }, nil +} + +func mustToRawExt(t *testing.T, set map[string]interface{}) runtime.RawExtension { + res, err := mapper.ToRawExtension(&set) + require.NoError(t, err, "failed to parse input setting") + return res +} + +func TestDefaultSettings(t *testing.T) { + found := exoscalev1.MySQLParameters{ + Maintenance: exoscalev1.MaintenanceSpec{}, + Zone: "gva-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: false, + Size: exoscalev1.SizeSpec{ + Plan: "startup-4", + }, + }, + MySQLSettings: mustToRawExt(t, map[string]interface{}{ + "net_write_timeout": 24, + }), + } + + withDefaults, err := setSettingsDefaults(context.Background(), fakeSettingsFetcher{}, &found) + require.NoError(t, err, "failed to set defaults") + setingsWithDefaults, err := mapper.ToMap(withDefaults.MySQLSettings) + require.NoError(t, err, "failed to parse set defaults") + assert.EqualValues(t, 24, setingsWithDefaults["net_write_timeout"]) + assert.Len(t, setingsWithDefaults, 1) +} + +var rawResponse = []byte(`{"settings":{"mysql":{"properties":{"net_write_timeout":{"description":"The number of seconds to wait for a block to be written to a connection before aborting the write.","maximum":3600,"type":"integer","title":"net_write_timeout","minimum":1,"example":30},"internal_tmp_mem_storage_engine":{"description":"The storage engine for in-memory internal temporary tables.","enum":["TempTable","MEMORY"],"type":"string","title":"internal_tmp_mem_storage_engine","example":"TempTable"},"sql_mode":{"description":"Global SQL mode. Set to empty to use MySQL server defaults. When creating a new service and not setting this field Aiven default SQL mode (strict, SQL standard compliant) will be assigned.","type":"string","user_error":"Must be uppercase alphabetic characters, underscores and commas","title":"sql_mode","maxLength":1024,"example":"ANSI,TRADITIONAL","pattern":"^[A-Z_]*(,[A-Z_]+)*$"},"information_schema_stats_expiry":{"description":"The time, in seconds, before cached statistics expire","maximum":31536000,"type":"integer","title":"information_schema_stats_expiry","minimum":900,"example":86400},"sort_buffer_size":{"description":"Sort buffer size in bytes for ORDER BY optimization. Default is 262144 (256K)","maximum":1073741824,"type":"integer","title":"sort_buffer_size","minimum":32768,"example":262144},"innodb_thread_concurrency":{"description":"Defines the maximum number of threads permitted inside of InnoDB. Default is 0 (infinite concurrency - no limit)","maximum":1000,"type":"integer","title":"innodb_thread_concurrency","minimum":0,"example":10},"innodb_write_io_threads":{"description":"The number of I/O threads for write operations in InnoDB. Default is 4. Changing this parameter will lead to a restart of the MySQL service.","maximum":64,"type":"integer","title":"innodb_write_io_threads","minimum":1,"example":10},"innodb_ft_min_token_size":{"description":"Minimum length of words that are stored in an InnoDB FULLTEXT index. Changing this parameter will lead to a restart of the MySQL service.","maximum":16,"type":"integer","title":"innodb_ft_min_token_size","minimum":0,"example":3},"innodb_change_buffer_max_size":{"description":"Maximum size for the InnoDB change buffer, as a percentage of the total size of the buffer pool. 
Default is 25","maximum":50,"type":"integer","title":"innodb_change_buffer_max_size","minimum":0,"example":30},"innodb_flush_neighbors":{"description":"Specifies whether flushing a page from the InnoDB buffer pool also flushes other dirty pages in the same extent (default is 1): 0 - dirty pages in the same extent are not flushed, 1 - flush contiguous dirty pages in the same extent, 2 - flush dirty pages in the same extent","maximum":2,"type":"integer","title":"innodb_flush_neighbors","minimum":0,"example":0},"tmp_table_size":{"description":"Limits the size of internal in-memory tables. Also set max_heap_table_size. Default is 16777216 (16M)","maximum":1073741824,"type":"integer","title":"tmp_table_size","minimum":1048576,"example":16777216},"slow_query_log":{"description":"Slow query log enables capturing of slow queries. Setting slow_query_log to false also truncates the mysql.slow_log table. Default is off","type":"boolean","title":"slow_query_log","example":true},"connect_timeout":{"description":"The number of seconds that the mysqld server waits for a connect packet before responding with Bad handshake","maximum":3600,"type":"integer","title":"connect_timeout","minimum":2,"example":10},"net_read_timeout":{"description":"The number of seconds to wait for more data from a connection before aborting the read.","maximum":3600,"type":"integer","title":"net_read_timeout","minimum":1,"example":30},"innodb_lock_wait_timeout":{"description":"The length of time in seconds an InnoDB transaction waits for a row lock before giving up.","maximum":3600,"type":"integer","title":"innodb_lock_wait_timeout","minimum":1,"example":50},"wait_timeout":{"description":"The number of seconds the server waits for activity on a noninteractive connection before closing it.","maximum":2147483,"type":"integer","title":"wait_timeout","minimum":1,"example":28800},"innodb_rollback_on_timeout":{"description":"When enabled a transaction timeout causes InnoDB to abort and roll back the entire transaction. Changing this parameter will lead to a restart of the MySQL service.","type":"boolean","title":"innodb_rollback_on_timeout","example":true},"group_concat_max_len":{"description":"The maximum permitted result length in bytes for the GROUP_CONCAT() function.","maximum":18446744073709551615,"type":"integer","title":"group_concat_max_len","minimum":4,"example":1024},"net_buffer_length":{"description":"Start sizes of connection buffer and result buffer. Default is 16384 (16K). Changing this parameter will lead to a restart of the MySQL service.","maximum":1048576,"type":"integer","title":"net_buffer_length","minimum":1024,"example":16384},"innodb_print_all_deadlocks":{"description":"When enabled, information about all deadlocks in InnoDB user transactions is recorded in the error log. 
Disabled by default.","type":"boolean","title":"innodb_print_all_deadlocks","example":true},"innodb_online_alter_log_max_size":{"description":"The upper limit in bytes on the size of the temporary log files used during online DDL operations for InnoDB tables.","maximum":1099511627776,"type":"integer","title":"innodb_online_alter_log_max_size","minimum":65536,"example":134217728},"interactive_timeout":{"description":"The number of seconds the server waits for activity on an interactive connection before closing it.","maximum":604800,"type":"integer","title":"interactive_timeout","minimum":30,"example":3600},"innodb_log_buffer_size":{"description":"The size in bytes of the buffer that InnoDB uses to write to the log files on disk.","maximum":4294967295,"type":"integer","title":"innodb_log_buffer_size","minimum":1048576,"example":16777216},"max_allowed_packet":{"description":"Size of the largest message in bytes that can be received by the server. Default is 67108864 (64M)","maximum":1073741824,"type":"integer","title":"max_allowed_packet","minimum":102400,"example":67108864},"max_heap_table_size":{"description":"Limits the size of internal in-memory tables. Also set tmp_table_size. Default is 16777216 (16M)","maximum":1073741824,"type":"integer","title":"max_heap_table_size","minimum":1048576,"example":16777216},"innodb_ft_server_stopword_table":{"description":"This option is used to specify your own InnoDB FULLTEXT index stopword list for all InnoDB tables.","type":["null","string"],"title":"innodb_ft_server_stopword_table","maxLength":1024,"example":"db_name/table_name","pattern":"^.+/.+$"},"innodb_read_io_threads":{"description":"The number of I/O threads for read operations in InnoDB. Default is 4. Changing this parameter will lead to a restart of the MySQL service.","maximum":64,"type":"integer","title":"innodb_read_io_threads","minimum":1,"example":10},"sql_require_primary_key":{"description":"Require primary key to be defined for new tables or old tables modified with ALTER TABLE and fail if missing. It is recommended to always have primary keys because various functionality may break if any large table is missing them.","type":"boolean","title":"sql_require_primary_key","example":true},"default_time_zone":{"description":"Default server time zone as an offset from UTC (from -12:00 to +12:00), a time zone name, or 'SYSTEM' to use the MySQL server default.","type":"string","minLength":2,"title":"default_time_zone","maxLength":100,"example":"+03:00"},"long_query_time":{"description":"The slow_query_logs work as SQL statements that take more than long_query_time seconds to execute. 
Default is 10s","maximum":3600,"type":"number","title":"long_query_time","minimum":0.0,"example":10}},"additionalProperties":false,"type":"object","title":"mysql.conf configuration values"}}}`) diff --git a/operator/opensearchcontroller/observe.go b/operator/opensearchcontroller/observe.go index 529e05fc..d04f16c9 100644 --- a/operator/opensearchcontroller/observe.go +++ b/operator/opensearchcontroller/observe.go @@ -66,9 +66,15 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex return managed.ExternalObservation{}, fmt.Errorf("cannot parse parameters: %w", err) } + currentParams, err := setSettingsDefaults(ctx, p.exo, &openSearchInstance.Spec.ForProvider) + if err != nil { + log.Error(err, "unable to set opensearch settings schema") + currentParams = &openSearchInstance.Spec.ForProvider + } + return managed.ExternalObservation{ ResourceExists: true, - ResourceUpToDate: isUpToDate(&openSearchInstance.Spec.ForProvider, params, log), + ResourceUpToDate: isUpToDate(currentParams, params, log), ConnectionDetails: connDetails, }, nil } diff --git a/operator/opensearchcontroller/settings.go b/operator/opensearchcontroller/settings.go new file mode 100644 index 00000000..c638e907 --- /dev/null +++ b/operator/opensearchcontroller/settings.go @@ -0,0 +1,41 @@ +package opensearchcontroller + +import ( + "context" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/provider-exoscale/internal/settings" +) + +type settingsFetcher interface { + GetDbaasSettingsOpensearchWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsOpensearchResponse, error) +} + +func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.OpenSearchParameters) (*exoscalev1.OpenSearchParameters, error) { + s, err := fetchSettingSchema(ctx, f) + if err != nil { + return nil, err + } + res := in.DeepCopy() + + res.OpenSearchSettings, err = s.SetDefaults("opensearch", res.OpenSearchSettings) + if err != nil { + return nil, err + } + + return res, nil +} + +func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { + resp, err := f.GetDbaasSettingsOpensearchWithResponse(ctx) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(resp.Body) + if err != nil { + return nil, err + } + return schemas, nil +} diff --git a/operator/opensearchcontroller/settings_test.go b/operator/opensearchcontroller/settings_test.go new file mode 100644 index 00000000..39685e78 --- /dev/null +++ b/operator/opensearchcontroller/settings_test.go @@ -0,0 +1,52 @@ +package opensearchcontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +type fakeSettingsFetcher struct{} + +func (fakeSettingsFetcher) GetDbaasSettingsOpensearchWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsOpensearchResponse, error) { + return &oapi.GetDbaasSettingsOpensearchResponse{ + Body: rawResponse, + }, nil +} + +func mustToRawExt(t *testing.T, set map[string]interface{}) runtime.RawExtension { + res, err := mapper.ToRawExtension(&set) + require.NoError(t, err, "failed to parse input setting") + return res +} + +func TestDefaultSettings(t 
*testing.T) { + found := exoscalev1.OpenSearchParameters{ + Maintenance: exoscalev1.MaintenanceSpec{}, + Zone: "gva-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: false, + Size: exoscalev1.SizeSpec{ + Plan: "startup-4", + }, + }, + OpenSearchSettings: mustToRawExt(t, map[string]interface{}{ + "thread_pool_search_throttled_size": 42, + }), + } + + withDefaults, err := setSettingsDefaults(context.Background(), fakeSettingsFetcher{}, &found) + require.NoError(t, err, "failed to set defaults") + setingsWithDefaults, err := mapper.ToMap(withDefaults.OpenSearchSettings) + require.NoError(t, err, "failed to parse set defaults") + assert.EqualValues(t, 42, setingsWithDefaults["thread_pool_search_throttled_size"]) + assert.Len(t, setingsWithDefaults, 1) +} + +var rawResponse = []byte(`{"settings":{"opensearch":{"properties":{"thread_pool_search_throttled_size":{"description":"Size for the thread pool. See documentation for exact details. Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"search_throttled thread pool size","minimum":1},"thread_pool_analyze_size":{"description":"Size for the thread pool. See documentation for exact details. Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"analyze thread pool size","minimum":1},"thread_pool_get_size":{"description":"Size for the thread pool. See documentation for exact details. Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"get thread pool size","minimum":1},"thread_pool_get_queue_size":{"description":"Size for the thread pool queue. See documentation for exact details.","maximum":2000,"type":"integer","title":"get thread pool queue size","minimum":10},"indices_recovery_max_concurrent_file_chunks":{"description":"Number of file chunks sent in parallel for each recovery. Defaults to 2.","maximum":5,"type":"integer","title":"indices.recovery.max_concurrent_file_chunks","minimum":2},"indices_queries_cache_size":{"description":"Percentage value. Default is 10%. Maximum amount of heap used for query cache. This is an expert setting. Too low value will decrease query performance and increase performance for other operations; too high value will cause issues with other OpenSearch functionality.","maximum":40,"type":"integer","title":"indices.queries.cache.size","minimum":3},"thread_pool_search_size":{"description":"Size for the thread pool. See documentation for exact details. Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"search thread pool size","minimum":1},"indices_recovery_max_bytes_per_sec":{"description":"Limits total inbound and outbound recovery traffic for each node. Applies to both peer recoveries as well as snapshot recoveries (i.e., restores from a snapshot). Defaults to 40mb","maximum":400,"type":"integer","title":"indices.recovery.max_bytes_per_sec","minimum":40},"http_max_initial_line_length":{"description":"The max length of an HTTP URL, in bytes","maximum":65536,"type":"integer","title":"http.max_initial_line_length","minimum":1024,"example":4096},"thread_pool_write_queue_size":{"description":"Size for the thread pool queue. 
See documentation for exact details.","maximum":2000,"type":"integer","title":"write thread pool queue size","minimum":10},"script_max_compilations_rate":{"description":"Script compilation circuit breaker limits the number of inline script compilations within a period of time. Default is use-context","type":"string","title":"Script max compilation rate - circuit breaker to prevent/minimize OOMs","maxLength":1024,"example":"75/5m"},"search_max_buckets":{"description":"Maximum number of aggregation buckets allowed in a single response. OpenSearch default value is used when this is not defined.","maximum":20000,"type":["integer","null"],"title":"search.max_buckets","minimum":1,"example":10000},"reindex_remote_whitelist":{"description":"Whitelisted addresses for reindexing. Changing this value will cause all OpenSearch instances to restart.","type":["array","null"],"title":"reindex_remote_whitelist","items":{"type":["string","null"],"title":"Address (hostname:port or IP:port)","maxLength":261,"example":"anotherservice.aivencloud.com:12398"},"maxItems":32},"override_main_response_version":{"description":"Compatibility mode sets OpenSearch to report its version as 7.10 so clients continue to work. Default is false","type":"boolean","title":"compatibility.override_main_response_version","example":true},"http_max_header_size":{"description":"The max size of allowed headers, in bytes","maximum":262144,"type":"integer","title":"http.max_header_size","minimum":1024,"example":8192},"email_sender_name":{"description":"This should be identical to the Sender name defined in Opensearch dashboards","type":["string"],"user_error":"Must consist of lower-case alpha-numeric characters and dashes, max 40 characters","title":"Sender email name placeholder to be used in Opensearch Dashboards and Opensearch keystore","maxLength":40,"example":"alert-sender","pattern":"^[a-zA-Z0-9-_]+$"},"indices_fielddata_cache_size":{"description":"Relative amount. Maximum amount of heap memory used for field data cache. This is an expert setting; decreasing the value too much will increase overhead of loading field data; too much memory used for field data cache will decrease amount of heap available for other operations.","default":null,"maximum":100,"type":["integer","null"],"title":"indices.fielddata.cache.size","minimum":3},"action_destructive_requires_name":{"type":["boolean","null"],"title":"Require explicit index names when deleting","example":true},"email_sender_username":{"type":["string"],"user_error":"Must be a valid email address","title":"Sender email address for Opensearch alerts","maxLength":320,"example":"jane@example.com","pattern":"^[A-Za-z0-9_\\-\\.+\\'&]+@(([\\da-zA-Z])([_\\w-]{,62})\\.){,127}(([\\da-zA-Z])[_\\w-]{,61})?([\\da-zA-Z]\\.((xn\\-\\-[a-zA-Z\\d]+)|([a-zA-Z\\d]{2,})))$"},"indices_memory_index_buffer_size":{"description":"Percentage value. Default is 10%. Total amount of heap used for indexing buffer, before writing segments to disk. This is an expert setting. Too low value will slow down indexing; too high value will increase indexing performance but causes performance issues for query performance.","maximum":40,"type":"integer","title":"indices.memory.index_buffer_size","minimum":3},"thread_pool_force_merge_size":{"description":"Size for the thread pool. See documentation for exact details. 
Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"force_merge thread pool size","minimum":1},"cluster_routing_allocation_node_concurrent_recoveries":{"description":"How many concurrent incoming/outgoing shard recoveries (normally replicas) are allowed to happen on a node. Defaults to 2.","maximum":16,"type":"integer","title":"Concurrent incoming/outgoing shard recoveries per node","minimum":2},"email_sender_password":{"description":"Sender email password for Opensearch alerts to authenticate with SMTP server","type":["string"],"title":"Sender email password for Opensearch alerts to authenticate with SMTP server","maxLength":1024,"example":"very-secure-mail-password","pattern":"^[^\\x00-\\x1F]+$"},"thread_pool_analyze_queue_size":{"description":"Size for the thread pool queue. See documentation for exact details.","maximum":2000,"type":"integer","title":"analyze thread pool queue size","minimum":10},"action_auto_create_index_enabled":{"description":"Explicitly allow or block automatic creation of indices. Defaults to true","type":"boolean","title":"action.auto_create_index","example":false},"http_max_content_length":{"description":"Maximum content length for HTTP requests to the OpenSearch HTTP API, in bytes.","maximum":2147483647,"type":"integer","title":"http.max_content_length","minimum":1},"thread_pool_write_size":{"description":"Size for the thread pool. See documentation for exact details. Do note this may have maximum value depending on CPU count - value is automatically lowered if set to higher than maximum value.","maximum":128,"type":"integer","title":"write thread pool size","minimum":1},"thread_pool_search_queue_size":{"description":"Size for the thread pool queue. See documentation for exact details.","maximum":2000,"type":"integer","title":"search thread pool queue size","minimum":10},"indices_query_bool_max_clause_count":{"description":"Maximum number of clauses Lucene BooleanQuery can have. The default value (1024) is relatively high, and increasing it may cause performance issues. Investigate other approaches first before increasing this value.","maximum":4096,"type":"integer","title":"indices.query.bool.max_clause_count","minimum":64},"thread_pool_search_throttled_queue_size":{"description":"Size for the thread pool queue. 
See documentation for exact details.","maximum":2000,"type":"integer","title":"search_throttled thread pool queue size","minimum":10},"cluster_max_shards_per_node":{"description":"Controls the number of shards allowed in the cluster per data node","maximum":10000,"type":"integer","title":"cluster.max_shards_per_node","minimum":100,"example":1000}},"additionalProperties":false,"type":"object","title":"OpenSearch settings","dependencies":{"email_sender_name":{"required":["email_sender_username","email_sender_password"]},"email_sender_username":{"required":["email_sender_name","email_sender_password"]},"email_sender_password":{"required":["email_sender_username","email_sender_name"]}}}}}`) diff --git a/operator/postgresqlcontroller/observe.go b/operator/postgresqlcontroller/observe.go index 2e712c88..c4550b19 100644 --- a/operator/postgresqlcontroller/observe.go +++ b/operator/postgresqlcontroller/observe.go @@ -53,9 +53,15 @@ func (p *pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ex if err != nil { return managed.ExternalObservation{}, errors.Wrap(err, "cannot read connection details") } + + currentParams, err := setSettingsDefaults(ctx, p.exo, &pgInstance.Spec.ForProvider) + if err != nil { + log.Error(err, "unable to set postgres settings schema") + currentParams = &pgInstance.Spec.ForProvider + } return managed.ExternalObservation{ ResourceExists: true, - ResourceUpToDate: isUpToDate(&pgInstance.Spec.ForProvider, pp, log), + ResourceUpToDate: isUpToDate(currentParams, pp, log), ConnectionDetails: connDetails, }, nil } diff --git a/operator/postgresqlcontroller/settings.go b/operator/postgresqlcontroller/settings.go new file mode 100644 index 00000000..fb65d9e3 --- /dev/null +++ b/operator/postgresqlcontroller/settings.go @@ -0,0 +1,41 @@ +package postgresqlcontroller + +import ( + "context" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/provider-exoscale/internal/settings" +) + +type settingsFetcher interface { + GetDbaasSettingsPgWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsPgResponse, error) +} + +func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.PostgreSQLParameters) (*exoscalev1.PostgreSQLParameters, error) { + s, err := fetchSettingSchema(ctx, f) + if err != nil { + return nil, err + } + res := in.DeepCopy() + + res.PGSettings, err = s.SetDefaults("pg", res.PGSettings) + if err != nil { + return nil, err + } + + return res, nil +} + +func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { + resp, err := f.GetDbaasSettingsPgWithResponse(ctx) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(resp.Body) + if err != nil { + return nil, err + } + return schemas, nil +} diff --git a/operator/postgresqlcontroller/settings_test.go b/operator/postgresqlcontroller/settings_test.go new file mode 100644 index 00000000..7cc969a1 --- /dev/null +++ b/operator/postgresqlcontroller/settings_test.go @@ -0,0 +1,52 @@ +package postgresqlcontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + "k8s.io/apimachinery/pkg/runtime" +) + +type fakeSettingsFetcher struct{} + +func (fakeSettingsFetcher) GetDbaasSettingsPgWithResponse(ctx 
context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsPgResponse, error) { + return &oapi.GetDbaasSettingsPgResponse{ + Body: rawResponse, + }, nil +} + +func mustToRawExt(t *testing.T, set map[string]interface{}) runtime.RawExtension { + res, err := mapper.ToRawExtension(&set) + require.NoError(t, err, "failed to parse input setting") + return res +} + +func TestDefaultSettings(t *testing.T) { + found := exoscalev1.PostgreSQLParameters{ + Maintenance: exoscalev1.MaintenanceSpec{}, + Zone: "gva-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: false, + Size: exoscalev1.SizeSpec{ + Plan: "startup-4", + }, + }, + PGSettings: mustToRawExt(t, map[string]interface{}{ + "track_activity_query_size": 1025, + }), + } + + withDefaults, err := setSettingsDefaults(context.Background(), fakeSettingsFetcher{}, &found) + require.NoError(t, err, "failed to set defaults") + setingsWithDefaults, err := mapper.ToMap(withDefaults.PGSettings) + require.NoError(t, err, "failed to parse set defaults") + assert.EqualValues(t, 1025, setingsWithDefaults["track_activity_query_size"]) + assert.Len(t, setingsWithDefaults, 1) +} + +var rawResponse = []byte(`{"settings":{"pg":{"properties":{"track_activity_query_size":{"description":"Specifies the number of bytes reserved to track the currently executing command for each active session.","maximum":10240,"type":"integer","title":"track_activity_query_size","minimum":1024,"example":1024},"log_autovacuum_min_duration":{"description":"Causes each action executed by autovacuum to be logged if it ran for at least the specified number of milliseconds. Setting this to zero logs all autovacuum actions. Minus-one (the default) disables logging autovacuum actions.","maximum":2147483647,"type":"integer","title":"log_autovacuum_min_duration","minimum":-1},"autovacuum_vacuum_cost_limit":{"description":"Specifies the cost limit value that will be used in automatic VACUUM operations. If -1 is specified (which is the default), the regular vacuum_cost_limit value will be used.","maximum":10000,"type":"integer","title":"autovacuum_vacuum_cost_limit","minimum":-1},"timezone":{"description":"PostgreSQL service timezone","type":"string","title":"timezone","maxLength":64,"example":"Europe/Helsinki"},"track_io_timing":{"description":"Enables timing of database I/O calls. This parameter is off by default, because it will repeatedly query the operating system for the current time, which may cause significant overhead on some platforms.","enum":["off","on"],"type":"string","title":"track_io_timing","example":"off"},"pg_stat_monitor.pgsm_enable_query_plan":{"description":"Enables or disables query plan monitoring","type":"boolean","title":"pg_stat_monitor.pgsm_enable_query_plan","example":false},"max_files_per_process":{"description":"PostgreSQL maximum number of files that can be open per process","maximum":4096,"type":"integer","title":"max_files_per_process","minimum":1000},"pg_stat_monitor.pgsm_max_buckets":{"description":"Sets the maximum number of buckets ","maximum":10,"type":"integer","title":"pg_stat_monitor.pgsm_max_buckets","minimum":1,"example":10},"bgwriter_delay":{"description":"Specifies the delay between activity rounds for the background writer in milliseconds. Default is 200.","maximum":10000,"type":"integer","title":"bgwriter_delay","minimum":10,"example":200},"autovacuum_max_workers":{"description":"Specifies the maximum number of autovacuum processes (other than the autovacuum launcher) that may be running at any one time. 
The default is three. This parameter can only be set at server start.","maximum":20,"type":"integer","title":"autovacuum_max_workers","minimum":1},"bgwriter_flush_after":{"description":"Whenever more than bgwriter_flush_after bytes have been written by the background writer, attempt to force the OS to issue these writes to the underlying storage. Specified in kilobytes, default is 512. Setting of 0 disables forced writeback.","maximum":2048,"type":"integer","title":"bgwriter_flush_after","minimum":0,"example":512},"default_toast_compression":{"description":"Specifies the default TOAST compression method for values of compressible columns (the default is lz4).","enum":["lz4","pglz"],"type":"string","title":"default_toast_compression","example":"lz4"},"deadlock_timeout":{"description":"This is the amount of time, in milliseconds, to wait on a lock before checking to see if there is a deadlock condition.","maximum":1800000,"type":"integer","title":"deadlock_timeout","minimum":500,"example":1000},"idle_in_transaction_session_timeout":{"description":"Time out sessions with open transactions after this number of milliseconds","maximum":604800000,"type":"integer","title":"idle_in_transaction_session_timeout","minimum":0},"max_pred_locks_per_transaction":{"description":"PostgreSQL maximum predicate locks per transaction","maximum":5120,"type":"integer","title":"max_pred_locks_per_transaction","minimum":64},"max_replication_slots":{"description":"PostgreSQL maximum replication slots","maximum":64,"type":"integer","title":"max_replication_slots","minimum":8},"autovacuum_vacuum_threshold":{"description":"Specifies the minimum number of updated or deleted tuples needed to trigger a VACUUM in any one table. The default is 50 tuples","maximum":2147483647,"type":"integer","title":"autovacuum_vacuum_threshold","minimum":0},"max_parallel_workers_per_gather":{"description":"Sets the maximum number of workers that can be started by a single Gather or Gather Merge node","maximum":96,"type":"integer","title":"max_parallel_workers_per_gather","minimum":0},"bgwriter_lru_multiplier":{"description":"The average recent need for new buffers is multiplied by bgwriter_lru_multiplier to arrive at an estimate of the number that will be needed during the next round, (up to bgwriter_lru_maxpages). 1.0 represents a “just in time” policy of writing exactly the number of buffers predicted to be needed. Larger values provide some cushion against spikes in demand, while smaller values intentionally leave writes to be done by server processes. The default is 2.0.","maximum":10,"type":"number","title":"bgwriter_lru_multiplier","minimum":0,"example":2.0},"pg_partman_bgw.interval":{"description":"Sets the time interval to run pg_partman's scheduled tasks","maximum":604800,"type":"integer","title":"pg_partman_bgw.interval","minimum":3600,"example":3600},"autovacuum_naptime":{"description":"Specifies the minimum delay between autovacuum runs on any given database. The delay is measured in seconds, and the default is one minute","maximum":86400,"type":"integer","title":"autovacuum_naptime","minimum":1},"log_line_prefix":{"description":"Choose from one of the available log-formats. 
These can support popular log analyzers like pgbadger, pganalyze etc.","enum":["'pid=%p,user=%u,db=%d,app=%a,client=%h '","'%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '","'%m [%p] %q[user=%u,db=%d,app=%a] '"],"type":"string","title":"log_line_prefix"},"log_temp_files":{"description":"Log statements for each temporary file created larger than this number of kilobytes, -1 disables","maximum":2147483647,"type":"integer","title":"log_temp_files","minimum":-1},"max_locks_per_transaction":{"description":"PostgreSQL maximum locks per transaction","maximum":6400,"type":"integer","title":"max_locks_per_transaction","minimum":64},"autovacuum_vacuum_scale_factor":{"description":"Specifies a fraction of the table size to add to autovacuum_vacuum_threshold when deciding whether to trigger a VACUUM. The default is 0.2 (20% of table size)","maximum":1.0,"type":"number","title":"autovacuum_vacuum_scale_factor","minimum":0.0},"wal_writer_delay":{"description":"WAL flush interval in milliseconds. Note that setting this value to lower than the default 200ms may negatively impact performance","maximum":200,"type":"integer","title":"wal_writer_delay","minimum":10,"example":50},"track_commit_timestamp":{"description":"Record commit time of transactions.","enum":["off","on"],"type":"string","title":"track_commit_timestamp","example":"off"},"track_functions":{"description":"Enables tracking of function call counts and time used.","enum":["all","pl","none"],"type":"string","title":"track_functions"},"wal_sender_timeout":{"description":"Terminate replication connections that are inactive for longer than this amount of time, in milliseconds. Setting this value to zero disables the timeout.","anyOf":[{"maximum":0,"minimum":0},{"maximum":10800000,"minimum":5000}],"type":"integer","user_error":"Must be either 0 or between 5000 and 10800000.","title":"wal_sender_timeout","example":60000},"autovacuum_vacuum_cost_delay":{"description":"Specifies the cost delay value that will be used in automatic VACUUM operations. If -1 is specified, the regular vacuum_cost_delay value will be used. The default value is 20 milliseconds","maximum":100,"type":"integer","title":"autovacuum_vacuum_cost_delay","minimum":-1},"max_stack_depth":{"description":"Maximum depth of the stack in bytes","maximum":6291456,"type":"integer","title":"max_stack_depth","minimum":2097152},"max_parallel_workers":{"description":"Sets the maximum number of workers that the system can support for parallel queries","maximum":96,"type":"integer","title":"max_parallel_workers","minimum":0},"pg_partman_bgw.role":{"description":"Controls which role to use for pg_partman's scheduled background tasks.","type":"string","user_error":"Must consist of alpha-numeric characters, dots, underscores or dashes, may not start with dash or dot, max 64 characters","title":"pg_partman_bgw.role","maxLength":64,"example":"myrolename","pattern":"^[_A-Za-z0-9][-._A-Za-z0-9]{0,63}$"},"max_wal_senders":{"description":"PostgreSQL maximum WAL senders","maximum":64,"type":"integer","title":"max_wal_senders","minimum":20},"max_logical_replication_workers":{"description":"PostgreSQL maximum logical replication workers (taken from the pool of max_parallel_workers)","maximum":64,"type":"integer","title":"max_logical_replication_workers","minimum":4},"autovacuum_analyze_scale_factor":{"description":"Specifies a fraction of the table size to add to autovacuum_analyze_threshold when deciding whether to trigger an ANALYZE. 
The default is 0.2 (20% of table size)","maximum":1.0,"type":"number","title":"autovacuum_analyze_scale_factor","minimum":0.0},"max_prepared_transactions":{"description":"PostgreSQL maximum prepared transactions","maximum":10000,"type":"integer","title":"max_prepared_transactions","minimum":0},"autovacuum_analyze_threshold":{"description":"Specifies the minimum number of inserted, updated or deleted tuples needed to trigger an ANALYZE in any one table. The default is 50 tuples.","maximum":2147483647,"type":"integer","title":"autovacuum_analyze_threshold","minimum":0},"max_worker_processes":{"description":"Sets the maximum number of background processes that the system can support","maximum":96,"type":"integer","title":"max_worker_processes","minimum":8},"pg_stat_statements.track":{"description":"Controls which statements are counted. Specify top to track top-level statements (those issued directly by clients), all to also track nested statements (such as statements invoked within functions), or none to disable statement statistics collection. The default value is top.","enum":["all","top","none"],"type":["string"],"title":"pg_stat_statements.track"},"temp_file_limit":{"description":"PostgreSQL temporary file limit in KiB, -1 for unlimited","maximum":2147483647,"type":"integer","title":"temp_file_limit","minimum":-1,"example":5000000},"bgwriter_lru_maxpages":{"description":"In each round, no more than this many buffers will be written by the background writer. Setting this to zero disables background writing. Default is 100.","maximum":1073741823,"type":"integer","title":"bgwriter_lru_maxpages","minimum":0,"example":100},"log_error_verbosity":{"description":"Controls the amount of detail written in the server log for each message that is logged.","enum":["TERSE","DEFAULT","VERBOSE"],"type":"string","title":"log_error_verbosity"},"autovacuum_freeze_max_age":{"description":"Specifies the maximum age (in transactions) that a table's pg_class.relfrozenxid field can attain before a VACUUM operation is forced to prevent transaction ID wraparound within the table. Note that the system will launch autovacuum processes to prevent wraparound even when autovacuum is otherwise disabled. This parameter will cause the server to be restarted.","maximum":1500000000,"type":"integer","title":"autovacuum_freeze_max_age","minimum":200000000,"example":200000000},"log_min_duration_statement":{"description":"Log statements that take more than this number of milliseconds to run, -1 disables","maximum":86400000,"type":"integer","title":"log_min_duration_statement","minimum":-1},"max_standby_streaming_delay":{"description":"Max standby streaming delay in milliseconds","maximum":43200000,"type":"integer","title":"max_standby_streaming_delay","minimum":1},"jit":{"description":"Controls system-wide use of Just-in-Time Compilation (JIT).","type":"boolean","title":"jit","example":true},"max_standby_archive_delay":{"description":"Max standby archive delay in milliseconds","maximum":43200000,"type":"integer","title":"max_standby_archive_delay","minimum":1},"max_slot_wal_keep_size":{"description":"PostgreSQL maximum WAL size (MB) reserved for replication slots. Default is -1 (unlimited). 
wal_keep_size minimum WAL size setting takes precedence over this.","maximum":2147483647,"type":"integer","title":"max_slot_wal_keep_size","minimum":-1}},"additionalProperties":false,"type":"object","title":"postgresql.conf configuration values"},"pglookout":{"properties":{"max_failover_replication_time_lag":{"description":"Number of seconds of master unavailability before triggering database failover to standby","default":60,"maximum":9223372036854775807,"type":"integer","title":"max_failover_replication_time_lag","minimum":10}},"additionalProperties":false,"default":{"max_failover_replication_time_lag":60},"type":"object","title":"PGLookout settings"},"pgbouncer":{"properties":{"min_pool_size":{"maximum":10000,"type":"integer","title":"Add more server connections to pool if below this number. Improves behavior when usual load comes suddenly back after period of total inactivity. The value is effectively capped at the pool size.","minimum":0,"example":0},"ignore_startup_parameters":{"type":"array","title":"List of parameters to ignore when given in startup packet","example":["extra_float_digits","search_path"],"items":{"enum":["extra_float_digits","search_path"],"type":"string","title":"Enum of parameters to ignore when given in startup packet"},"maxItems":32},"server_lifetime":{"maximum":86400,"type":"integer","title":"The pooler will close an unused server connection that has been connected longer than this. [seconds]","minimum":60,"example":3600},"autodb_pool_mode":{"enum":["session","transaction","statement"],"type":"string","title":"PGBouncer pool mode","example":"session"},"server_idle_timeout":{"maximum":86400,"type":"integer","title":"If a server connection has been idle more than this many seconds it will be dropped. If 0 then timeout is disabled. [seconds]","minimum":0,"example":600},"autodb_max_db_connections":{"maximum":2147483647,"type":"integer","title":"Do not allow more than this many server connections per database (regardless of user). Setting it to 0 means unlimited.","minimum":0,"example":0},"server_reset_query_always":{"type":"boolean","title":"Run server_reset_query (DISCARD ALL) in all pooling modes","example":false},"autodb_pool_size":{"maximum":10000,"type":"integer","title":"If non-zero then create automatically a pool of that size per user when a pool doesn't exist.","minimum":0,"example":0},"autodb_idle_timeout":{"maximum":86400,"type":"integer","title":"If the automatically created database pools have been unused this many seconds, they are freed. If 0 then timeout is disabled. [seconds]","minimum":0,"example":3600}},"additionalProperties":false,"type":"object","title":"PGBouncer connection pooling settings"},"timescaledb":{"properties":{"max_background_workers":{"description":"The number of background workers for timescaledb operations. 
You should configure this setting to the sum of your number of databases and the total number of concurrent background workers you want running at any given point in time.","maximum":4096,"type":"integer","title":"timescaledb.max_background_workers","minimum":1,"example":8}},"additionalProperties":false,"type":"object","title":"TimescaleDB extension configuration values"}}}`) diff --git a/operator/rediscontroller/controller_test.go b/operator/rediscontroller/controller_test.go index 6d680946..ef289bc4 100644 --- a/operator/rediscontroller/controller_test.go +++ b/operator/rediscontroller/controller_test.go @@ -41,6 +41,11 @@ func (ts *RedisControllerTestSuite) SetupTest() { nopRecorder := event.NewNopRecorder() ts.p = newPipeline(ts.Client, nopRecorder, ts.ExoClientMock) + ts.ExoClientMock.On("GetDbaasSettingsRedisWithResponse", mock.Anything). + Return(&oapi.GetDbaasSettingsRedisResponse{ + Body: rawResponse, + }, nil) + ts.reconciler = createReconciler(ts.Manager, "testredis", nopRecorder, &connector{ kube: ts.Client, recorder: nopRecorder, @@ -64,7 +69,9 @@ func (ts *RedisControllerTestSuite) TestCreate() { Return(&oapi.CreateDbaasServiceRedisResponse{Body: []byte{}}, nil). Once() - redisResponse := ts.getRedisResponse(mg, settings) + respSet := emptyRedisSettings + respSet["maxmemory_policy"] = "noeviction" + redisResponse := ts.getRedisResponse(mg, respSet) ts.ExoClientMock.On("GetDbaasServiceRedisWithResponse", mock.Anything, oapi.DbaasServiceName(name)). Return(redisResponse, nil) @@ -108,6 +115,43 @@ func (ts *RedisControllerTestSuite) TestCreate() { ts.ExoClientMock.AssertExpectations(ts.T()) } +func (ts *RedisControllerTestSuite) TestNoChange() { + name := "test-nochange" + settings := map[string]any{ + "maxmemory_policy": "noeviction", + } + + mg := newRedisInstance(name, settings) + mg.Status = exoscalev1.RedisStatus{ + ResourceStatus: xpv1.ResourceStatus{}, + AtProvider: exoscalev1.RedisObservation{}, + } + respSet := emptyRedisSettings + respSet["maxmemory_policy"] = "noeviction" + redisResponse := ts.getRedisResponse(mg, respSet) + + ts.ExoClientMock.On("GetDbaasServiceRedisWithResponse", mock.Anything, oapi.DbaasServiceName(name)). + Return(redisResponse, nil).Once() + + ts.EnsureResources(mg) + + _, err := ts.reconcile(reconcile.Request{NamespacedName: types.NamespacedName{Name: mg.Name, Namespace: mg.Namespace}}) + ts.Assert().NoError(err) + + instance := &exoscalev1.Redis{} + ts.FetchResource(types.NamespacedName{Name: mg.Name, Namespace: mg.Namespace}, instance) + + synced := false + for _, condition := range instance.Status.Conditions { + ts.Assert().Equal(corev1.ConditionTrue, condition.Status) + ts.Assert().Equal(xpv1.ReasonReconcileSuccess, condition.Reason) + synced = true + } + ts.Assert().True(synced, "not synced") + + ts.ExoClientMock.AssertExpectations(ts.T()) +} + func (ts *RedisControllerTestSuite) TestUpdate() { name := "test-update" settings := map[string]any{ @@ -119,7 +163,9 @@ func (ts *RedisControllerTestSuite) TestUpdate() { ResourceStatus: xpv1.ResourceStatus{}, AtProvider: exoscalev1.RedisObservation{}, } - redisResponse := ts.getRedisResponse(mg, settings) + respSet := emptyRedisSettings + respSet["maxmemory_policy"] = "noeviction" + redisResponse := ts.getRedisResponse(mg, respSet) ts.ExoClientMock.On("UpdateDbaasServiceRedisWithResponse", mock.Anything, oapi.DbaasServiceName(name), mock.Anything). Return(&oapi.UpdateDbaasServiceRedisResponse{Body: []byte{}}, nil). 
@@ -131,7 +177,7 @@ func (ts *RedisControllerTestSuite) TestUpdate() { ts.EnsureResources(mg) - updatedResponse := ts.getRedisResponse(mg, settings) + updatedResponse := ts.getRedisResponse(mg, respSet) updatedResponse.JSON200.Maintenance.Dow = oapi.DbaasServiceMaintenanceDowFriday ts.ExoClientMock.On("GetDbaasServiceRedisWithResponse", mock.Anything, oapi.DbaasServiceName(name)). Return(updatedResponse, nil) @@ -185,7 +231,9 @@ func (ts *RedisControllerTestSuite) TestDelete() { ResourceStatus: xpv1.ResourceStatus{}, AtProvider: exoscalev1.RedisObservation{}, } - redisResponse := ts.getRedisResponse(mg, settings) + respSet := emptyRedisSettings + respSet["maxmemory_policy"] = "noeviction" + redisResponse := ts.getRedisResponse(mg, respSet) ts.EnsureResources(mg) diff --git a/operator/rediscontroller/observe.go b/operator/rediscontroller/observe.go index 7f53d2c2..3f34f56b 100644 --- a/operator/rediscontroller/observe.go +++ b/operator/rediscontroller/observe.go @@ -67,9 +67,15 @@ func (p pipeline) Observe(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalObservation{}, fmt.Errorf("unable to parse connection details: %w", err) } + currentParams, err := setSettingsDefaults(ctx, p.exo, &redisInstance.Spec.ForProvider) + if err != nil { + log.Error(err, "unable to set redis settings schema") + currentParams = &redisInstance.Spec.ForProvider + } + observation := managed.ExternalObservation{ ResourceExists: true, - ResourceUpToDate: isUpToDate(&redisInstance.Spec.ForProvider, rp, log), + ResourceUpToDate: isUpToDate(currentParams, rp, log), ResourceLateInitialized: false, ConnectionDetails: cd, } diff --git a/operator/rediscontroller/settings.go b/operator/rediscontroller/settings.go new file mode 100644 index 00000000..d664fc68 --- /dev/null +++ b/operator/rediscontroller/settings.go @@ -0,0 +1,41 @@ +package rediscontroller + +import ( + "context" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/vshn/provider-exoscale/internal/settings" +) + +type settingsFetcher interface { + GetDbaasSettingsRedisWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsRedisResponse, error) +} + +func setSettingsDefaults(ctx context.Context, f settingsFetcher, in *exoscalev1.RedisParameters) (*exoscalev1.RedisParameters, error) { + s, err := fetchSettingSchema(ctx, f) + if err != nil { + return nil, err + } + res := in.DeepCopy() + + res.RedisSettings, err = s.SetDefaults("redis", res.RedisSettings) + if err != nil { + return nil, err + } + + return res, nil +} + +func fetchSettingSchema(ctx context.Context, f settingsFetcher) (settings.Schemas, error) { + resp, err := f.GetDbaasSettingsRedisWithResponse(ctx) + if err != nil { + return nil, err + } + schemas, err := settings.ParseSchemas(resp.Body) + if err != nil { + return nil, err + } + return schemas, nil +} diff --git a/operator/rediscontroller/settings_test.go b/operator/rediscontroller/settings_test.go new file mode 100644 index 00000000..97b0f619 --- /dev/null +++ b/operator/rediscontroller/settings_test.go @@ -0,0 +1,64 @@ +package rediscontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" +) + +var rawResponse = []byte(`{"settings":{"redis":{"type":"object","title":"Redis 
settings","properties":{"ssl":{"default":true,"type":"boolean","title":"Require SSL to access Redis"},"lfu_log_factor":{"default":10,"maximum":100,"type":"integer","title":"Counter logarithm factor for volatile-lfu and allkeys-lfu maxmemory-policies","minimum":0},"maxmemory_policy":{"enum":["noeviction","allkeys-lru","volatile-lru","allkeys-random","volatile-random","volatile-ttl","volatile-lfu","allkeys-lfu"],"default":"noeviction","type":["string","null"],"title":"Redis maxmemory-policy"},"io_threads":{"maximum":32,"type":"integer","title":"Redis IO thread count","minimum":1,"example":1},"lfu_decay_time":{"default":1,"maximum":120,"type":"integer","title":"LFU maxmemory-policy counter decay time in minutes","minimum":1},"pubsub_client_output_buffer_limit":{"description":"Set output buffer limit for pub / sub clients in MB. The value is the hard limit, the soft limit is 1/4 of the hard limit. When setting the limit, be mindful of the available memory in the selected service plan.","maximum":512,"type":"integer","title":"Pub/sub client output buffer hard limit in MB","minimum":32,"example":64},"notify_keyspace_events":{"default":"","type":"string","title":"Set notify-keyspace-events option","maxLength":32,"pattern":"^[KEg\\$lshzxeA]*$"},"persistence":{"description":"When persistence is 'rdb', Redis does RDB dumps each 10 minutes if any key is changed. Also RDB dumps are done according to backup schedule for backup purposes. When persistence is 'off', no RDB dumps and backups are done, so data can be lost at any moment if service is restarted for any reason, or if service is powered off. Also service can't be forked.","enum":["off","rdb"],"type":"string","title":"Redis persistence"},"timeout":{"default":300,"maximum":31536000,"type":"integer","title":"Redis idle connection timeout in seconds","minimum":0},"acl_channels_default":{"description":"Determines default pub/sub channels' ACL for new users if ACL is not supplied. When this option is not defined, all_channels is assumed to keep backward compatibility. This option doesn't affect Redis configuration acl-pubsub-default.","enum":["allchannels","resetchannels"],"type":"string","title":"Default ACL for pub/sub channels used when Redis user is created"},"number_of_databases":{"description":"Set number of redis databases. 
Changing this will cause a restart of redis service.","maximum":128,"type":"integer","title":"Number of redis databases","minimum":1,"example":16}}}}}`) + +// emptyRedisSettings contains the default values defined by the settings schema in rawResponse. +//nolint:golint,unused +var emptyRedisSettings = map[string]interface{}{ + "lfu_decay_time": 1, + "ssl": true, + "lfu_log_factor": 10, + "notify_keyspace_events": "", + "timeout": 300, + "maxmemory_policy": "noeviction", +} + +type fakeSettingsFetcher struct{} + +func (fakeSettingsFetcher) GetDbaasSettingsRedisWithResponse(ctx context.Context, reqEditors ...oapi.RequestEditorFn) (*oapi.GetDbaasSettingsRedisResponse, error) { + return &oapi.GetDbaasSettingsRedisResponse{ + Body: rawResponse, + }, nil +} + +func TestDefaultSettings(t *testing.T) { + foundSettings := map[string]interface{}{ + "lfu_decay_time": 2, + "persistence": "rdb", + } + foundSettingRaw, err := mapper.ToRawExtension(&foundSettings) + require.NoError(t, err, "failed to parse input setting") + found := exoscalev1.RedisParameters{ + Maintenance: exoscalev1.MaintenanceSpec{}, + Zone: "gva-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: false, + Size: exoscalev1.SizeSpec{ + Plan: "startup-4", + }, + }, + RedisSettings: foundSettingRaw, + } + + withDefaults, err := setSettingsDefaults(context.Background(), fakeSettingsFetcher{}, &found) + require.NoError(t, err, "failed to set defaults") + settingsWithDefaults, err := mapper.ToMap(withDefaults.RedisSettings) + require.NoError(t, err, "failed to parse defaulted settings") + assert.EqualValues(t, 2, settingsWithDefaults["lfu_decay_time"]) + assert.EqualValues(t, "rdb", settingsWithDefaults["persistence"]) + assert.EqualValues(t, true, settingsWithDefaults["ssl"]) + assert.EqualValues(t, 10, settingsWithDefaults["lfu_log_factor"]) + assert.EqualValues(t, "", settingsWithDefaults["notify_keyspace_events"]) + assert.EqualValues(t, 300, settingsWithDefaults["timeout"]) + assert.EqualValues(t, "noeviction", settingsWithDefaults["maxmemory_policy"]) +}