
ARUHA-2372 Store event disk size stats from nakadi #1073

Merged 5 commits on Jun 25, 2019
Changes from 3 commits
@@ -1,5 +1,6 @@
package org.zalando.nakadi.repository.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.consumer.Consumer;
@@ -254,7 +255,8 @@ private KafkaTopicRepository createKafkaTopicRepository() {
nakadiSettings,
kafkaSettings,
zookeeperSettings,
kafkaTopicConfigFactory);
kafkaTopicConfigFactory,
new ObjectMapper());
}

}
@@ -1,6 +1,7 @@
package org.zalando.nakadi.repository;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.config.NakadiSettings;
@@ -29,19 +30,22 @@ public class KafkaRepositoryCreator implements TopicRepositoryCreator {
private final ZookeeperSettings zookeeperSettings;
private final KafkaTopicConfigFactory kafkaTopicConfigFactory;
private final MetricRegistry metricRegistry;
private final ObjectMapper objectMapper;

@Autowired
public KafkaRepositoryCreator(
final NakadiSettings nakadiSettings,
final KafkaSettings kafkaSettings,
final ZookeeperSettings zookeeperSettings,
final KafkaTopicConfigFactory kafkaTopicConfigFactory,
final MetricRegistry metricRegistry) {
final MetricRegistry metricRegistry,
final ObjectMapper objectMapper) {
this.nakadiSettings = nakadiSettings;
this.kafkaSettings = kafkaSettings;
this.zookeeperSettings = zookeeperSettings;
this.kafkaTopicConfigFactory = kafkaTopicConfigFactory;
this.metricRegistry = metricRegistry;
this.objectMapper = objectMapper;
}

@Override
@@ -59,7 +63,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
final KafkaFactory kafkaFactory =
new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry);
final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder,
kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory);
kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory,
objectMapper);
// check that it does work
kafkaTopicRepository.listTopics();
return kafkaTopicRepository;
@@ -5,6 +5,7 @@
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.TopicPartition;
import org.zalando.nakadi.exceptions.runtime.EventPublishingException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
@@ -15,6 +16,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface TopicRepository {
@@ -67,6 +69,12 @@ List<PartitionEndStatistics> loadTopicEndStatistics(Collection<Timeline> topics)

List<String> listPartitionNames(String topicId);

/**
* Provides an estimate of the disk size occupied by each topic partition. Replicated data is not included.
* @return the maximum size observed for each topic partition across brokers.
*/
Map<TopicPartition, Long> getSizeStats();

EventConsumer.LowLevelConsumer createEventConsumer(String clientId, List<NakadiCursor> positions)
throws InvalidCursorException;

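Reviewer note: a caller-side sketch of the new contract (the topicRepository variable and "example-topic" are placeholders, not part of this PR). Judging by the size_bytes = value * 1024 conversion in DiskUsageStatsJob below, the returned values are kilobytes:

    final Map<TopicPartition, Long> sizes = topicRepository.getSizeStats();
    // Sum the per-partition sizes (KB) that belong to one topic.
    final long exampleTopicKb = sizes.entrySet().stream()
            .filter(e -> "example-topic".equals(e.getKey().getTopic()))
            .mapToLong(Map.Entry::getValue)
            .sum();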
@@ -0,0 +1,45 @@
package org.zalando.nakadi.repository.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;

public class BubukuSizeStats {
public static class TotalStats {
private final long usedKb;
private final long freeKb;

public TotalStats(
@JsonProperty("used_kb") final long usedKb,
@JsonProperty("free_kb") final long freeKb) {
this.usedKb = usedKb;
this.freeKb = freeKb;
}

public long getUsedKb() {
return usedKb;
}

public long getFreeKb() {
return freeKb;
}
}

private final TotalStats totalStats;
private final Map<String, Map<String, Long>> perPartitionStats;

public BubukuSizeStats(
@JsonProperty("disk") final TotalStats totalStats,
@JsonProperty("topics") final Map<String, Map<String, Long>> perPartitionStats) {
this.totalStats = totalStats;
this.perPartitionStats = perPartitionStats;
}

public TotalStats getTotalStats() {
return totalStats;
}

public Map<String, Map<String, Long>> getPerPartitionStats() {
return perPartitionStats;
}
}
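For illustration, a minimal sketch of the JSON shape this DTO is written for, inferred from the @JsonProperty names above (all values invented):

    final ObjectMapper mapper = new ObjectMapper();
    final BubukuSizeStats stats = mapper.readValue(
            "{\"disk\": {\"used_kb\": 1048576, \"free_kb\": 524288},"
                    + " \"topics\": {\"example-topic\": {\"0\": 2048, \"1\": 1024}}}",
            BubukuSizeStats.class);
    // stats.getTotalStats().getUsedKb()                 -> 1048576
    // stats.getPerPartitionStats().get("example-topic") -> {0=2048, 1=1024}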
@@ -1,5 +1,6 @@
package org.zalando.nakadi.repository.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import kafka.admin.AdminUtils;
import kafka.server.ConfigType;
@@ -83,20 +84,23 @@ public class KafkaTopicRepository implements TopicRepository {
private final ZookeeperSettings zookeeperSettings;
private final ConcurrentMap<String, HystrixKafkaCircuitBreaker> circuitBreakers;
private final KafkaTopicConfigFactory kafkaTopicConfigFactory;
private final ObjectMapper objectMapper;

public KafkaTopicRepository(final ZooKeeperHolder zkFactory,
final KafkaFactory kafkaFactory,
final NakadiSettings nakadiSettings,
final KafkaSettings kafkaSettings,
final ZookeeperSettings zookeeperSettings,
final KafkaTopicConfigFactory kafkaTopicConfigFactory) {
final KafkaTopicConfigFactory kafkaTopicConfigFactory,
final ObjectMapper objectMapper) {
this.zkFactory = zkFactory;
this.kafkaFactory = kafkaFactory;
this.nakadiSettings = nakadiSettings;
this.kafkaSettings = kafkaSettings;
this.zookeeperSettings = zookeeperSettings;
this.kafkaTopicConfigFactory = kafkaTopicConfigFactory;
this.circuitBreakers = new ConcurrentHashMap<>();
this.objectMapper = objectMapper;
}

private CompletableFuture<Exception> publishItem(
@@ -479,6 +483,33 @@ public List<String> listPartitionNames(final String topicId) {
.withExceptionsThatForceRetry(org.apache.kafka.common.errors.TimeoutException.class));
}

@Override
public Map<org.zalando.nakadi.domain.TopicPartition, Long> getSizeStats() {
final Map<org.zalando.nakadi.domain.TopicPartition, Long> result = new HashMap<>();

try {
final List<String> brokers = zkFactory.get().getChildren().forPath("/bubuku/size_stats");

for (final String broker : brokers) {
final BubukuSizeStats stats = objectMapper.readValue(
zkFactory.get().getData().forPath("/bubuku/size_stats/" + broker),
BubukuSizeStats.class);
stats.getPerPartitionStats().forEach((topic, partitionSizes) -> {
partitionSizes.forEach((partition, size) -> {
final org.zalando.nakadi.domain.TopicPartition tp =
new org.zalando.nakadi.domain.TopicPartition(topic, partition);

result.compute(tp, (ignore, oldSize) ->
Optional.ofNullable(oldSize).map(v -> Math.max(oldSize, size)).orElse(size));
});
});
}
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to acquire size statistics", e);
}
}

public List<String> listPartitionNamesInternal(final String topicId) {
final Producer<String, String> producer = kafkaFactory.takeProducer();
try {
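A note on the merge inside getSizeStats above: every broker hosting a replica reports a size for the same partition, so the code keeps only the maximum per partition rather than summing replicas. The Optional chain is equivalent to Map.merge; a more compact sketch:

    // Keep the largest size reported by any broker hosting this partition.
    result.merge(tp, size, Math::max);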
@@ -91,7 +91,7 @@ private void sendEventBatch(final String etName) {
}
}

void enrichAndSubmit(final String etName, final JSONObject event) {
public void enrichAndSubmit(final String etName, final JSONObject event) {
final JSONObject metadata = new JSONObject()
.put("occurred_at", Instant.now())
.put("eid", uuidGenerator.randomUUID())
@@ -0,0 +1,55 @@
package org.zalando.nakadi.service.job;


import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "nakadi.jobs.diskUsageStats")
public class DiskUsageStatsConfig {
private String authDataType;
private String authValue;
private String owningApplication;
private String eventTypeName;
private long runPeriodMs;

public String getAuthDataType() {
return authDataType;
}

public void setAuthDataType(final String authDataType) {
this.authDataType = authDataType;
}

public String getAuthValue() {
return authValue;
}

public void setAuthValue(final String authValue) {
this.authValue = authValue;
}

public String getOwningApplication() {
return owningApplication;
}

public void setOwningApplication(final String owningApplication) {
this.owningApplication = owningApplication;
}

public String getEventTypeName() {
return eventTypeName;
}

public void setEventTypeName(final String eventTypeName) {
this.eventTypeName = eventTypeName;
}

public long getRunPeriodMs() {
return runPeriodMs;
}

public void setRunPeriodMs(final long runPeriodMs) {
this.runPeriodMs = runPeriodMs;
}
}
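Since the class binds @ConfigurationProperties under the nakadi.jobs.diskUsageStats prefix, it would be driven by properties like this hypothetical fragment (all values invented, relying on Spring's relaxed binding):

    nakadi.jobs.diskUsageStats.authDataType=service
    nakadi.jobs.diskUsageStats.authValue=stups_example-app
    nakadi.jobs.diskUsageStats.owningApplication=example-app
    nakadi.jobs.diskUsageStats.eventTypeName=example.disk.usage
    nakadi.jobs.diskUsageStats.runPeriodMs=3600000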
src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java (123 additions, 0 deletions)
@@ -0,0 +1,123 @@
package org.zalando.nakadi.service.job;

import com.google.common.annotations.VisibleForTesting;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.TopicPartition;
import org.zalando.nakadi.repository.TopicRepositoryHolder;
import org.zalando.nakadi.repository.db.TimelineDbRepository;
import org.zalando.nakadi.service.EventsProcessor;
import org.zalando.nakadi.service.SystemEventTypeInitializer;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class DiskUsageStatsJob {
private static final String JOB_NAME = "disk-usage";

private final ExclusiveJobWrapper wrapper;
private final TimelineDbRepository timelineDbRepository;
private final TopicRepositoryHolder topicRepositoryHolder;
private final EventsProcessor eventsProcessor;
private final SystemEventTypeInitializer systemEventTypeInitializer;
private final DiskUsageStatsConfig config;

private static final Logger LOG = LoggerFactory.getLogger(DiskUsageStatsJob.class);

public DiskUsageStatsJob(
final JobWrapperFactory jobWrapperFactory,
final TimelineDbRepository timelineDbRepository,
final TopicRepositoryHolder topicRepositoryHolder,
final EventsProcessor eventsProcessor,
final SystemEventTypeInitializer systemEventTypeInitializer,
final DiskUsageStatsConfig config) {
this.timelineDbRepository = timelineDbRepository;
this.topicRepositoryHolder = topicRepositoryHolder;
this.eventsProcessor = eventsProcessor;
this.systemEventTypeInitializer = systemEventTypeInitializer;
this.config = config;
this.wrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, config.getRunPeriodMs());
}

@PostConstruct
public void prepareEventType() throws IOException {
final Map<String, String> nameReplacements = new HashMap<>();
nameReplacements.put("event_type_name_placeholder", config.getEventTypeName());
nameReplacements.put("owning_application_placeholder", config.getOwningApplication());
nameReplacements.put("auth_data_type_placeholder", config.getAuthDataType());
nameReplacements.put("auth_value_placeholder", config.getAuthValue());
this.systemEventTypeInitializer.createEventTypesFromResource(
"disk_usage_event_type.json",
nameReplacements);
}
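prepareEventType fills placeholders in the bundled disk_usage_event_type.json resource, which is not shown in this diff. A hypothetical fragment, purely to illustrate where the four placeholders would sit (the real resource file may differ):

    {
      "name": "event_type_name_placeholder",
      "owning_application": "owning_application_placeholder",
      "authorization": {
        "admins": [{"data_type": "auth_data_type_placeholder", "value": "auth_value_placeholder"}]
      }
    }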

@Scheduled(
fixedDelayString = "${nakadi.jobs.checkRunMs}",
initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}")
public void dumpDiskStats() {
try {
wrapper.runJobLocked(this::dumpDiskStatsLocked);
} catch (RuntimeException ex) {
LOG.warn("Disk stats dump failed", ex);
}
}

private void dumpDiskStatsLocked() {
final Map<String, Long> eventTypeSize = loadDiskUsage();
// Stats are here.
publishSizeStats(eventTypeSize);
}

@VisibleForTesting
Map<String, Long> loadDiskUsage() {
final Map<String, Map<String, String>> storageTopicToEventType = new HashMap<>();
final Map<String, Storage> storages = new HashMap<>();
final List<Timeline> allTimelines = timelineDbRepository.listTimelinesOrdered();
for (final Timeline t : allTimelines) {
if (t.isDeleted()) {
continue;
}
storages.put(t.getStorage().getId(), t.getStorage());
storageTopicToEventType
.computeIfAbsent(t.getStorage().getId(), v -> new HashMap<>())
.put(t.getTopic(), t.getEventType());
}

final Map<String, Long> eventTypeSize = new HashMap<>();

for (final Map.Entry<String, Map<String, String>> storageEntry : storageTopicToEventType.entrySet()) {
final Map<TopicPartition, Long> data = topicRepositoryHolder
.getTopicRepository(storages.get(storageEntry.getKey()))
.getSizeStats();
data.entrySet().stream()
.filter(v -> storageEntry.getValue().containsKey(v.getKey().getTopic()))
.forEach(item -> {
final String eventType = storageEntry.getValue().get(item.getKey().getTopic());
eventTypeSize.compute(
eventType,
(et, oldSize) -> null == oldSize ? item.getValue() : (oldSize + item.getValue()));
});
}
return eventTypeSize;
}

private void publishSizeStats(final Map<String, Long> eventTypeSizes) {
eventTypeSizes.entrySet().stream()
.map(x -> {
final JSONObject obj = new JSONObject();
obj.put("event_type", x.getKey());
obj.put("size_bytes", x.getValue() * 1024);
return obj;
})
.forEach(item -> eventsProcessor.enrichAndSubmit(config.getEventTypeName(), item));
}
}
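A worked example of the conversion in publishSizeStats: loadDiskUsage returns kilobytes (hence the multiplication by 1024), so an event type occupying 2048 KB would be submitted as:

    // "example-et" is a hypothetical event type name; 2048 is the KB figure
    // that loadDiskUsage would have produced for it.
    final JSONObject obj = new JSONObject()
            .put("event_type", "example-et")
            .put("size_bytes", 2048L * 1024);  // 2097152 bytes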