update kafka-operator chart, add steps in doc to use go with kafka
anantharam committed Jan 2, 2025
1 parent 4d7095b commit 3a5d13a
Showing 9 changed files with 349 additions and 11 deletions.
107 changes: 102 additions & 5 deletions argocd-helm-charts/strimzi-kafka-operator/README.md
# Strimzi Kafka operator

## Operational tips

- Many configuration changes require a rolling restart, and during a rolling restart
  the broker that is currently the elected controller MUST be rolled last.
- Between each roll, the cluster must be fully in sync to satisfy `min.insync.replicas`.
- Every Kafka broker (pod) must be directly reachable by clients.

## Setting up the Strimzi Kafka operator

### Configuration

The ready-to-go configuration provided in the [values file](./values.yaml) is the default configuration required
to run Kafka with TLS enabled.

More configuration examples can be found in the [examples folder](./examples/).

### Install

- To install the kafka-operator locally in a cluster (for example, Minikube), go to the chart directory `argocd-helm-charts/strimzi-kafka-operator` and run:

```bash
helm install kafka-operator . -n default
```
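Once the release is installed, a quick sanity check might look like the sketch below (release name and namespace taken from the command above; the pod label is Strimzi's standard operator label, so adjust if your setup differs):

```shell
# Check the Helm release and the operator pod
helm status kafka-operator -n default
kubectl get pods -n default -l name=strimzi-cluster-operator

# List the custom resources the chart creates
kubectl get kafka,kafkatopic,kafkauser -n default
```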

- Once deployed, ensure that all the custom resources (`Kafka`, `KafkaUser`, `KafkaTopic`) and the ingress (if enabled) are created
- Wait until all the pods are up, as shown below

![alt text](./images/image.png)

- Verify that the secrets containing all the necessary certificates, including the `KafkaUser` certificate, are created as shown below

![alt text](./images/image1.png)
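The certificate file paths used in the Go examples below can be populated from those secrets. A sketch, assuming the cluster is named `strimzi-kafka`, the `KafkaUser` is `my-user`, and everything lives in the `default` namespace:

```shell
# Cluster CA certificate (ssl.ca.location)
kubectl get secret strimzi-kafka-cluster-ca-cert -n default \
  -o jsonpath='{.data.ca\.crt}' | base64 -d > cluster.crt

# Client certificate and key from the KafkaUser secret
# (ssl.certificate.location / ssl.key.location)
kubectl get secret my-user -n default -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
kubectl get secret my-user -n default -o jsonpath='{.data.user\.key}' | base64 -d > user.key
```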

## Using confluent-kafka-go

- These steps are for the config provided in the [default values file](./values.yaml).
- If the Go service runs outside the cluster, ensure that the `listener` in the config is not of type `internal`.
- Repository - https://github.com/confluentinc/confluent-kafka-go/

### Producer config

#### TLS authentication

```go
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":                     "192.168.49.2:30486",
	"security.protocol":                     "SSL",
	"ssl.endpoint.identification.algorithm": "none",
	"ssl.ca.location":                       "/home/ananth/Desktop/kafka-local/ca/cluster.crt",
	"ssl.certificate.location":              "/home/ananth/Desktop/kafka-local/ca/user.crt",
	"ssl.key.location":                      "/home/ananth/Desktop/kafka-local/ca/user.key",
})
```

- `ssl.ca.location` - the cluster CA certificate `cluster.crt`, found in the secret `strimzi-kafka-cluster-ca-cert` (required)
- `ssl.certificate.location` - the client certificate stored in the secret created for the `KafkaUser` (required)
- `ssl.key.location` - the client's private key stored in the secret created for the `KafkaUser` (required)
- `"ssl.endpoint.identification.algorithm": "none"` - disables broker hostname verification; for more info see https://github.com/confluentinc/librdkafka/issues/4349#issuecomment-1640196425 (required)
- `security.protocol` - `SSL` (required)
- `bootstrap.servers` - `<internal/external IP>:<port>` (required)
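Putting the TLS settings together, a minimal end-to-end producer might look like the sketch below. The import path assumes v2 of the library; the topic name, broker address, and certificate paths are placeholders for your own values:

```go
package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":                     "192.168.49.2:30486",
		"security.protocol":                     "SSL",
		"ssl.endpoint.identification.algorithm": "none",
		"ssl.ca.location":                       "cluster.crt",
		"ssl.certificate.location":              "user.crt",
		"ssl.key.location":                      "user.key",
	})
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer p.Close()

	topic := "my-topic"
	deliveryChan := make(chan kafka.Event, 1)
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello from go"),
	}, deliveryChan)
	if err != nil {
		log.Fatalf("produce failed: %v", err)
	}

	// Block until the broker acknowledges (or rejects) the message.
	m := (<-deliveryChan).(*kafka.Message)
	if m.TopicPartition.Error != nil {
		log.Fatalf("delivery failed: %v", m.TopicPartition.Error)
	}
	fmt.Printf("delivered to %v\n", m.TopicPartition)
}
```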

#### SCRAM-SHA-512 authentication

```go
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "192.168.49.2:30263",
	"sasl.mechanisms":   "SCRAM-SHA-512",
	"security.protocol": "SASL_PLAINTEXT",
	"sasl.username":     "my-user",
	"sasl.password":     "UkPeaYQDJSnj1kLKVd96kXh9C60Mf6Uo",
})
```

- `sasl.password` - the password stored in the secret created for the `KafkaUser` (required)
- `sasl.username` - the username of the `KafkaUser` (required)
- `sasl.mechanisms` - `SCRAM-SHA-512` (required)
- `security.protocol` - `SASL_PLAINTEXT` (required)
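For SCRAM users, Strimzi generates the password and stores it in a secret named after the `KafkaUser`, so it can be read with a one-liner (namespace assumed to be `default`):

```shell
kubectl get secret my-user -n default -o jsonpath='{.data.password}' | base64 -d
```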

### Consumer config

#### TLS authentication

```go
c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":                     "192.168.49.2:30486",
	"group.id":                              "my-consumer-group",
	"auto.offset.reset":                     "earliest",
	"security.protocol":                     "SSL",
	"ssl.endpoint.identification.algorithm": "none",
	"ssl.ca.location":                       "/home/ananth/Desktop/kafka-local/ca/cluster.crt",
	"ssl.certificate.location":              "/home/ananth/Desktop/kafka-local/ca/user.crt",
	"ssl.key.location":                      "/home/ananth/Desktop/kafka-local/ca/user.key",
})
```

#### SCRAM-SHA-512 authentication

```go
c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "192.168.49.2:30486",
	"group.id":          "my-consumer-group",
	"auto.offset.reset": "earliest",
	"sasl.mechanisms":   "SCRAM-SHA-512",
	"security.protocol": "SASL_PLAINTEXT",
	"sasl.username":     "my-user",
	"sasl.password":     "UkPeaYQDJSnj1kLKVd96kXh9C60Mf6Uo",
})
```

*NOTE* - Ensure that the `KafkaUser` has the required permissions (`Read` on the `group` resource) for the consumer group `my-consumer-group`.
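A minimal consume loop to go with either consumer config above (a sketch; the v2 import path is assumed, and the broker address and topic are placeholders):

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "192.168.49.2:30486",
		"group.id":          "my-consumer-group",
		"auto.offset.reset": "earliest",
		// ... plus the TLS or SASL settings shown above ...
	})
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"my-topic"}, nil); err != nil {
		log.Fatalf("subscribe failed: %v", err)
	}

	for {
		msg, err := c.ReadMessage(5 * time.Second)
		if err != nil {
			// A timeout just means no message arrived within the poll interval.
			if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
				continue
			}
			log.Fatalf("consumer error: %v", err)
		}
		fmt.Printf("%s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}
```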
An example values file configuring a TLS-enabled `ingress` listener:

```yaml
kafka:
  - name: strimzi-kafka
    kafka:
      replicas: 1
      authorization:
        type: simple
      listeners:
        - name: ingress
          port: 9094
          type: ingress
          tls: true
          authentication:
            type: tls
          configuration:
            bootstrap:
              host: localhost
              annotations:
                kubernetes.io/ingress.class: nginx
            brokers:
              - id: 0
                host: kafka-0
                annotations:
                  kubernetes.io/ingress.class: nginx
              - id: 1
                host: kafka-1
                annotations:
                  kubernetes.io/ingress.class: nginx
              - id: 2
                host: kafka-2
                annotations:
                  kubernetes.io/ingress.class: nginx
            class: nginx
      storage:
        type: jbod
        volumes:
          - id: 0
            class: rook-ceph-block
            type: persistent-claim
            size: 1Gi
            deleteClaim: false
      config:
        offsets.topic.replication.factor: 1
        transaction.state.log.replication.factor: 1
        transaction.state.log.min.isr: 1
        default.replication.factor: 1
        min.insync.replicas: 1
    zookeeper:
      replicas: 3
      storage:
        type: persistent-claim
        class: rook-ceph-block
        size: 1Gi
        deleteClaim: false
```
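With an `ingress` (or any external) listener, the advertised bootstrap address ends up in the `Kafka` resource status; a sketch for looking it up, with the cluster name and namespace assumed from the example above:

```shell
kubectl get kafka strimzi-kafka -n default \
  -o jsonpath='{.status.listeners[?(@.name=="ingress")].bootstrapServers}'
```

Note that older Strimzi versions key the status entry by `type` rather than `name`, so the jsonpath filter may need adjusting.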
A second example: a plaintext `internal` listener with SCRAM-SHA-512 authentication, plus topics, a user with ACLs, and the chart's ingress settings:

```yaml
kafka:
  - name: strimzi-kafka
    kafka:
      replicas: 1
      authorization:
        type: simple
      listeners:
        - name: internal
          port: 9094
          type: internal
          tls: false
          authentication:
            type: scram-sha-512
      storage:
        type: jbod
        volumes:
          - id: 0
            class: rook-ceph-block
            type: persistent-claim
            size: 1Gi
            deleteClaim: false
      config:
        offsets.topic.replication.factor: 1
        transaction.state.log.replication.factor: 1
        transaction.state.log.min.isr: 1
        default.replication.factor: 1
        min.insync.replicas: 1
    zookeeper:
      replicas: 3
      storage:
        type: persistent-claim
        class: rook-ceph-block
        size: 1Gi
        deleteClaim: false

kafkaTopics:
  - topicName: my-topic
    cluster: strimzi-kafka
    partitions: 3
    replicas: 1

kafkaUser:
  - name: my-user
    cluster: strimzi-kafka
    authentication:
      type: scram-sha-512
    authorization:
      type: simple
      acls:
        - type: allow
          resource:
            type: topic
            name: my-topic
            patternType: literal
          host: "*"
          operations:
            - Read
            - Write
        - type: allow
          resource:
            type: group
            name: my-group
            patternType: literal
          host: "*"
          operations:
            - Read

ingress:
  enable: false
  name: kafka
  rules:
    - host: localhost
      http:
        paths:
          - path: /
            pathType: prefix
            backend:
              service:
                name: strimzi-kafka-kafka-internal-bootstrap
                port:
                  number: 9094
  tls:
    enable: false
    config:
      - hosts:
          - localhost
        secretName: local-host-tls
```
41 changes: 41 additions & 0 deletions argocd-helm-charts/strimzi-kafka-operator/templates/ingress.yaml
```yaml
{{- if .Values.ingress.enable }}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: {{ .Values.ingress.name }}
  {{- if .Values.ingress.annotations }}
  annotations:
    {{- range $key, $value := .Values.ingress.annotations }}
    {{ $key }}: {{ $value }}
    {{- end }}
  {{- end }}
spec:
  ingressClassName: {{ .Values.ingress.class }}
  rules:
    {{- range .Values.ingress.rules }}
    - host: {{ .host }}
      {{- if .http }}
      http:
        paths:
          {{- range .http.paths }}
          - path: {{ .path }}
            pathType: {{ .pathType }}
            backend:
              service:
                name: {{ .backend.service.name }}
                port:
                  number: {{ .backend.service.port.number }}
          {{- end }}
      {{- end }}
    {{- end }}
  {{- if .Values.ingress.tls.enable }}
  tls:
    {{- range .Values.ingress.tls.config }}
    - hosts:
        {{- range .hosts }}
        - {{ . }}
        {{- end }}
      secretName: {{ .secretName }}
    {{- end }}
  {{- end }}
{{- end }}
```
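Since the ingress is disabled by default, the template above can be exercised without a cluster by rendering it with the flag flipped (a sketch, run from the chart directory):

```shell
helm template kafka-operator . --set ingress.enable=true \
  --show-only templates/ingress.yaml
```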
33 changes: 30 additions & 3 deletions argocd-helm-charts/strimzi-kafka-operator/templates/kafka.yaml
The `Kafka` template now renders `authorization`, per-listener `configuration`, and the storage `class`/`size` conditionally (excerpt; unchanged lines elided):

```yaml
spec:
  kafka:
    replicas: {{ .kafka.replicas }}
    {{- if .kafka.authorization }}
    authorization:
      type: {{ .kafka.authorization.type }}
    {{- end }}
    listeners:
      {{- range .kafka.listeners }}
      - name: {{ .name }}
        # … unchanged lines elided …
        authentication:
          type: {{ .authentication.type | default "tls" }}
        {{- end }}
        {{- if .configuration }}
        configuration:
          bootstrap:
            host: {{ .configuration.bootstrap.host }}
            {{- if .configuration.bootstrap.annotations }}
            annotations:
              {{- range $key, $value := .configuration.bootstrap.annotations }}
              {{ $key }}: {{ $value }}
              {{- end }}
            {{- end }}
          brokers:
            {{- range .configuration.brokers }}
            - broker: {{ .id }}
              host: {{ .host }}
              {{- if .annotations }}
              annotations:
                {{- range $key, $value := .annotations }}
                {{ $key }}: {{ $value }}
                {{- end }}
              {{- end }}
            {{- end }}
          class: {{ .configuration.class }}
        {{- end }}
      {{- end }}
    storage:
      type: {{ .kafka.storage.type }}
      {{- if .kafka.storage.class }}
      class: {{ .kafka.storage.class }}
      {{- end }}
      {{- if .kafka.storage.size }}
      size: {{ .kafka.storage.size }}
      {{- end }}
      # volumes must be set if the storage type above is jbod
      {{- if eq .kafka.storage.type "jbod" }}
      volumes:
        {{- range .kafka.storage.volumes }}
        # …
```
In the `KafkaUser` ACL template, a `resource` of type `cluster` gets no `name` or `patternType` (excerpt):

```yaml
        - type: {{ .type }}
          resource:
            type: {{ .resource.type }}
            # resource of type cluster does not support name and patternType
            {{- if ne .resource.type "cluster" }}
            name: {{ .resource.name }}
            patternType: {{ .resource.patternType }}
            {{- end }}
```