Skip to content

Commit

Permalink
feat: add Kafka support to MeterReportService (#243)
Browse files Browse the repository at this point in the history
* feat: add Kafka support to MeterReportService

* remove unnecessary comments

* remove unnecessary conversions
  • Loading branch information
jiang1997 authored Oct 21, 2022
1 parent 877928f commit ca56ad2
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 14 deletions.
31 changes: 30 additions & 1 deletion skywalking/agent/protocol/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
from skywalking import config
from skywalking.agent import Protocol
from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \
KafkaLogDataReportService
KafkaLogDataReportService, KafkaMeterDataReportService
from skywalking.loggings import logger, getLogger, logger_debug_enabled
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
from skywalking.protocol.language_agent.Meter_pb2 import MeterData
from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.trace.segment import Segment

Expand All @@ -39,6 +40,7 @@ def __init__(self):
self.service_management = KafkaServiceManagementClient()
self.traces_reporter = KafkaTraceSegmentReportService()
self.log_reporter = KafkaLogDataReportService()
self.meter_reporter = KafkaMeterDataReportService()

def heartbeat(self):
self.service_management.send_heart_beat()
Expand Down Expand Up @@ -133,3 +135,30 @@ def generator():
yield log_data

self.log_reporter.report(generator=generator())

def report_meter(self, queue: Queue, block: bool = True):
start = None

def generator():
nonlocal start

while True:
try:
timeout = config.QUEUE_TIMEOUT # type: int
if not start: # make sure first time through queue is always checked
start = time()
else:
timeout -= int(time() - start)
if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
return
meter_data = queue.get(block=block, timeout=timeout) # type: MeterData
except Empty:
return
queue.task_done()

if logger_debug_enabled:
logger.debug('Reporting Meter')

yield meter_data

self.meter_reporter.report(generator=generator())
24 changes: 19 additions & 5 deletions skywalking/client/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
from kafka import KafkaProducer

from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
from skywalking.client import MeterReportService, ServiceManagementClient, TraceSegmentReportService, LogDataReportService
from skywalking.loggings import logger, logger_debug_enabled
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Meter_pb2 import MeterDataCollection
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties

kafka_configs = {}
Expand Down Expand Up @@ -79,7 +80,7 @@ def send_instance_props(self):
)

key = bytes(self.topic_key_register + instance.serviceInstance, encoding='utf-8')
value = bytes(instance.SerializeToString())
value = instance.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)

def send_heart_beat(self):
Expand All @@ -96,7 +97,7 @@ def send_heart_beat(self):
)

key = bytes(instance_ping_pkg.serviceInstance, encoding='utf-8')
value = bytes(instance_ping_pkg.SerializeToString())
value = instance_ping_pkg.SerializeToString()
future = self.producer.send(topic=self.topic, key=key, value=value)
res = future.get(timeout=10)
if logger_debug_enabled:
Expand All @@ -111,7 +112,7 @@ def __init__(self):
def report(self, generator):
for segment in generator:
key = bytes(segment.traceSegmentId, encoding='utf-8')
value = bytes(segment.SerializeToString())
value = segment.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)


Expand All @@ -123,10 +124,23 @@ def __init__(self):
def report(self, generator):
for log_data in generator:
key = bytes(log_data.traceContext.traceSegmentId, encoding='utf-8')
value = bytes(log_data.SerializeToString())
value = log_data.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)


class KafkaMeterDataReportService(MeterReportService):
def __init__(self):
self.producer = KafkaProducer(**kafka_configs)
self.topic = config.kafka_topic_meter

def report(self, generator):
collection = MeterDataCollection()
collection.meterData.extend(list(generator))
key = bytes(config.service_instance, encoding='utf-8')
value = collection.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)


class KafkaConfigDuplicated(Exception):
def __init__(self, key):
self.key = key
1 change: 1 addition & 0 deletions skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
kafka_topic_management: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or 'skywalking-managements'
kafka_topic_segment: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or 'skywalking-segments'
kafka_topic_log: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or 'skywalking-logs'
kafka_topic_meter: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_METER') or 'skywalking-meters'
force_tls: bool = os.getenv('SW_AGENT_FORCE_TLS', '').lower() == 'true'
protocol: str = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower()
authentication: str = os.getenv('SW_AGENT_AUTHENTICATION')
Expand Down
4 changes: 1 addition & 3 deletions tests/e2e/case/grpc/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ verify:
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
expected: ../expected/service-instance.yml

# TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
# service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
# swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
# service instance pvm metrics
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: ../expected/metrics-has-value.yml

Expand Down
8 changes: 3 additions & 5 deletions tests/e2e/case/kafka/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,9 @@ verify:
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
expected: ../expected/service-instance.yml

# TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
# service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
# swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
# - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
# expected: ../expected/metrics-has-value.yml
# service instance pvm metrics
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: ../expected/metrics-has-value.yml

# trace segment list
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls
Expand Down

0 comments on commit ca56ad2

Please sign in to comment.