Skip to content

Commit

Permalink
Changed the kafka_types.go file and introduced a mapvalue struct.
Browse files Browse the repository at this point in the history
In kvs.go, introduced an InsertValueMap function and a SecretProvider interface.

Signed-off-by: TheJadeLion2004 <cs22b098@smail.iitm.ac.in>
  • Loading branch information
reegnz authored and TheJadeLion2004 committed Sep 2, 2024
1 parent 9b31cf2 commit 99a4034
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 51 deletions.
27 changes: 24 additions & 3 deletions apis/fluentbit/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,27 @@ import (

// +kubebuilder:object:generate:=true

type mapvalue struct {
StringVal string `json:"format,omitempty"`
Secret *plugins.Secret `json:"format,omitempty"`

Check failure on line 14 in apis/fluentbit/v1alpha2/plugins/output/kafka_types.go

View workflow job for this annotation

GitHub Actions / Basic test and verify

struct field Secret repeats json tag "format" also at kafka_types.go:13
}

// Implement the GetStringVal method to satisfy the SecretProvider interface
func (m mapvalue) GetStringVal() string {
return m.StringVal
}

// mapvalue implicitly implements params.SecretProvider because SecretProvider is an empty interface
var _ params.SecretProvider = mapvalue{}

func convertMap(input map[string]mapvalue) map[string]params.SecretProvider {
result := make(map[string]params.SecretProvider)
for k, v := range input {
result[k] = v
}
return result
}

// Kafka output plugin allows to ingest your records into an Apache Kafka service. <br />
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka**
type Kafka struct {
Expand All @@ -35,8 +56,8 @@ type Kafka struct {
// then by default the first topic in the Topics list will indicate the topic to be used.
TopicKey string `json:"topicKey,omitempty"`
// {property} can be any librdkafka properties
Rdkafka map[string]string `json:"rdkafka,omitempty"`
//adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured
Rdkafka map[string]mapvalue `json:"rdkafka,omitempty"`
DynamicTopic *bool `json:"dynamicTopic,omitempty"`
//Fluent Bit queues data into rdkafka library,
//if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records.
Expand Down Expand Up @@ -84,8 +105,8 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) {
kvs.Insert("queue_full_retries", fmt.Sprint(*k.QueueFullRetries))
}

kvs.InsertStringMap(k.Rdkafka, func(k, v string) (string, string) {
return fmt.Sprintf("rdkafka.%s", k), v
kvs.InsertMapValMap(convertMap(k.Rdkafka), func(k, v string) (string, params.SecretProvider) {
return fmt.Sprintf("rdkafka.%s", k), mapvalue{StringVal: v}
})

return kvs, nil
Expand Down
32 changes: 0 additions & 32 deletions apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/params/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (

type kvTransformFunc func(string, string) (string, string)

type SecretProvider interface {
GetStringVal() string
}

type kvTransformFunc1 func(string, string) (string, SecretProvider)

type KVs struct {
keys []string
values []string
Expand All @@ -25,6 +31,36 @@ func NewKVs() *KVs {
}
}

func (kvs *KVs) InsertMapValMap(m map[string]SecretProvider, f kvTransformFunc1) {
if len(m) > 0 {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}

sort.Strings(keys)

for _, k := range keys {
v := m[k]
strval := v.GetStringVal()
if f != nil {
transformedKey, transformedVal := f(k, strval)

if transformedVal != nil {
strval = transformedVal.GetStringVal()
} else {
strval = "" // Default to an empty string if transformation returns nil
}

k = transformedKey
}

kvs.Insert(k, strval)

}
}
}

func (kvs *KVs) Insert(key, value string) {
kvs.keys = append(kvs.keys, key)
kvs.values = append(kvs.values, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
42 changes: 40 additions & 2 deletions config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
42 changes: 40 additions & 2 deletions config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,8 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specipvafy data format, options available: json,
msgpack.'
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -1974,7 +1975,44 @@ spec:
type: integer
rdkafka:
additionalProperties:
type: string
properties:
format:
description: Secret defines the key of a value.
properties:
valueFrom:
description: ValueSource defines how to find a value's
key.
properties:
secretKeyRef:
description: Selects a key of a secret in the pod's
namespace
properties:
key:
description: The key of the secret to select
from. Must be a valid secret key.
type: string
name:
default: ""
description: |-
Name of the referent.
This field is effectively required, but due to backwards compatibility is
allowed to be empty. Instances of this type with an empty value here are
almost certainly wrong.
TODO: Add other useful fields. apiVersion, kind, uid?
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896.
type: string
optional:
description: Specify whether the Secret or its
key must be defined
type: boolean
required:
- key
type: object
x-kubernetes-map-type: atomic
type: object
type: object
type: object
description: '{property} can be any librdkafka properties'
type: object
timestampFormat:
Expand Down
Loading

0 comments on commit 99a4034

Please sign in to comment.