From ca56ad28400749b9db6eefc7fb9da7c2712d43bc Mon Sep 17 00:00:00 2001 From: jiang1997 Date: Fri, 21 Oct 2022 22:28:04 +0800 Subject: [PATCH] feat: add Kafka support to MeterReportService (#243) * feat: add Kafka support to MeterReportService * remove unnecessary comments * remove unnecessary conversions --- skywalking/agent/protocol/kafka.py | 31 +++++++++++++++++++++++++++++- skywalking/client/kafka.py | 24 ++++++++++++++++++----- skywalking/config.py | 1 + tests/e2e/case/grpc/e2e.yaml | 4 +--- tests/e2e/case/kafka/e2e.yaml | 8 +++----- 5 files changed, 54 insertions(+), 14 deletions(-) diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py index dc2c2d29..0f3b1fcf 100644 --- a/skywalking/agent/protocol/kafka.py +++ b/skywalking/agent/protocol/kafka.py @@ -22,10 +22,11 @@ from skywalking import config from skywalking.agent import Protocol from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \ - KafkaLogDataReportService + KafkaLogDataReportService, KafkaMeterDataReportService from skywalking.loggings import logger, getLogger, logger_debug_enabled from skywalking.protocol.common.Common_pb2 import KeyStringValuePair from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference +from skywalking.protocol.language_agent.Meter_pb2 import MeterData from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.trace.segment import Segment @@ -39,6 +40,7 @@ def __init__(self): self.service_management = KafkaServiceManagementClient() self.traces_reporter = KafkaTraceSegmentReportService() self.log_reporter = KafkaLogDataReportService() + self.meter_reporter = KafkaMeterDataReportService() def heartbeat(self): self.service_management.send_heart_beat() @@ -133,3 +135,30 @@ def generator(): yield log_data self.log_reporter.report(generator=generator()) + + def report_meter(self, queue: Queue, block: bool = True): + start = None + + def generator(): + nonlocal start + + while True: + try: + timeout = config.QUEUE_TIMEOUT # type: int + if not start: # make sure first time through queue is always checked + start = time() + else: + timeout -= int(time() - start) + if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously + return + meter_data = queue.get(block=block, timeout=timeout) # type: MeterData + except Empty: + return + queue.task_done() + + if logger_debug_enabled: + logger.debug('Reporting Meter') + + yield meter_data + + self.meter_reporter.report(generator=generator()) diff --git a/skywalking/client/kafka.py b/skywalking/client/kafka.py index 2aaa7db5..51be7df3 100644 --- a/skywalking/client/kafka.py +++ b/skywalking/client/kafka.py @@ -21,9 +21,10 @@ from kafka import KafkaProducer from skywalking import config -from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService +from skywalking.client import MeterReportService, ServiceManagementClient, TraceSegmentReportService, LogDataReportService from skywalking.loggings import logger, logger_debug_enabled from skywalking.protocol.common.Common_pb2 import KeyStringValuePair +from skywalking.protocol.language_agent.Meter_pb2 import MeterDataCollection from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties kafka_configs = {} @@ -79,7 +80,7 @@ def send_instance_props(self): ) key = bytes(self.topic_key_register + instance.serviceInstance, encoding='utf-8') - value = bytes(instance.SerializeToString()) + value = instance.SerializeToString() self.producer.send(topic=self.topic, key=key, value=value) def send_heart_beat(self): @@ -96,7 +97,7 @@ def send_heart_beat(self): ) key = bytes(instance_ping_pkg.serviceInstance, encoding='utf-8') - value = bytes(instance_ping_pkg.SerializeToString()) + value = instance_ping_pkg.SerializeToString() future = self.producer.send(topic=self.topic, key=key, value=value) res = future.get(timeout=10) if logger_debug_enabled: @@ -111,7 +112,7 @@ def __init__(self): def report(self, generator): for segment in generator: key = bytes(segment.traceSegmentId, encoding='utf-8') - value = bytes(segment.SerializeToString()) + value = segment.SerializeToString() self.producer.send(topic=self.topic, key=key, value=value) @@ -123,10 +124,23 @@ def __init__(self): def report(self, generator): for log_data in generator: key = bytes(log_data.traceContext.traceSegmentId, encoding='utf-8') - value = bytes(log_data.SerializeToString()) + value = log_data.SerializeToString() self.producer.send(topic=self.topic, key=key, value=value) +class KafkaMeterDataReportService(MeterReportService): + def __init__(self): + self.producer = KafkaProducer(**kafka_configs) + self.topic = config.kafka_topic_meter + + def report(self, generator): + collection = MeterDataCollection() + collection.meterData.extend(list(generator)) + key = bytes(config.service_instance, encoding='utf-8') + value = collection.SerializeToString() + self.producer.send(topic=self.topic, key=key, value=value) + + class KafkaConfigDuplicated(Exception): def __init__(self, key): self.key = key diff --git a/skywalking/config.py b/skywalking/config.py index fd89fa40..2b205814 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -38,6 +38,7 @@ kafka_topic_management: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or 'skywalking-managements' kafka_topic_segment: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or 'skywalking-segments' kafka_topic_log: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or 'skywalking-logs' +kafka_topic_meter: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_METER') or 'skywalking-meters' force_tls: bool = os.getenv('SW_AGENT_FORCE_TLS', '').lower() == 'true' protocol: str = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower() authentication: str = os.getenv('SW_AGENT_AUTHENTICATION') diff --git a/tests/e2e/case/grpc/e2e.yaml b/tests/e2e/case/grpc/e2e.yaml index d184a17e..cdbdafb5 100644 --- a/tests/e2e/case/grpc/e2e.yaml +++ b/tests/e2e/case/grpc/e2e.yaml @@ -61,9 +61,7 @@ verify: - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider expected: ../expected/service-instance.yml - # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084 - # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944 - # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - + # service instance pvm metrics - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - expected: ../expected/metrics-has-value.yml diff --git a/tests/e2e/case/kafka/e2e.yaml b/tests/e2e/case/kafka/e2e.yaml index 4933b5ba..cdbdafb5 100644 --- a/tests/e2e/case/kafka/e2e.yaml +++ b/tests/e2e/case/kafka/e2e.yaml @@ -61,11 +61,9 @@ verify: - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider expected: ../expected/service-instance.yml - # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084 - # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944 - # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - - # - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - - # expected: ../expected/metrics-has-value.yml + # service instance pvm metrics + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - + expected: ../expected/metrics-has-value.yml # trace segment list - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls