Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka prefix to kakfa topic names #277

Merged
merged 10 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Add Greenlet profiler (#246)
- Add test and support for Python Slim base images (#249)
- Add support for the tags of Virtual Cache for Redis (#263)
- Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277)

- Plugins:
- Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage)
Expand Down
1 change: 1 addition & 0 deletions docs/en/setup/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export SW_AGENT_YourConfiguration=YourValue
| service_instance | SW_AGENT_SERVICE_INSTANCE | <class 'str'> | str(uuid.uuid1()).replace('-', '') | The name of this particular awesome Python service instance |
| namespace | SW_AGENT_NAMESPACE | <class 'str'> | | The agent namespace of the Python service (available as tag and the suffix of service name) |
| kafka_bootstrap_servers | SW_AGENT_KAFKA_BOOTSTRAP_SERVERS | <class 'str'> | localhost:9092 | A list of host/port pairs to use for establishing the initial connection to your Kafka cluster. It is in the form of host1:port1,host2:port2,... (used for Kafka reporter protocol) |
| kafka_namespace | SW_AGENT_KAFKA_NAMESPACE | <class 'str'> | | The kafka namespace specified by OAP side SW_NAMESPACE, prepends the following kafka topic names with a `-`. |
| kafka_topic_management | SW_AGENT_KAFKA_TOPIC_MANAGEMENT | <class 'str'> | skywalking-managements | Specifying Kafka topic name for service instance reporting and registering, this should be in sync with OAP |
| kafka_topic_segment | SW_AGENT_KAFKA_TOPIC_SEGMENT | <class 'str'> | skywalking-segments | Specifying Kafka topic name for Tracing data, this should be in sync with OAP |
| kafka_topic_log | SW_AGENT_KAFKA_TOPIC_LOG | <class 'str'> | skywalking-logs | Specifying Kafka topic name for Log data, this should be in sync with OAP |
Expand Down
11 changes: 10 additions & 1 deletion skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
# A list of host/port pairs to use for establishing the initial connection to your Kafka cluster.
# It is in the form of host1:port1,host2:port2,... (used for Kafka reporter protocol)
kafka_bootstrap_servers: str = os.getenv('SW_AGENT_KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
# The kafka namespace specified by OAP side SW_NAMESPACE, prepends the following kafka topic names with a `-`.
kafka_namespace: str = os.getenv('SW_AGENT_KAFKA_NAMESPACE', '')
# Specifying Kafka topic name for service instance reporting and registering, this should be in sync with OAP
kafka_topic_management: str = os.getenv('SW_AGENT_KAFKA_TOPIC_MANAGEMENT', 'skywalking-managements')
# Specifying Kafka topic name for Tracing data, this should be in sync with OAP
Expand Down Expand Up @@ -222,13 +224,20 @@ def finalize_name() -> None:
"""
This function concatenates the serviceName according to
Java agent's implementation.
TODO: add kafka namespace prefix and cluster concept
TODO: add cluster concept
Ref https://github.com/apache/skywalking-java/pull/123
"""
global service_name
if namespace:
service_name = f'{service_name}|{namespace}'

global kafka_topic_management, kafka_topic_meter, kafka_topic_log, kafka_topic_segment
if kafka_namespace:
kafka_topic_management = f'{kafka_namespace}-{kafka_topic_management}'
kafka_topic_meter = f'{kafka_namespace}-{kafka_topic_meter}'
kafka_topic_log = f'{kafka_namespace}-{kafka_topic_log}'
kafka_topic_segment = f'{kafka_namespace}-{kafka_topic_segment}'


def finalize_regex() -> None:
"""
Expand Down
4 changes: 3 additions & 1 deletion tests/e2e/case/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ services:
SW_KAFKA_FETCHER_SERVERS: broker-a:9092,broker-b:9092
SW_KAFKA_FETCHER_PARTITIONS: 2
SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1
SW_NAMESPACE: 'e2e'
depends_on:
broker-a:
condition: service_healthy
Expand All @@ -99,7 +100,7 @@ services:
SW_AGENT_COLLECTOR_ADDRESS: oap:11800
SW_AGENT_PROTOCOL: kafka
SW_AGENT_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092

SW_AGENT_KAFKA_NAMESPACE: 'e2e'
depends_on:
oap:
condition: service_healthy
Expand All @@ -112,6 +113,7 @@ services:
SW_AGENT_COLLECTOR_ADDRESS: oap:11800
SW_AGENT_PROTOCOL: kafka
SW_AGENT_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092
SW_AGENT_KAFKA_NAMESPACE: 'e2e'
ports:
- "9090"
depends_on:
Expand Down