Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #813 from zalando/aruha-1403-nakadi-consumption-kpi
Browse files Browse the repository at this point in the history
Aruha 1403 nakadi consumption kpi
  • Loading branch information
v-stepanov authored Dec 22, 2017
2 parents 4eaa6c4 + b4717d8 commit 6e4906a
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 31 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.5.0] - 2017-12-22

### Added
- Nakadi collects event publishing KPI data
- Nakadi collects event streaming KPI data

## [2.4.2] - 2017-12-21

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.google.common.base.Charsets;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -23,6 +25,7 @@
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.EventPublisher;
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.problem.Problem;
import org.zalando.problem.ThrowableProblem;
import org.zalando.problem.spring.web.advice.Responses;
Expand All @@ -43,14 +46,21 @@ public class EventPublishingController {
private final EventPublisher publisher;
private final EventTypeMetricRegistry eventTypeMetricRegistry;
private final BlacklistService blacklistService;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final String kpiBatchPublishedEventType;

@Autowired
public EventPublishingController(final EventPublisher publisher,
final EventTypeMetricRegistry eventTypeMetricRegistry,
final BlacklistService blacklistService) {
final BlacklistService blacklistService,
final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiBatchPublished}")
final String kpiBatchPublishedEventType) {
this.publisher = publisher;
this.eventTypeMetricRegistry = eventTypeMetricRegistry;
this.blacklistService = blacklistService;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.kpiBatchPublishedEventType = kpiBatchPublishedEventType;
}

@RequestMapping(value = "/event-types/{eventTypeName}/events", method = POST)
Expand Down Expand Up @@ -117,6 +127,14 @@ private void reportSLOs(final long startingNanos, final int totalSizeBytes, fina

LOG.info("[SLO] [publishing-latency] time={} size={} count={} eventTypeName={} app={}", msSpent,
totalSizeBytes, eventCount, eventTypeName, applicationName);

nakadiKpiPublisher.publish(kpiBatchPublishedEventType, () -> new JSONObject()
.put("event_type", eventTypeName)
.put("app", applicationName)
.put("app_hashed", nakadiKpiPublisher.hash(applicationName))
.put("number_of_events", eventCount)
.put("ms_spent", msSpent)
.put("batch_size", totalSizeBytes));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.zalando.nakadi.service.EventTypeChangeListener;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.util.FeatureToggleService;
import org.zalando.nakadi.util.FlowIdUtils;
import org.zalando.nakadi.view.Cursor;
import org.zalando.problem.Problem;

Expand Down Expand Up @@ -202,8 +203,10 @@ public StreamingResponseBody streamEvents(
@Nullable @RequestHeader(name = "X-nakadi-cursors", required = false) final String cursorsStr,
final HttpServletRequest request, final HttpServletResponse response, final Client client)
throws IOException {
final String flowId = FlowIdUtils.peek();

return outputStream -> {
FlowIdUtils.push(flowId);

if (blacklistService.isConsumptionBlocked(eventTypeName, client.getClientId())) {
writeProblemResponse(response, outputStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.zalando.nakadi.service.subscription.SubscriptionStreamer;
import org.zalando.nakadi.service.subscription.SubscriptionStreamerFactory;
import org.zalando.nakadi.util.FeatureToggleService;
import org.zalando.nakadi.util.FlowIdUtils;
import org.zalando.problem.Problem;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -139,8 +140,11 @@ public StreamingResponseBody streamEvents(
@RequestParam(value = "stream_keep_alive_limit", required = false) final Integer streamKeepAliveLimit,
final HttpServletRequest request, final HttpServletResponse response, final Client client)
throws IOException {
final String flowId = FlowIdUtils.peek();

return outputStream -> {
FlowIdUtils.push(flowId);

if (!featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)) {
response.setStatus(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/org/zalando/nakadi/filters/LoggingFilter.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.zalando.nakadi.filters;

import com.google.common.base.Charsets;
import com.google.common.net.HttpHeaders;
import org.apache.commons.codec.binary.Hex;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,23 +15,20 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.Principal;
import java.util.Optional;

@Component
public class LoggingFilter extends OncePerRequestFilter {

private static final Logger LOG = LoggerFactory.getLogger(LoggingFilter.class);
private final MessageDigest sha256MessageDigest;

private final NakadiKpiPublisher nakadiKpiPublisher;
private final String accessLogEventType;

@Autowired
public LoggingFilter(final MessageDigest sha256MessageDigest,
final NakadiKpiPublisher nakadiKpiPublisher,
public LoggingFilter(final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiAccessLog}") final String accessLogEventType) {
this.sha256MessageDigest = sha256MessageDigest;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.accessLogEventType = accessLogEventType;
}
Expand Down Expand Up @@ -74,7 +69,7 @@ protected void doFilterInternal(final HttpServletRequest request,
.put("path", path)
.put("query", query)
.put("app", user)
.put("app_hashed", Hex.encodeHexString(sha256MessageDigest.digest(user.getBytes(Charsets.UTF_8))))
.put("app_hashed", nakadiKpiPublisher.hash(user))
.put("status_code", response.getStatus())
.put("response_time_ms", timing));
}
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/zalando/nakadi/metrics/StreamKpiData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.zalando.nakadi.metrics;

public class StreamKpiData {
private long bytesSent = 0;
private long numberOfEventsSent = 0;

public long getAndResetBytesSent() {
final long tmp = bytesSent;
bytesSent = 0;
return tmp;
}

public long getAndResetNumberOfEventsSent() {
final long tmp = numberOfEventsSent;
numberOfEventsSent = 0;
return tmp;
}

public void addBytesSent(final long bytes) {
bytesSent = bytesSent + bytes;
}

public void addNumberOfEventsSent(final long count) {
numberOfEventsSent = numberOfEventsSent + count;
}
}
49 changes: 48 additions & 1 deletion src/main/java/org/zalando/nakadi/service/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.codahale.metrics.Meter;
import com.google.common.collect.Lists;
import org.apache.kafka.common.KafkaException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.ConsumedEvent;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.metrics.StreamKpiData;
import org.zalando.nakadi.repository.EventConsumer;

import java.io.IOException;
Expand All @@ -33,20 +35,30 @@ public class EventStream {
private final CursorConverter cursorConverter;
private final Meter bytesFlushedMeter;
private final EventStreamWriterProvider writer;
private final StreamKpiData kpiData;
private final String kpiDataStreamedEventType;
private final long kpiFrequencyMs;
private final NakadiKpiPublisher kpiPublisher;

public EventStream(final EventConsumer eventConsumer,
final OutputStream outputStream,
final EventStreamConfig config,
final BlacklistService blacklistService,
final CursorConverter cursorConverter, final Meter bytesFlushedMeter,
final EventStreamWriterProvider writer) {
final EventStreamWriterProvider writer,
final NakadiKpiPublisher kpiPublisher, final String kpiDataStreamedEventType,
final long kpiFrequencyMs) {
this.eventConsumer = eventConsumer;
this.outputStream = outputStream;
this.config = config;
this.blacklistService = blacklistService;
this.cursorConverter = cursorConverter;
this.bytesFlushedMeter = bytesFlushedMeter;
this.writer = writer;
this.kpiPublisher = kpiPublisher;
this.kpiData = new StreamKpiData();
this.kpiDataStreamedEventType = kpiDataStreamedEventType;
this.kpiFrequencyMs = kpiFrequencyMs;
}

public void streamEvents(final AtomicBoolean connectionReady, final Runnable checkAuthorization) {
Expand All @@ -63,6 +75,8 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
final long start = currentTimeMillis();
final Map<String, Long> batchStartTimes = createMapWithPartitionKeys(partition -> start);
final List<ConsumedEvent> consumedEvents = new LinkedList<>();
long lastKpiEventSent = System.currentTimeMillis();

while (connectionReady.get() &&
!blacklistService.isConsumptionBlocked(config.getEtName(), config.getConsumingAppId())) {

Expand Down Expand Up @@ -109,6 +123,15 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
}
}

if (lastKpiEventSent + kpiFrequencyMs < System.currentTimeMillis()) {
final long count = kpiData.getAndResetNumberOfEventsSent();
final long bytes = kpiData.getAndResetBytesSent();

publishKpi(config.getConsumingAppId(), count, bytes);

lastKpiEventSent = System.currentTimeMillis();
}

// check if we reached keepAliveInARow for all the partitions; if yes - then close stream
if (config.getStreamKeepAliveLimit() != 0) {
final boolean keepAliveLimitReachedForAllPartitions = keepAliveInARow
Expand Down Expand Up @@ -142,9 +165,31 @@ public void streamEvents(final AtomicBoolean connectionReady, final Runnable che
} catch (final KafkaException e) {
LOG.error("Error occurred when polling events from kafka; consumer: {}, event-type: {}",
config.getConsumingAppId(), config.getEtName(), e);
} finally {
publishKpi(
config.getConsumingAppId(),
kpiData.getAndResetNumberOfEventsSent(),
kpiData.getAndResetBytesSent());
}
}

private void publishKpi(final String appName, final long count, final long bytes) {
final String appNameHashed = kpiPublisher.hash(appName);

LOG.info("[SLO] [streamed-data] api={} eventTypeName={} app={} appHashed={} numberOfEvents={} bytesStreamed={}",
"lola", config.getEtName(), appName, appNameHashed, count, bytes);

kpiPublisher.publish(
kpiDataStreamedEventType,
() -> new JSONObject()
.put("api", "lola")
.put("event_type", config.getEtName())
.put("app", appName)
.put("app_hashed", kpiPublisher.hash(appName))
.put("number_of_events", count)
.put("bytes_streamed", bytes));
}

private <T> Map<String, T> createMapWithPartitionKeys(final Function<String, T> valueFunction) {
return config
.getCursors().stream().map(NakadiCursor::getPartition)
Expand All @@ -156,6 +201,8 @@ private void sendBatch(final NakadiCursor topicPosition, final List<byte[]> curr
final int bytesWritten = writer.getWriter()
.writeBatch(outputStream, cursorConverter.convert(topicPosition), currentBatch);
bytesFlushedMeter.mark(bytesWritten);
kpiData.addBytesSent(bytesWritten);
kpiData.addNumberOfEventsSent(currentBatch.size());
}


Expand Down
18 changes: 15 additions & 3 deletions src/main/java/org/zalando/nakadi/service/EventStreamFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.codahale.metrics.Meter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.exceptions.InvalidCursorException;
import org.zalando.nakadi.exceptions.NakadiException;
Expand All @@ -15,18 +16,26 @@ public class EventStreamFactory {
private final CursorConverter cursorConverter;
private final EventStreamWriterProvider writerProvider;
private final BlacklistService blacklistService;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final String kpiDataStreamedEventType;
private final long kpiFrequencyMs;

@Autowired
public EventStreamFactory(
final CursorConverter cursorConverter,
final EventStreamWriterProvider writerProvider,
final BlacklistService blacklistService) {
final BlacklistService blacklistService,
final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiDataStreamed}") final String kpiDataStreamedEventType,
@Value("${nakadi.kpi.config.stream-data-collection-frequency-ms}") final long kpiFrequencyMs) {
this.cursorConverter = cursorConverter;
this.writerProvider = writerProvider;
this.blacklistService = blacklistService;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.kpiDataStreamedEventType = kpiDataStreamedEventType;
this.kpiFrequencyMs = kpiFrequencyMs;
}


public EventStream createEventStream(final OutputStream outputStream, final EventConsumer eventConsumer,
final EventStreamConfig config, final Meter bytesFlushedMeter)
throws NakadiException, InvalidCursorException {
Expand All @@ -37,6 +46,9 @@ public EventStream createEventStream(final OutputStream outputStream, final Even
blacklistService,
cursorConverter,
bytesFlushedMeter,
writerProvider);
writerProvider,
nakadiKpiPublisher,
kpiDataStreamedEventType,
kpiFrequencyMs);
}
}
12 changes: 11 additions & 1 deletion src/main/java/org/zalando/nakadi/service/NakadiKpiPublisher.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.zalando.nakadi.service;

import com.google.common.base.Charsets;
import org.apache.commons.codec.binary.Hex;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.util.FeatureToggleService;

import java.security.MessageDigest;
import java.util.function.Supplier;

@Component
Expand All @@ -16,12 +19,15 @@ public class NakadiKpiPublisher {

private final FeatureToggleService featureToggleService;
private final EventsProcessor eventsProcessor;
private final MessageDigest sha256MessageDigest;

@Autowired
protected NakadiKpiPublisher(final FeatureToggleService featureToggleService,
final EventsProcessor eventsProcessor) {
final EventsProcessor eventsProcessor,
final MessageDigest sha256MessageDigest) {
this.featureToggleService = featureToggleService;
this.eventsProcessor = eventsProcessor;
this.sha256MessageDigest = sha256MessageDigest;
}

public void publish(final String etName, final Supplier<JSONObject> eventSupplier) {
Expand All @@ -37,4 +43,8 @@ public void publish(final String etName, final Supplier<JSONObject> eventSupplie
}
}

public String hash(final String value) {
return Hex.encodeHexString(sha256MessageDigest.digest(value.getBytes(Charsets.UTF_8)));
}

}
Loading

0 comments on commit 6e4906a

Please sign in to comment.