Skip to content

Commit

Permalink
fix(operator): Allow structured metadata only if V13 schema provided
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis committed Jul 10, 2024
1 parent 845359d commit 87c38a2
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 24 deletions.
5 changes: 5 additions & 0 deletions operator/internal/manifests/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

// BuildDistributor returns a list of k8s objects for Loki Distributor
Expand Down Expand Up @@ -44,6 +45,10 @@ func BuildDistributor(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage); err != nil {
return nil, err
}

if err := configureHashRingEnv(&deployment.Spec.Template.Spec, opts); err != nil {
return nil, err
}
Expand Down
56 changes: 33 additions & 23 deletions operator/internal/manifests/internal/config/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -371,7 +371,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -797,7 +797,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1155,7 +1155,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1514,7 +1514,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -1911,7 +1911,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -2241,7 +2241,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -2680,7 +2680,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -3004,7 +3004,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -3501,7 +3501,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_addr: ${HASH_RING_INSTANCE_ADDR}
Expand Down Expand Up @@ -3762,7 +3762,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_addr: ${HASH_RING_INSTANCE_ADDR}
Expand Down Expand Up @@ -4024,7 +4024,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4287,7 +4287,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4586,7 +4586,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -4880,7 +4880,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5124,11 +5124,12 @@ func defaultOptions() Options {

func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
for _, tc := range []struct {
name string
schemaConfig []lokiv1.ObjectStorageSchema
shippers []string
expSchemaConfig string
expStorageConfig string
name string
schemaConfig []lokiv1.ObjectStorageSchema
shippers []string
expSchemaConfig string
expStorageConfig string
expStructuredMetadata string
}{
{
name: "default_config_v11_schema",
Expand Down Expand Up @@ -5156,6 +5157,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: false`,
},
{
name: "v12_schema",
Expand Down Expand Up @@ -5183,6 +5186,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: false`,
},
{
name: "v13_schema",
Expand Down Expand Up @@ -5210,6 +5215,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: true`,
},
{
name: "multiple_schema",
Expand Down Expand Up @@ -5266,6 +5273,8 @@ func TestBuild_ConfigAndRuntimeConfig_Schemas(t *testing.T) {
resync_interval: 5m
index_gateway_client:
server_address: dns:///loki-index-gateway-grpc-lokistack-dev.default.svc.cluster.local:9095`,
expStructuredMetadata: `
allow_structured_metadata: true`,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -5366,7 +5375,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
${STORAGE_STRUCTURED_METADATA}
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5416,6 +5425,7 @@ analytics:
`
expCfg = strings.Replace(expCfg, "${SCHEMA_CONFIG}", tc.expSchemaConfig, 1)
expCfg = strings.Replace(expCfg, "${STORAGE_CONFIG}", tc.expStorageConfig, 1)
expCfg = strings.Replace(expCfg, "${STORAGE_STRUCTURED_METADATA}", tc.expStructuredMetadata, 1)

opts := defaultOptions()
opts.ObjectStorage.Schemas = tc.schemaConfig
Expand Down Expand Up @@ -5540,7 +5550,7 @@ limits_config:
query_timeout: 1m
volume_enabled: true
volume_max_series: 1000
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down Expand Up @@ -5712,7 +5722,7 @@ limits_config:
shard_streams:
enabled: true
desired_rate: 3MB
allow_structured_metadata: true
allow_structured_metadata: false
memberlist:
abort_if_cluster_join_fails: true
advertise_port: 7946
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ limits_config:
enabled: true
desired_rate: {{ . }}MB
{{- end }}
allow_structured_metadata: true
allow_structured_metadata: {{ .ObjectStorage.AllowStructuredMetadata }}
{{- with .GossipRing }}
memberlist:
abort_if_cluster_join_fails: true
Expand Down
49 changes: 49 additions & 0 deletions operator/internal/manifests/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

func TestNewTimeoutConfig_ReturnsDefaults_WhenLimitsSpecEmpty(t *testing.T) {
Expand Down Expand Up @@ -190,3 +191,51 @@ func TestNewTimeoutConfig_ReturnsDefaults_WhenTenantQueryTimeoutParseError(t *te
_, err := NewTimeoutConfig(s.Spec.Limits)
require.Error(t, err)
}

func TestAllowStructuredMetadata_ReturnsTrue(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV13},
},
},
}

require.True(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsTrue_WithFutureVersions(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaVersion("v14")},
},
},
}

require.True(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsFalse_WithV11(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV11},
},
},
}

require.False(t, opts.ObjectStorage.AllowStructuredMetadata())
}

func TestAllowStructuredMetadata_ReturnsFalse_WithV12(t *testing.T) {
opts := Options{
ObjectStorage: storage.Options{
Schemas: []lokiv1.ObjectStorageSchema{
{Version: lokiv1.ObjectStorageSchemaV12},
},
},
}

require.False(t, opts.ObjectStorage.AllowStructuredMetadata())
}
5 changes: 5 additions & 0 deletions operator/internal/manifests/query-frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

// BuildQueryFrontend returns a list of k8s objects for Loki QueryFrontend
Expand Down Expand Up @@ -44,6 +45,10 @@ func BuildQueryFrontend(opts Options) ([]client.Object, error) {
}
}

if err := storage.ConfigureDeployment(deployment, opts.ObjectStorage); err != nil {
return nil, err
}

if err := configureHashRingEnv(&deployment.Spec.Template.Spec, opts); err != nil {
return nil, err
}
Expand Down
26 changes: 26 additions & 0 deletions operator/internal/manifests/storage/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func configureDeployment(d *appsv1.Deployment, opts Options) error {
return kverrors.Wrap(err, "failed to merge gcs object storage spec ")
}

p = ensureObjectStoreStructuredMetadata(&d.Spec.Template.Spec, opts)
if err := mergo.Merge(&d.Spec.Template.Spec, p, mergo.WithOverride); err != nil {
return kverrors.Wrap(err, "failed to merge object store structured metadata CLI arguments ")
}

return nil
}

Expand All @@ -98,6 +103,11 @@ func configureStatefulSet(s *appsv1.StatefulSet, opts Options) error {
return kverrors.Wrap(err, "failed to merge gcs object storage spec ")
}

p = ensureObjectStoreStructuredMetadata(&s.Spec.Template.Spec, opts)
if err := mergo.Merge(&s.Spec.Template.Spec, p, mergo.WithOverride); err != nil {
return kverrors.Wrap(err, "failed to merge object store structured metadata CLI arguments ")
}

return nil
}

Expand All @@ -116,6 +126,22 @@ func configureStatefulSetCA(s *appsv1.StatefulSet, tls *TLSConfig) error {
return nil
}

func ensureObjectStoreStructuredMetadata(p *corev1.PodSpec, opts Options) corev1.PodSpec {
if !opts.AllowStructuredMetadata() {
return *p
}

container := p.Containers[0].DeepCopy()

container.Args = append(container.Args, "-validation.allow-structured-metadata=false")

return corev1.PodSpec{
Containers: []corev1.Container{
*container,
},
}
}

func ensureObjectStoreCredentials(p *corev1.PodSpec, opts Options) corev1.PodSpec {
container := p.Containers[0].DeepCopy()
volumes := p.Volumes
Expand Down
Loading

0 comments on commit 87c38a2

Please sign in to comment.